Merge pull request #14741 from y1chi/beam-12294

[BEAM-12294] Implement close function for BeamFnStatusClient to shutd…
diff --git a/.test-infra/jenkins/CommonJobProperties.groovy b/.test-infra/jenkins/CommonJobProperties.groovy
index 40f143a..295cc89 100644
--- a/.test-infra/jenkins/CommonJobProperties.groovy
+++ b/.test-infra/jenkins/CommonJobProperties.groovy
@@ -49,7 +49,7 @@
 
     // Discard old builds. Build records are only kept up to this number of days.
     context.logRotator {
-      daysToKeep(14)
+      daysToKeep(30)
     }
 
     // Source code management.
@@ -183,6 +183,11 @@
     context.switches("-Dorg.gradle.jvmargs=-Xms2g")
     context.switches("-Dorg.gradle.jvmargs=-Xmx4g")
 
+    // Disable file system watching for CI builds
+    // Builds are performed on a clean clone and files aren't modified, so
+    // there's no value in watching for changes.
+    context.switches("-Dorg.gradle.vfs.watch=false")
+
     // Include dependency licenses when build docker images on Jenkins, see https://s.apache.org/zt68q
     context.switches("-Pdocker-pull-licenses")
   }
diff --git a/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_Dataflow.groovy b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_Dataflow.groovy
index 0e3e628..1ac4562 100644
--- a/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_Dataflow.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_Dataflow.groovy
@@ -27,7 +27,7 @@
 
       description('Runs the ValidatesRunner suite on the Dataflow runner.')
 
-      commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 270)
+      commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 420)
       previousNames(/beam_PostCommit_Java_ValidatesRunner_Dataflow_Gradle/)
 
       // Publish all test results to Jenkins
diff --git a/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_Dataflow_Java11.groovy b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_Dataflow_Java11.groovy
index af9e25e..6ba9685 100644
--- a/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_Dataflow_Java11.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_Dataflow_Java11.groovy
@@ -28,7 +28,7 @@
       def JAVA_11_HOME = '/usr/lib/jvm/java-11-openjdk-amd64'
       def JAVA_8_HOME = '/usr/lib/jvm/java-8-openjdk-amd64'
 
-      commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 270)
+      commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 420)
       publishers {
         archiveJunit('**/build/test-results/**/*.xml')
       }
diff --git a/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.groovy b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.groovy
index 0de085d..3b3d8ed 100644
--- a/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Java_ValidatesRunner_Dataflow_V2_Streaming.groovy
@@ -27,7 +27,7 @@
 
       description('Runs Java ValidatesRunner suite on the Dataflow runner V2 forcing streaming mode.')
 
-      commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 330)
+      commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 450)
 
       // Publish all test results to Jenkins
       publishers {
diff --git a/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Dataflow.groovy b/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Dataflow.groovy
index e847928..5631649 100644
--- a/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Dataflow.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Dataflow.groovy
@@ -26,7 +26,7 @@
       description('Runs Python ValidatesRunner suite on the Dataflow runner.')
 
       // Set common parameters.
-      commonJobProperties.setTopLevelMainJobProperties(delegate)
+      commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 200)
 
       publishers {
         archiveJunit('**/nosetests*.xml')
diff --git a/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Dataflow_V2.groovy b/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Dataflow_V2.groovy
index c219de6..e164514 100644
--- a/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Dataflow_V2.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python_ValidatesRunner_Dataflow_V2.groovy
@@ -26,7 +26,7 @@
       description('Runs Python ValidatesRunner suite on the Dataflow runner v2.')
 
       // Set common parameters.
-      commonJobProperties.setTopLevelMainJobProperties(delegate)
+      commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 200)
 
       publishers {
         archiveJunit('**/nosetests*.xml')
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/GrpcVendoring_1_26_0.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/GrpcVendoring_1_26_0.groovy
index 40e7383..5cba1d4 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/GrpcVendoring_1_26_0.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/GrpcVendoring_1_26_0.groovy
@@ -42,7 +42,7 @@
   static def alpn_api_version = "1.1.2.v20150522"
   static def npn_api_version = "1.1.1.v20141010"
   static def jboss_marshalling_version = "1.4.11.Final"
-  static def jboss_modules_version = "1.11.0.Final"
+  static def jboss_modules_version = "1.1.0.Beta1"
 
   /** Returns the list of compile time dependencies. */
   static List<String> dependencies() {
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java
index aae35ab..e32edf7 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java
@@ -26,7 +26,6 @@
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.PValues;
 import org.apache.beam.sdk.values.TaggedPValue;
 import org.apache.beam.sdk.values.TupleTag;
@@ -41,17 +40,16 @@
   private ReplacementOutputs() {}
 
   public static Map<PCollection<?>, ReplacementOutput> singleton(
-      Map<TupleTag<?>, PCollection<?>> original, PValue replacement) {
+      Map<TupleTag<?>, PCollection<?>> original, POutput replacement) {
     Entry<TupleTag<?>, PCollection<?>> originalElement =
         Iterables.getOnlyElement(original.entrySet());
-    TupleTag<?> replacementTag = Iterables.getOnlyElement(replacement.expand().entrySet()).getKey();
-    PCollection<?> replacementCollection =
-        (PCollection<?>) Iterables.getOnlyElement(replacement.expand().entrySet()).getValue();
+    Entry<TupleTag<?>, PCollection<?>> replacementElement =
+        Iterables.getOnlyElement(PValues.expandOutput(replacement).entrySet());
     return Collections.singletonMap(
-        replacementCollection,
+        replacementElement.getValue(),
         ReplacementOutput.of(
             TaggedPValue.of(originalElement.getKey(), originalElement.getValue()),
-            TaggedPValue.of(replacementTag, replacementCollection)));
+            TaggedPValue.of(replacementElement.getKey(), replacementElement.getValue())));
   }
 
   public static Map<PCollection<?>, ReplacementOutput> tagged(
diff --git a/runners/flink/1.12/build.gradle b/runners/flink/1.12/build.gradle
index 6db1ccb..93ba0c3 100644
--- a/runners/flink/1.12/build.gradle
+++ b/runners/flink/1.12/build.gradle
@@ -20,7 +20,7 @@
 /* All properties required for loading the Flink build script */
 project.ext {
   // Set the version of all Flink-related dependencies here.
-  flink_version = '1.12.2'
+  flink_version = '1.12.3'
   // Version specific code overrides.
   main_source_overrides = ["${basePath}/1.10/src/main/java", "${basePath}/1.11/src/main/java", './src/main/java']
   test_source_overrides = ["${basePath}/1.10/src/test/java", "${basePath}/1.11/src/test/java", './src/test/java']
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index b814c70..43f507d 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -45,7 +45,7 @@
   filter org.apache.tools.ant.filters.ReplaceTokens, tokens: [
     'dataflow.legacy_environment_major_version' : '8',
     'dataflow.fnapi_environment_major_version' : '8',
-    'dataflow.container_version' : 'beam-master-20210429',
+    'dataflow.container_version' : 'beam-master-20210505',
     'dataflow.container_base_repository' : 'gcr.io/cloud-dataflow/v1beta3',
   ]
 }
diff --git a/runners/portability/java/build.gradle b/runners/portability/java/build.gradle
index bce5404..4ec158d 100644
--- a/runners/portability/java/build.gradle
+++ b/runners/portability/java/build.gradle
@@ -192,6 +192,13 @@
       // https://issues.apache.org/jira/browse/BEAM-10452
       excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$BasicTests.testHotKeyCombiningWithAccumulationMode'
 
+      // https://issues.apache.org/jira/browse/BEAM-12275
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$MultipleInputsAndOutputTests.testSideInputAnnotationWithMultipleSideInputs'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testMapAsEntrySetSideInput'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedMultimapAsEntrySetSideInput'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedMapAsEntrySetSideInput'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testMultimapAsEntrySetSideInput'
+
       // https://issues.apache.org/jira/browse/BEAM-10995
       excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testWindowPreservation'
     }
diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
index a8c1771..9bc32fc 100644
--- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
@@ -22,11 +22,14 @@
 import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
 import org.apache.beam.runners.twister2.Twister2BatchTranslationContext;
 import org.apache.beam.runners.twister2.translators.BatchTransformTranslator;
+import org.apache.beam.runners.twister2.translators.functions.ByteToElemFunction;
 import org.apache.beam.runners.twister2.translators.functions.ByteToWindowFunctionPrimitive;
+import org.apache.beam.runners.twister2.translators.functions.ElemToBytesFunction;
 import org.apache.beam.runners.twister2.translators.functions.MapToTupleFunction;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -58,23 +61,50 @@
                 context.getCurrentTransform();
     org.apache.beam.sdk.values.PCollectionView<ViewT> input;
     PCollection<ElemT> inputPCol = context.getInput(transform);
-    final KvCoder coder = (KvCoder) inputPCol.getCoder();
-    Coder inputKeyCoder = coder.getKeyCoder();
+    final Coder coder = inputPCol.getCoder();
     WindowingStrategy windowingStrategy = inputPCol.getWindowingStrategy();
     WindowFn windowFn = windowingStrategy.getWindowFn();
-    final WindowedValue.WindowedValueCoder wvCoder =
-        WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder());
-    BatchTSet<WindowedValue<ElemT>> inputGathered =
-        inputDataSet
-            .direct()
-            .map(new MapToTupleFunction<>(inputKeyCoder, wvCoder))
-            .allGather()
-            .map(new ByteToWindowFunctionPrimitive(inputKeyCoder, wvCoder));
     try {
       input = CreatePCollectionViewTranslation.getView(application);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    context.setSideInputDataSet(input.getTagInternal().getId(), inputGathered);
+
+    switch (input.getViewFn().getMaterialization().getUrn()) {
+      case Materializations.MULTIMAP_MATERIALIZATION_URN:
+        KvCoder kvCoder = (KvCoder<?, ?>) coder;
+        final Coder keyCoder = kvCoder.getKeyCoder();
+        final WindowedValue.WindowedValueCoder kvwvCoder =
+            WindowedValue.FullWindowedValueCoder.of(
+                kvCoder.getValueCoder(), windowFn.windowCoder());
+        BatchTSet<WindowedValue<ElemT>> multimapMaterialization =
+            inputDataSet
+                .direct()
+                .map(new MapToTupleFunction<>(keyCoder, kvwvCoder))
+                .allGather()
+                .map(new ByteToWindowFunctionPrimitive(keyCoder, kvwvCoder));
+        context.setSideInputDataSet(input.getTagInternal().getId(), multimapMaterialization);
+        break;
+      case Materializations.ITERABLE_MATERIALIZATION_URN:
+        final WindowedValue.WindowedValueCoder wvCoder =
+            WindowedValue.FullWindowedValueCoder.of(coder, windowFn.windowCoder());
+        BatchTSet<WindowedValue<ElemT>> iterableMaterialization =
+            inputDataSet
+                .direct()
+                .map(new ElemToBytesFunction<>(wvCoder))
+                .allGather()
+                .map(new ByteToElemFunction(wvCoder));
+        try {
+          input = CreatePCollectionViewTranslation.getView(application);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        context.setSideInputDataSet(input.getTagInternal().getId(), iterableMaterialization);
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "Unknown side input materialization "
+                + input.getViewFn().getMaterialization().getUrn());
+    }
   }
 }
diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToElemFunction.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToElemFunction.java
new file mode 100644
index 0000000..578225f
--- /dev/null
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToElemFunction.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2.translators.functions;
+
+import edu.iu.dsc.tws.api.tset.TSetContext;
+import edu.iu.dsc.tws.api.tset.fn.MapFunc;
+import java.io.ObjectStreamException;
+import java.util.logging.Logger;
+import org.apache.beam.runners.twister2.utils.TranslationUtils;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+
+/** Byte array to element function. */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class ByteToElemFunction<V> implements MapFunc<WindowedValue<V>, byte[]> {
+  private transient WindowedValueCoder<V> wvCoder;
+  private static final Logger LOG = Logger.getLogger(ByteToElemFunction.class.getName());
+
+  private transient boolean isInitialized = false;
+  private byte[] wvCoderBytes;
+
+  public ByteToElemFunction() {
+    // no-arg constructor needed for Kryo
+    isInitialized = false;
+  }
+
+  public ByteToElemFunction(final WindowedValueCoder<V> wvCoder) {
+    this.wvCoder = wvCoder;
+
+    wvCoderBytes = SerializableUtils.serializeToByteArray(wvCoder);
+  }
+
+  @Override
+  public WindowedValue<V> map(byte[] input) {
+    return TranslationUtils.fromByteArray(input, wvCoder);
+  }
+
+  @Override
+  public void prepare(TSetContext context) {
+    initTransient();
+  }
+
+  /**
+   * Method used to initialize the transient variables that were sent over as byte arrays or proto
+   * buffers.
+   */
+  private void initTransient() {
+    if (isInitialized) {
+      return;
+    }
+
+    wvCoder =
+        (WindowedValueCoder<V>)
+            SerializableUtils.deserializeFromByteArray(wvCoderBytes, "Custom Coder Bytes");
+    this.isInitialized = true;
+  }
+
+  protected Object readResolve() throws ObjectStreamException {
+    return this;
+  }
+}
diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ElemToBytesFunction.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ElemToBytesFunction.java
new file mode 100644
index 0000000..c83acdd
--- /dev/null
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ElemToBytesFunction.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2.translators.functions;
+
+import edu.iu.dsc.tws.api.tset.TSetContext;
+import edu.iu.dsc.tws.api.tset.fn.MapFunc;
+import java.io.ObjectStreamException;
+import java.util.logging.Logger;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Element to byte array function. */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class ElemToBytesFunction<V> implements MapFunc<byte[], WindowedValue<V>> {
+
+  private transient WindowedValue.WindowedValueCoder<V> wvCoder;
+  private static final Logger LOG = Logger.getLogger(ElemToBytesFunction.class.getName());
+
+  private transient boolean isInitialized = false;
+  private byte[] wvCoderBytes;
+
+  public ElemToBytesFunction() {
+    // no-arg constructor needed for Kryo
+    this.isInitialized = false;
+  }
+
+  public ElemToBytesFunction(WindowedValue.WindowedValueCoder<V> wvCoder) {
+    this.wvCoder = wvCoder;
+    wvCoderBytes = SerializableUtils.serializeToByteArray(wvCoder);
+  }
+
+  @Override
+  public @Nullable byte[] map(WindowedValue<V> input) {
+    try {
+      return CoderUtils.encodeToByteArray(wvCoder, input);
+    } catch (CoderException e) {
+      LOG.info(e.getMessage());
+    }
+    return null;
+  }
+
+  @Override
+  public void prepare(TSetContext context) {
+    initTransient();
+  }
+
+  /**
+   * Method used to initialize the transient variables that were sent over as byte arrays or proto
+   * buffers.
+   */
+  private void initTransient() {
+    if (isInitialized) {
+      return;
+    }
+    wvCoder =
+        (WindowedValue.WindowedValueCoder<V>)
+            SerializableUtils.deserializeFromByteArray(wvCoderBytes, "Coder");
+    this.isInitialized = true;
+  }
+
+  protected Object readResolve() throws ObjectStreamException {
+    return this;
+  }
+}
diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java
index bbcd392..6ed77c7 100644
--- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java
@@ -23,6 +23,7 @@
 import edu.iu.dsc.tws.api.dataset.DataPartitionConsumer;
 import edu.iu.dsc.tws.api.tset.TSetContext;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,11 +32,11 @@
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.Materializations.MultimapView;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -75,40 +76,79 @@
   }
 
   private <T> T getSideInput(PCollectionView<T> view, BoundedWindow window) {
-    Map<BoundedWindow, List<WindowedValue<KV<?, ?>>>> partitionedElements = new HashMap<>();
+    switch (view.getViewFn().getMaterialization().getUrn()) {
+      case Materializations.MULTIMAP_MATERIALIZATION_URN:
+        return getMultimapSideInput(view, window);
+      case Materializations.ITERABLE_MATERIALIZATION_URN:
+        return getIterableSideInput(view, window);
+      default:
+        throw new IllegalArgumentException(
+            "Unknown materialization type: " + view.getViewFn().getMaterialization().getUrn());
+    }
+  }
+
+  private <T> T getMultimapSideInput(PCollectionView<T> view, BoundedWindow window) {
+    Map<BoundedWindow, List<WindowedValue<?>>> partitionedElements = getPartitionedElements(view);
+    Map<BoundedWindow, T> resultMap = new HashMap<>();
+
+    ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+    for (Map.Entry<BoundedWindow, List<WindowedValue<?>>> elements :
+        partitionedElements.entrySet()) {
+
+      Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+      resultMap.put(
+          elements.getKey(),
+          viewFn.apply(
+              InMemoryMultimapSideInputView.fromIterable(
+                  keyCoder,
+                  (Iterable)
+                      elements.getValue().stream()
+                          .map(WindowedValue::getValue)
+                          .collect(Collectors.toList()))));
+    }
+    T result = resultMap.get(window);
+    if (result == null) {
+      result = viewFn.apply(InMemoryMultimapSideInputView.empty());
+    }
+    return result;
+  }
+
+  private Map<BoundedWindow, List<WindowedValue<?>>> getPartitionedElements(
+      PCollectionView<?> view) {
+    Map<BoundedWindow, List<WindowedValue<?>>> partitionedElements = new HashMap<>();
     DataPartition<?> sideInput = runtimeContext.getInput(view.getTagInternal().getId());
     DataPartitionConsumer<?> dataPartitionConsumer = sideInput.getConsumer();
     while (dataPartitionConsumer.hasNext()) {
-      WindowedValue<KV<?, ?>> winValue = (WindowedValue<KV<?, ?>>) dataPartitionConsumer.next();
+      WindowedValue<?> winValue = (WindowedValue<?>) dataPartitionConsumer.next();
       for (BoundedWindow tbw : winValue.getWindows()) {
-        List<WindowedValue<KV<?, ?>>> windowedValues =
+        List<WindowedValue<?>> windowedValues =
             partitionedElements.computeIfAbsent(tbw, k -> new ArrayList<>());
         windowedValues.add(winValue);
       }
     }
+    return partitionedElements;
+  }
 
+  private <T> T getIterableSideInput(PCollectionView<T> view, BoundedWindow window) {
+    Map<BoundedWindow, List<WindowedValue<?>>> partitionedElements = getPartitionedElements(view);
+
+    ViewFn<Materializations.IterableView, T> viewFn =
+        (ViewFn<Materializations.IterableView, T>) view.getViewFn();
     Map<BoundedWindow, T> resultMap = new HashMap<>();
 
-    for (Map.Entry<BoundedWindow, List<WindowedValue<KV<?, ?>>>> elements :
+    for (Map.Entry<BoundedWindow, List<WindowedValue<?>>> elements :
         partitionedElements.entrySet()) {
-
-      ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
-      Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
       resultMap.put(
           elements.getKey(),
-          (T)
-              viewFn.apply(
-                  InMemoryMultimapSideInputView.fromIterable(
-                      keyCoder,
-                      (Iterable)
-                          elements.getValue().stream()
-                              .map(WindowedValue::getValue)
-                              .collect(Collectors.toList()))));
+          viewFn.apply(
+              () ->
+                  elements.getValue().stream()
+                      .map(WindowedValue::getValue)
+                      .collect(Collectors.toList())));
     }
     T result = resultMap.get(window);
     if (result == null) {
-      ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
-      result = viewFn.apply(InMemoryMultimapSideInputView.empty());
+      result = viewFn.apply(() -> Collections.<T>emptyList());
     }
     return result;
   }
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/logicaltypes.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/logicaltypes.go
index a011a78..0bd33bd 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/logicaltypes.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/logicaltypes.go
@@ -51,7 +51,6 @@
 
 // Registry retains mappings from go types to Schemas and LogicalTypes.
 type Registry struct {
-	lastShortID     int64
 	typeToSchema    map[reflect.Type]*pipepb.Schema
 	idToType        map[string]reflect.Type
 	syntheticToUser map[reflect.Type]reflect.Type
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
index abb6f10..087d8c1 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -26,7 +26,9 @@
 package schema
 
 import (
+	"bytes"
 	"fmt"
+	"hash/fnv"
 	"reflect"
 	"strings"
 
@@ -72,8 +74,19 @@
 	defaultRegistry.RegisterType(ut)
 }
 
-func getUUID() string {
-	return uuid.New().String()
+// getUUID generates a UUID using the string form of the type name.
+func getUUID(ut reflect.Type) string {
+	// String produces non-empty output for pointer and slice types.
+	typename := ut.String()
+	hasher := fnv.New128a()
+	if n, err := hasher.Write([]byte(typename)); err != nil || n != len(typename) {
+		panic(fmt.Sprintf("unable to generate schema uuid for %s, wrote out %d bytes, want %d: err %v", typename, n, len(typename), err))
+	}
+	id, err := uuid.NewRandomFromReader(bytes.NewBuffer(hasher.Sum(nil)))
+	if err != nil {
+		panic(fmt.Sprintf("unable to generate schema uuid for type %s: %v", typename, err))
+	}
+	return id.String()
 }
 
 // Registered returns whether the given type has been registered with
@@ -350,7 +363,7 @@
 		if lID != "" {
 			schm.Options = append(schm.Options, logicalOption(lID))
 		}
-		schm.Id = getUUID()
+		schm.Id = getUUID(ot)
 		r.typeToSchema[ot] = schm
 		r.idToType[schm.GetId()] = ot
 		return schm, nil
@@ -365,7 +378,7 @@
 	// Cache the pointer type here with it's own id.
 	pt := reflect.PtrTo(t)
 	schm = proto.Clone(schm).(*pipepb.Schema)
-	schm.Id = getUUID()
+	schm.Id = getUUID(pt)
 	schm.Options = append(schm.Options, &pipepb.Option{
 		Name: optGoNillable,
 	})
@@ -454,7 +467,7 @@
 		schm := ftype.GetRowType().GetSchema()
 		schm = proto.Clone(schm).(*pipepb.Schema)
 		schm.Options = append(schm.Options, logicalOption(lID))
-		schm.Id = getUUID()
+		schm.Id = getUUID(t)
 		r.typeToSchema[t] = schm
 		r.idToType[schm.GetId()] = t
 		return schm, nil
@@ -483,7 +496,7 @@
 
 	schm := &pipepb.Schema{
 		Fields: fields,
-		Id:     getUUID(),
+		Id:     getUUID(t),
 	}
 	r.idToType[schm.GetId()] = t
 	r.typeToSchema[t] = schm
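
Note: the new getUUID above derives a stable schema id by hashing the type's string form with FNV-128a and feeding the digest to uuid.NewRandomFromReader, so registering the same Go type repeatedly yields the same id. A standalone sketch of that idea (not the registry code itself; it assumes the github.com/google/uuid package the SDK already imports, and myRow is a placeholder type):

// Deterministic UUID from a type name: same input type, same id.
package main

import (
	"bytes"
	"fmt"
	"hash/fnv"
	"reflect"

	"github.com/google/uuid"
)

func idForType(t reflect.Type) string {
	hasher := fnv.New128a()
	hasher.Write([]byte(t.String())) // hash the type's string form; hash writes never fail
	// Use the 16-byte digest as the "random" source, producing a stable UUID.
	id, err := uuid.NewRandomFromReader(bytes.NewBuffer(hasher.Sum(nil)))
	if err != nil {
		panic(err)
	}
	return id.String()
}

type myRow struct{ A int }

func main() {
	// Unlike uuid.New(), two calls on the same type agree.
	fmt.Println(idForType(reflect.TypeOf(myRow{})) == idForType(reflect.TypeOf(myRow{}))) // true
}
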
diff --git a/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go b/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
index 9ea5099..1ba1ac4 100644
--- a/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
+++ b/sdks/go/pkg/beam/core/runtime/xlangx/resolve.go
@@ -24,53 +24,105 @@
 	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
 	"github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	pipepb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
 )
 
 // ResolveArtifacts acquires all dependencies for a cross-language transform
 func ResolveArtifacts(ctx context.Context, edges []*graph.MultiEdge, p *pipepb.Pipeline) {
-	path, err := filepath.Abs("/tmp/artifacts")
+	_, err := ResolveArtifactsWithConfig(ctx, edges, ResolveConfig{})
 	if err != nil {
 		panic(err)
 	}
+}
+
+// ResolveConfig contains fields for configuring the behavior for resolving
+// artifacts.
+type ResolveConfig struct {
+	// SdkPath replaces the default filepath for dependencies, but only in the
+	// external environment proto to be used by the SDK Harness during pipeline
+	// execution. This is used to specify alternate staging directories, such
+	// as for staging artifacts remotely.
+	//
+	// Setting an SdkPath does not change staging behavior otherwise. All
+	// artifacts still get staged to the default local filepath, and it is the
+	// user's responsibility to stage those local artifacts to the SdkPath.
+	SdkPath string
+
+	// JoinFn is a function for combining SdkPath and individual artifact names.
+	// If not specified, it defaults to using filepath.Join.
+	JoinFn func(path, name string) string
+}
+
+func defaultJoinFn(path, name string) string {
+	return filepath.Join(path, "/", name)
+}
+
+// ResolveArtifactsWithConfig acquires all dependencies for cross-language
+// transforms, but with some additional configuration to behavior. By default,
+// this function performs the following steps for each cross-language transform
+// in the list of edges:
+//   1. Retrieves a list of dependencies needed from the expansion service.
+//   2. Retrieves each dependency as an artifact and stages it to a default
+//      local filepath.
+//   3. Adds the dependencies to the transform's stored environment proto.
+// The changes that can be configured are documented in ResolveConfig.
+//
+// This returns a map of "local path" to "sdk path". By default these are
+// identical, unless ResolveConfig.SdkPath has been set.
+func ResolveArtifactsWithConfig(ctx context.Context, edges []*graph.MultiEdge, cfg ResolveConfig) (paths map[string]string, err error) {
+	tmpPath, err := filepath.Abs("/tmp/artifacts")
+	if err != nil {
+		return nil, errors.WithContext(err, "resolving remote artifacts")
+	}
+	if cfg.JoinFn == nil {
+		cfg.JoinFn = defaultJoinFn
+	}
+	paths = make(map[string]string)
 	for _, e := range edges {
 		if e.Op == graph.External {
 			components, err := graphx.ExpandedComponents(e.External.Expanded)
 			if err != nil {
-				panic(err)
+				return nil, errors.WithContextf(err,
+					"resolving remote artifacts for edge %v", e.Name())
 			}
 			envs := components.Environments
 			for eid, env := range envs {
-
 				if strings.HasPrefix(eid, "go") {
 					continue
 				}
 				deps := env.GetDependencies()
-				resolvedArtifacts, err := artifact.Materialize(ctx, e.External.ExpansionAddr, deps, "", path)
+				resolvedArtifacts, err := artifact.Materialize(ctx, e.External.ExpansionAddr, deps, "", tmpPath)
 				if err != nil {
-					panic(err)
+					return nil, errors.WithContextf(err,
+						"resolving remote artifacts for env %v in edge %v", eid, e.Name())
 				}
 
 				var resolvedDeps []*pipepb.ArtifactInformation
 				for _, a := range resolvedArtifacts {
-					name, sha256 := artifact.MustExtractFilePayload(a)
-					fullPath := filepath.Join(path, "/", name)
+					name, _ := artifact.MustExtractFilePayload(a)
+					fullTmpPath := filepath.Join(tmpPath, "/", name)
+					fullSdkPath := fullTmpPath
+					if len(cfg.SdkPath) > 0 {
+						fullSdkPath = cfg.JoinFn(cfg.SdkPath, name)
+					}
 					resolvedDeps = append(resolvedDeps,
 						&pipepb.ArtifactInformation{
 							TypeUrn: "beam:artifact:type:file:v1",
 							TypePayload: protox.MustEncode(
 								&pipepb.ArtifactFilePayload{
-									Path:   fullPath,
-									Sha256: sha256,
+									Path: fullSdkPath,
 								},
 							),
 							RoleUrn:     a.RoleUrn,
 							RolePayload: a.RolePayload,
 						},
 					)
+					paths[fullTmpPath] = fullSdkPath
 				}
 				env.Dependencies = resolvedDeps
 			}
 		}
 	}
+	return paths, nil
 }
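
Note: a hedged usage sketch of the ResolveConfig/ResolveArtifactsWithConfig API introduced above, modeled on the ResolveXLangArtifacts helper added to dataflowlib/stage.go later in this diff. The GCS staging URL is a placeholder, and gcsx.Join is the same path helper that file uses:

package main

import (
	"context"
	"fmt"

	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/xlangx"
	"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
)

func stageXLangDeps(ctx context.Context, edges []*graph.MultiEdge) (map[string]string, error) {
	cfg := xlangx.ResolveConfig{
		// SdkPath only rewrites the dependency paths recorded in the environment
		// proto; artifacts are still materialized locally, and the caller must
		// copy each local file to this location itself.
		SdkPath: "gs://my-bucket/staging/xlang", // placeholder staging location
		JoinFn: func(path, name string) string {
			return gcsx.Join(path, "/", name)
		},
	}
	// Returns a map from each artifact's local staging path to its SDK path.
	return xlangx.ResolveArtifactsWithConfig(ctx, edges, cfg)
}

func main() {
	paths, err := stageXLangDeps(context.Background(), nil) // no cross-language edges: empty map
	fmt.Println(paths, err)
}
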
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflow.go b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
index c46ff4c..cd7be52 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflow.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflow.go
@@ -178,11 +178,23 @@
 	}
 
 	// (1) Build and submit
+	// NOTE(herohde) 10/8/2018: the last segment of the names must be "worker" and "dataflow-worker.jar".
+	id := fmt.Sprintf("go-%v-%v", atomic.AddInt32(&unique, 1), time.Now().UnixNano())
+
+	modelURL := gcsx.Join(*stagingLocation, id, "model")
+	workerURL := gcsx.Join(*stagingLocation, id, "worker")
+	jarURL := gcsx.Join(*stagingLocation, id, "dataflow-worker.jar")
+	xlangURL := gcsx.Join(*stagingLocation, id, "xlang")
 
 	edges, _, err := p.Build()
 	if err != nil {
 		return nil, err
 	}
+	artifactURLs, err := dataflowlib.ResolveXLangArtifacts(ctx, edges, opts.Project, xlangURL)
+	if err != nil {
+		return nil, errors.WithContext(err, "resolving cross-language artifacts")
+	}
+	opts.ArtifactURLs = artifactURLs
 	environment, err := graphx.CreateEnvironment(ctx, jobopts.GetEnvironmentUrn(ctx), getContainerImage)
 	if err != nil {
 		return nil, errors.WithContext(err, "creating environment for model pipeline")
@@ -196,13 +208,6 @@
 		return nil, errors.WithContext(err, "applying container image overrides")
 	}
 
-	// NOTE(herohde) 10/8/2018: the last segment of the names must be "worker" and "dataflow-worker.jar".
-	id := fmt.Sprintf("go-%v-%v", atomic.AddInt32(&unique, 1), time.Now().UnixNano())
-
-	modelURL := gcsx.Join(*stagingLocation, id, "model")
-	workerURL := gcsx.Join(*stagingLocation, id, "worker")
-	jarURL := gcsx.Join(*stagingLocation, id, "dataflow-worker.jar")
-
 	if *dryRun {
 		log.Info(ctx, "Dry-run: not submitting job!")
 
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
index 511b962..390082b 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
@@ -56,6 +56,7 @@
 	WorkerRegion        string
 	WorkerZone          string
 	ContainerImage      string
+	ArtifactURLs        []string // Additional packages for workers.
 
 	// Autoscaling settings
 	Algorithm     string
@@ -128,6 +129,15 @@
 		experiments = append(experiments, "use_staged_dataflow_worker_jar")
 	}
 
+	for _, url := range opts.ArtifactURLs {
+		name := url[strings.LastIndexAny(url, "/")+1:]
+		pkg := &df.Package{
+			Name:     name,
+			Location: url,
+		}
+		packages = append(packages, pkg)
+	}
+
 	ipConfiguration := "WORKER_IP_UNSPECIFIED"
 	if opts.NoUsePublicIPs {
 		ipConfiguration = "WORKER_IP_PRIVATE"
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
index 67a2bde..49ca5bf 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/stage.go
@@ -22,6 +22,8 @@
 	"os"
 
 	"cloud.google.com/go/storage"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+	"github.com/apache/beam/sdks/go/pkg/beam/core/runtime/xlangx"
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"github.com/apache/beam/sdks/go/pkg/beam/util/gcsx"
 )
@@ -54,3 +56,28 @@
 	_, err = gcsx.Upload(ctx, client, project, bucket, obj, r)
 	return err
 }
+
+// ResolveXLangArtifacts resolves cross-language artifacts with a given GCS
+// URL as a destination, and then stages all local artifacts to that URL. This
+// function returns a list of staged artifact URLs.
+func ResolveXLangArtifacts(ctx context.Context, edges []*graph.MultiEdge, project, url string) ([]string, error) {
+	cfg := xlangx.ResolveConfig{
+		SdkPath: url,
+		JoinFn: func(url, name string) string {
+			return gcsx.Join(url, "/", name)
+		},
+	}
+	paths, err := xlangx.ResolveArtifactsWithConfig(ctx, edges, cfg)
+	if err != nil {
+		return nil, err
+	}
+	var urls []string
+	for local, remote := range paths {
+		err := StageFile(ctx, project, remote, local)
+		if err != nil {
+			return nil, errors.WithContextf(err, "staging file to %v", remote)
+		}
+		urls = append(urls, remote)
+	}
+	return urls, nil
+}
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go b/sdks/go/pkg/beam/runners/universal/universal.go
index f01a71f..af14b38 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -79,6 +79,9 @@
 		getEnvCfg = srv.EnvironmentConfig
 	}
 
+	// Fetch all dependencies for cross-language transforms
+	xlangx.ResolveArtifacts(ctx, edges, nil)
+
 	environment, err := graphx.CreateEnvironment(ctx, envUrn, getEnvCfg)
 	if err != nil {
 		return nil, errors.WithContextf(err, "generating model pipeline")
@@ -88,9 +91,6 @@
 		return nil, errors.WithContextf(err, "generating model pipeline")
 	}
 
-	// Fetch all dependencies for cross-language transforms
-	xlangx.ResolveArtifacts(ctx, edges, pipeline)
-
 	log.Info(ctx, proto.MarshalTextString(pipeline))
 
 	opt := &runnerlib.JobOptions{
diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go
index c756e05..7db20ac 100644
--- a/sdks/go/test/integration/integration.go
+++ b/sdks/go/test/integration/integration.go
@@ -78,12 +78,6 @@
 var dataflowFilters = []string{
 	// TODO(BEAM-11576): TestFlattenDup failing on this runner.
 	"TestFlattenDup",
-	// TODO(BEAM-11418): These tests require implementing x-lang artifact
-	// staging on Dataflow.
-	"TestXLang_CoGroupBy",
-	"TestXLang_Multi",
-	"TestXLang_Partition",
-	"TestXLang_Prefix",
 }
 
 // CheckFilters checks if an integration test is filtered to be skipped, either
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 09464db..241fa1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -63,6 +63,7 @@
 import org.apache.beam.sdk.values.ValueWithRecordId.StripIdsDoFn;
 import org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.RemovalListener;
@@ -827,24 +828,29 @@
                 source, checkpoint, BoundedWindow.TIMESTAMP_MIN_VALUE));
       }
 
+      private void initializeCurrentReader() throws IOException {
+        Preconditions.checkState(currentReader == null);
+        Object cacheKey =
+            createCacheKey(initialRestriction.getSource(), initialRestriction.getCheckpoint());
+        currentReader = cachedReaders.getIfPresent(cacheKey);
+        if (currentReader == null) {
+          currentReader =
+              initialRestriction
+                  .getSource()
+                  .createReader(pipelineOptions, initialRestriction.getCheckpoint());
+        } else {
+          // If the reader is from cache, then we know that the reader has been started.
+          // We also remove this cache entry to avoid eviction.
+          readerHasBeenStarted = true;
+          cachedReaders.invalidate(cacheKey);
+        }
+      }
+
       @Override
       public boolean tryClaim(UnboundedSourceValue<OutputT>[] position) {
         try {
           if (currentReader == null) {
-            Object cacheKey =
-                createCacheKey(initialRestriction.getSource(), initialRestriction.getCheckpoint());
-            currentReader = cachedReaders.getIfPresent(cacheKey);
-            if (currentReader == null) {
-              currentReader =
-                  initialRestriction
-                      .getSource()
-                      .createReader(pipelineOptions, initialRestriction.getCheckpoint());
-            } else {
-              // If the reader is from cache, then we know that the reader has been started.
-              // We also remove this cache entry to avoid eviction.
-              readerHasBeenStarted = true;
-              cachedReaders.invalidate(cacheKey);
-            }
+            initializeCurrentReader();
           }
           if (currentReader instanceof EmptyUnboundedSource.EmptyUnboundedReader) {
             return false;
@@ -872,6 +878,8 @@
               currentReader.close();
             } catch (IOException closeException) {
               e.addSuppressed(closeException);
+            } finally {
+              currentReader = null;
             }
           }
           throw new RuntimeException(e);
@@ -957,10 +965,7 @@
 
         if (currentReader == null) {
           try {
-            currentReader =
-                initialRestriction
-                    .getSource()
-                    .createReader(pipelineOptions, initialRestriction.getCheckpoint());
+            initializeCurrentReader();
           } catch (IOException e) {
             throw new RuntimeException(e);
           }
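
Note: the extracted initializeCurrentReader above either reuses a reader cached under a (source, checkpoint) key, marking it as already started and invalidating the entry so it cannot be evicted or handed out again, or creates a fresh reader. A minimal sketch of that reuse-then-invalidate pattern, with a plain map standing in for the Guava cache (types and key format are illustrative only):

package main

import "fmt"

type reader struct{ started bool }

// Hypothetical cache keyed by a combined source+checkpoint string.
var cache = map[string]*reader{}

func getOrCreateReader(key string) (r *reader, alreadyStarted bool) {
	if cached, ok := cache[key]; ok {
		// A cache hit means the reader was started before; drop the entry so it
		// is not reused (or evicted and closed) a second time.
		delete(cache, key)
		return cached, true
	}
	return &reader{}, false // cache miss: create a fresh, unstarted reader
}

func main() {
	cache["src|chk"] = &reader{started: true}
	_, started := getOrCreateReader("src|chk")
	fmt.Println(started) // true: reused the cached, already-started reader
	_, started = getOrCreateReader("src|chk")
	fmt.Println(started) // false: entry was invalidated, so a new reader was created
}
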
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 92c3e05..b04266c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -411,12 +411,12 @@
       checkState(
           this.outputs == null, "Tried to specify more than one output for %s", getFullName());
       checkNotNull(output, "Tried to set the output of %s to null", getFullName());
-      this.outputs = PValues.fullyExpand(output.expand());
+      this.outputs = PValues.expandOutput(output);
 
       // Validate that a primitive transform produces only primitive output, and a composite
       // transform does not produce primitive output.
       Set<Node> outputProducers = new HashSet<>();
-      for (PCollection<?> outputValue : PValues.fullyExpand(output.expand()).values()) {
+      for (PCollection<?> outputValue : PValues.expandOutput(output).values()) {
         outputProducers.add(getProducer(outputValue));
       }
       if (outputProducers.contains(this) && (!parts.isEmpty() || outputProducers.size() > 1)) {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHints.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHints.java
index 0769782..aeecc53 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHints.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHints.java
@@ -85,10 +85,7 @@
   public static ResourceHints fromOptions(PipelineOptions options) {
     ResourceHintsOptions resourceHintsOptions = options.as(ResourceHintsOptions.class);
     ResourceHints result = create();
-    @Nullable List<String> hints = resourceHintsOptions.getResourceHints();
-    if (hints == null) {
-      return result;
-    }
+    List<String> hints = resourceHintsOptions.getResourceHints();
     Splitter splitter = Splitter.on('=').limit(2);
     for (String hint : hints) {
       List<String> parts = splitter.splitToList(hint);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHintsOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHintsOptions.java
index 2b18feb..f63ee2c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHintsOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/resourcehints/ResourceHintsOptions.java
@@ -19,19 +19,27 @@
 
 import com.google.auto.service.AutoService;
 import java.util.List;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
-import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Options that are used to control configuration of the remote environment. */
 public interface ResourceHintsOptions extends PipelineOptions {
+  class EmptyListDefault implements DefaultValueFactory<List> {
+    @Override
+    public List create(PipelineOptions options) {
+      return ImmutableList.of();
+    }
+  }
+
   @Description("Resource hints used for all transform execution environments.")
-  @Nullable
+  @Default.InstanceFactory(EmptyListDefault.class)
   List<String> getResourceHints();
 
-  void setResourceHints(@Nullable List<String> value);
+  void setResourceHints(List<String> value);
 
   /** Register the {@link ResourceHintsOptions}. */
   @AutoService(PipelineOptionsRegistrar.class)
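
Note: with the empty-list default above, ResourceHints.fromOptions (earlier in this diff) always receives a list and splits each entry with Splitter.on('=').limit(2), i.e. only on the first '='. A tiny sketch of that parsing rule; the hint names and values are placeholders:

package main

import (
	"fmt"
	"strings"
)

func main() {
	hints := []string{"minRam=4GB", "someHint=key=value"}
	for _, hint := range hints {
		// At most two parts, like Splitter.on('=').limit(2): values may contain '='.
		parts := strings.SplitN(hint, "=", 2)
		fmt.Printf("name=%q value=%q\n", parts[0], parts[1])
	}
}
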
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
index 588f91f..4cb8448 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java
@@ -689,20 +689,6 @@
                 .apply("Read Files", TextIO.readFiles()))
         .containsInAnyOrder(data);
 
-    PAssert.that(
-            p.apply("Create Data ReadAll", Create.of(data))
-                .apply(
-                    "Write ReadAll",
-                    FileIO.<String>write()
-                        .to(tempFolder.getRoot().toString())
-                        .withSuffix(".txt")
-                        .via(TextIO.sink())
-                        .withIgnoreWindowing())
-                .getPerDestinationOutputFilenames()
-                .apply("Extract Values ReadAll", Values.create())
-                .apply("Read All", TextIO.readAll()))
-        .containsInAnyOrder(data);
-
     p.run();
   }
 
diff --git a/sdks/java/extensions/protobuf/src/test/proto/payload_messages.proto b/sdks/java/extensions/protobuf/src/test/proto/payload_messages.proto
index 1486ee8..cecd9d3 100644
--- a/sdks/java/extensions/protobuf/src/test/proto/payload_messages.proto
+++ b/sdks/java/extensions/protobuf/src/test/proto/payload_messages.proto
@@ -42,3 +42,26 @@
   int32 id = 1;
   string name = 2;
 }
+
+message NameMessage {
+  string name = 1;
+
+  enum NameType {
+    FIRST = 0;
+    MIDDLE = 1;
+    LAST = 2;
+    SECOND_LAST = 3;
+  }
+  repeated NameType name_array = 2;
+}
+
+message NameHeightMessage {
+  string name = 1;
+  int32 height = 2;
+}
+
+message NameHeightKnowsJSMessage {
+  string name = 1;
+  int32 height = 2;
+  bool knows_javascript = 3;
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/LazyAggregateCombineFn.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/LazyAggregateCombineFn.java
index 3b782d9..c489d12 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/LazyAggregateCombineFn.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/LazyAggregateCombineFn.java
@@ -73,10 +73,9 @@
 
   @Override
   public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-    Iterator<AccumT> it = accumulators.iterator();
-    AccumT first = it.next();
-    it.remove();
-    return getAggregateFn().mergeAccumulators(first, accumulators);
+    AccumT first = accumulators.iterator().next();
+    Iterable<AccumT> rest = new SkipFirstElementIterable<>(accumulators);
+    return getAggregateFn().mergeAccumulators(first, rest);
   }
 
   @Override
@@ -99,4 +98,20 @@
   public TypeVariable<?> getAccumTVariable() {
     return AggregateFn.class.getTypeParameters()[1];
   }
+
+  /** Wrapper {@link Iterable} which always skips its first element. */
+  private static class SkipFirstElementIterable<T> implements Iterable<T> {
+    private final Iterable<T> all;
+
+    SkipFirstElementIterable(Iterable<T> all) {
+      this.all = all;
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+      Iterator<T> it = all.iterator();
+      it.next();
+      return it;
+    }
+  }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index a77d04d..5889bc5 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -83,6 +83,7 @@
       int windowFieldIndex) {
 
     super(cluster, traits, child, groupSet, groupSets, aggCalls);
+    assert getGroupType() == Group.SIMPLE;
 
     this.windowFn = windowFn;
     this.windowFieldIndex = windowFieldIndex;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregationRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregationRule.java
index ae0183c..83e18b6 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregationRule.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregationRule.java
@@ -60,6 +60,11 @@
   public void onMatch(RelOptRuleCall call) {
     final Aggregate aggregate = call.rel(0);
     final Project project = call.rel(1);
+
+    if (aggregate.getGroupType() != Aggregate.Group.SIMPLE) {
+      return;
+    }
+
     RelNode x = updateWindow(call, aggregate, project);
     if (x == null) {
       // Non-windowed case should be handled by the BeamBasicAggregationRule
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamBasicAggregationRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamBasicAggregationRule.java
index f12ddc4..cb5702b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamBasicAggregationRule.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamBasicAggregationRule.java
@@ -60,6 +60,10 @@
     Aggregate aggregate = call.rel(0);
     RelNode relNode = call.rel(1);
 
+    if (aggregate.getGroupType() != Aggregate.Group.SIMPLE) {
+      return;
+    }
+
     if (relNode instanceof Project || relNode instanceof Calc || relNode instanceof Filter) {
       if (isWindowed(relNode) || hasWindowedParents(relNode)) {
         // This case is expected to get handled by the 'BeamAggregationRule'
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
index 34664ac..7b580ef 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtils.java
@@ -27,6 +27,7 @@
 import org.apache.beam.sdk.schemas.Schema.TypeName;
 import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
+import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.BiMap;
 import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableBiMap;
 import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
@@ -39,9 +40,6 @@
 import org.joda.time.base.AbstractInstant;
 
 /** Utility methods for Calcite related operations. */
-@SuppressWarnings({
-  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
-})
 public class CalciteUtils {
   private static final long UNLIMITED_ARRAY_SIZE = -1L;
 
@@ -73,7 +71,9 @@
     }
 
     if (fieldType.getTypeName().isLogicalType()) {
-      String logicalId = fieldType.getLogicalType().getIdentifier();
+      Schema.LogicalType logicalType = fieldType.getLogicalType();
+      Preconditions.checkArgumentNotNull(logicalType);
+      String logicalId = logicalType.getIdentifier();
       return logicalId.equals(SqlTypes.DATE.getIdentifier())
           || logicalId.equals(SqlTypes.TIME.getIdentifier())
           || logicalId.equals(TimeWithLocalTzType.IDENTIFIER)
@@ -88,7 +88,9 @@
     }
 
     if (fieldType.getTypeName().isLogicalType()) {
-      String logicalId = fieldType.getLogicalType().getIdentifier();
+      Schema.LogicalType logicalType = fieldType.getLogicalType();
+      Preconditions.checkArgumentNotNull(logicalType);
+      String logicalId = logicalType.getIdentifier();
       return logicalId.equals(CharType.IDENTIFIER);
     }
     return false;
@@ -210,7 +212,12 @@
                     + "so it cannot be converted to a %s",
                 sqlTypeName, Schema.FieldType.class.getSimpleName()));
       default:
-        return CALCITE_TO_BEAM_TYPE_MAPPING.get(sqlTypeName);
+        FieldType fieldType = CALCITE_TO_BEAM_TYPE_MAPPING.get(sqlTypeName);
+        if (fieldType == null) {
+          throw new IllegalArgumentException(
+              "Cannot find a matching Beam FieldType for Calcite type: " + sqlTypeName);
+        }
+        return fieldType;
     }
   }
 
@@ -234,7 +241,12 @@
         return FieldType.row(toSchema(calciteType));
 
       default:
-        return toFieldType(calciteType.getSqlTypeName()).withNullable(calciteType.isNullable());
+        try {
+          return toFieldType(calciteType.getSqlTypeName()).withNullable(calciteType.isNullable());
+        } catch (IllegalArgumentException e) {
+          throw new IllegalArgumentException(
+              "Cannot find a matching Beam FieldType for Calcite type: " + calciteType, e);
+        }
     }
   }
 
@@ -254,16 +266,22 @@
     switch (fieldType.getTypeName()) {
       case ARRAY:
       case ITERABLE:
+        FieldType collectionElementType = fieldType.getCollectionElementType();
+        Preconditions.checkArgumentNotNull(collectionElementType);
         return dataTypeFactory.createArrayType(
-            toRelDataType(dataTypeFactory, fieldType.getCollectionElementType()),
-            UNLIMITED_ARRAY_SIZE);
+            toRelDataType(dataTypeFactory, collectionElementType), UNLIMITED_ARRAY_SIZE);
       case MAP:
-        RelDataType componentKeyType = toRelDataType(dataTypeFactory, fieldType.getMapKeyType());
-        RelDataType componentValueType =
-            toRelDataType(dataTypeFactory, fieldType.getMapValueType());
+        FieldType mapKeyType = fieldType.getMapKeyType();
+        FieldType mapValueType = fieldType.getMapValueType();
+        Preconditions.checkArgumentNotNull(mapKeyType);
+        Preconditions.checkArgumentNotNull(mapValueType);
+        RelDataType componentKeyType = toRelDataType(dataTypeFactory, mapKeyType);
+        RelDataType componentValueType = toRelDataType(dataTypeFactory, mapValueType);
         return dataTypeFactory.createMapType(componentKeyType, componentValueType);
       case ROW:
-        return toCalciteRowType(fieldType.getRowSchema(), dataTypeFactory);
+        Schema schema = fieldType.getRowSchema();
+        Preconditions.checkArgumentNotNull(schema);
+        return toCalciteRowType(schema, dataTypeFactory);
       default:
         return dataTypeFactory.createSqlType(toSqlTypeName(fieldType));
     }
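
The CalciteUtils hunks above all follow one pattern: read the nullable Schema getter into a local, assert it non-null with Beam's Preconditions.checkArgumentNotNull (imported at the top of this file change), and only then dereference it, which is what lets the class drop its blanket "nullness" suppression. A minimal stand-alone sketch of that pattern, with an illustrative class and method name:

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.util.Preconditions;

/** Illustrative only: read the nullable getter once, check it, then use the local variable. */
class NullableGetterPattern {
  static String logicalTypeId(FieldType fieldType) {
    Schema.LogicalType<?, ?> logicalType = fieldType.getLogicalType();
    // Fails fast with an IllegalArgumentException rather than a later NullPointerException.
    Preconditions.checkArgumentNotNull(logicalType);
    return logicalType.getIdentifier();
  }
}
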
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
index df5cbb3..50b5aa3 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslAggregationTest.java
@@ -32,6 +32,7 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.impl.ParseException;
+import org.apache.beam.sdk.extensions.sql.impl.SqlConversionException;
 import org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.testing.PAssert;
@@ -136,6 +137,26 @@
     pipeline.run().waitUntilFinish();
   }
 
+  /** GROUP-BY ROLLUP with bounded PCollection. */
+  @Test
+  public void testAggregationRollupWithBounded() throws Exception {
+    runAggregationRollup(boundedInput1);
+  }
+
+  /** GROUP-BY ROLLUP with unbounded PCollection. */
+  @Test
+  public void testAggregationRollupWithUnbounded() throws Exception {
+    runAggregationRollup(unboundedInput1);
+  }
+
+  private void runAggregationRollup(PCollection<Row> input) throws Exception {
+    String sql = "SELECT f_int2 FROM PCOLLECTION GROUP BY ROLLUP(f_int2)";
+
+    exceptions.expect(SqlConversionException.class);
+    pipeline.enableAbandonedNodeEnforcement(false);
+    input.apply(SqlTransform.query(sql));
+  }
+
   /** GROUP-BY with multiple aggregation functions with bounded PCollection. */
   @Test
   public void testAggregationFunctionsWithBounded() throws Exception {
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/LazyAggregateCombineFnTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/LazyAggregateCombineFnTest.java
index 21ab8d0..cf3f40d 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/LazyAggregateCombineFnTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/LazyAggregateCombineFnTest.java
@@ -19,12 +19,14 @@
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
 
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.extensions.sql.udf.AggregateFn;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -41,6 +43,13 @@
     assertThat(coder, instanceOf(VarLongCoder.class));
   }
 
+  @Test
+  public void mergeAccumulators() {
+    LazyAggregateCombineFn<Long, Long, Long> combiner = new LazyAggregateCombineFn<>(new Sum());
+    long merged = combiner.mergeAccumulators(ImmutableList.of(1L, 1L));
+    assertEquals(2L, merged);
+  }
+
   public static class Sum implements AggregateFn<Long, Long, Long> {
 
     @Override
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java
index 50b6ab2..e76ee7f 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/utils/CalciteUtilsTest.java
@@ -30,13 +30,17 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeFactoryImpl;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeName;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 /** Tests for conversion from Beam schema to Calcite data type. */
 public class CalciteUtilsTest {
 
   RelDataTypeFactory dataTypeFactory;
 
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
   @Before
   public void setUp() {
     dataTypeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
@@ -166,4 +170,12 @@
 
     assertEquals(schema, out);
   }
+
+  @Test
+  public void testFieldTypeNotFound() {
+    RelDataType relDataType = dataTypeFactory.createUnknownType();
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Cannot find a matching Beam FieldType for Calcite type: UNKNOWN");
+    CalciteUtils.toFieldType(relDataType);
+  }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java
index a90f8e8..82c4b74 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.java
@@ -49,6 +49,7 @@
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.extensions.protobuf.PayloadMessages;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
@@ -105,7 +106,11 @@
   @Parameters
   public static Collection<Object[]> data() {
     return Arrays.asList(
-        new Object[][] {{new PubsubJsonObjectProvider()}, {new PubsubAvroObjectProvider()}});
+        new Object[][] {
+          {new PubsubJsonObjectProvider()},
+          {new PubsubAvroObjectProvider()},
+          {new PubsubProtoObjectProvider()}
+        });
   }
 
   @Parameter public PubsubObjectProvider objectsProvider;
@@ -120,6 +125,7 @@
 
   @Test
   public void testSQLSelectsPayloadContent() throws Exception {
+
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE message (\n"
@@ -134,18 +140,15 @@
                 + "LOCATION '%s' \n"
                 + "TBLPROPERTIES '{ "
                 + "%s"
+                + "\"protoClass\" : \"%s\", "
                 + "\"timestampAttributeKey\" : \"ts\" }'",
-            tableProvider.getTableType(), eventsTopic.topicPath(), payloadFormatParam());
+            tableProvider.getTableType(),
+            eventsTopic.topicPath(),
+            payloadFormatParam(),
+            PayloadMessages.SimpleMessage.class.getName());
 
     String queryString = "SELECT message.payload.id, message.payload.name from message";
 
-    // Prepare messages to send later
-    List<PubsubMessage> messages =
-        ImmutableList.of(
-            objectsProvider.messageIdName(ts(1), 3, "foo"),
-            objectsProvider.messageIdName(ts(2), 5, "bar"),
-            objectsProvider.messageIdName(ts(3), 7, "baz"));
-
     // Initialize SQL environment and create the pubsub table
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
     sqlEnv.executeDdl(createTableString);
@@ -173,7 +176,11 @@
         pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
 
     // Start publishing the messages when main pipeline is started and signaling topic is ready
-    eventsTopic.publish(messages);
+    eventsTopic.publish(
+        ImmutableList.of(
+            objectsProvider.messageIdName(ts(1), 3, "foo"),
+            objectsProvider.messageIdName(ts(2), 5, "bar"),
+            objectsProvider.messageIdName(ts(3), 7, "baz")));
 
     // Poll the signaling topic for success message
     resultSignal.waitForSuccess(Duration.standardMinutes(5));
@@ -181,6 +188,7 @@
 
   @Test
   public void testSQLSelectsArrayAttributes() throws Exception {
+
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE message (\n"
@@ -195,19 +203,16 @@
                 + "LOCATION '%s' \n"
                 + "TBLPROPERTIES '{ "
                 + "%s"
+                + "\"protoClass\" : \"%s\", "
                 + "\"timestampAttributeKey\" : \"ts\" }'",
-            tableProvider.getTableType(), eventsTopic.topicPath(), payloadFormatParam());
+            tableProvider.getTableType(),
+            eventsTopic.topicPath(),
+            payloadFormatParam(),
+            PayloadMessages.SimpleMessage.class.getName());
 
     String queryString =
         "SELECT message.payload.id, attributes[1].key AS a1, attributes[2].key AS a2 FROM message";
 
-    // Prepare messages to send later
-    List<PubsubMessage> messages =
-        ImmutableList.of(
-            objectsProvider.messageIdName(ts(1), 3, "foo"),
-            objectsProvider.messageIdName(ts(2), 5, "bar"),
-            objectsProvider.messageIdName(ts(3), 7, "baz"));
-
     // Initialize SQL environment and create the pubsub table
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
     sqlEnv.executeDdl(createTableString);
@@ -241,7 +246,11 @@
         pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
 
     // Start publishing the messages when main pipeline is started and signaling topic is ready
-    eventsTopic.publish(messages);
+    eventsTopic.publish(
+        ImmutableList.of(
+            objectsProvider.messageIdName(ts(1), 3, "foo"),
+            objectsProvider.messageIdName(ts(2), 5, "bar"),
+            objectsProvider.messageIdName(ts(3), 7, "baz")));
 
     // Poll the signaling topic for success message
     resultSignal.waitForSuccess(Duration.standardMinutes(1));
@@ -249,6 +258,14 @@
 
   @Test
   public void testSQLWithBytePayload() throws Exception {
+
+    // Prepare messages to send later
+    List<PubsubMessage> messages =
+        ImmutableList.of(
+            objectsProvider.messageIdName(ts(1), 3, "foo"),
+            objectsProvider.messageIdName(ts(2), 5, "bar"),
+            objectsProvider.messageIdName(ts(3), 7, "baz"));
+
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE message (\n"
@@ -259,18 +276,14 @@
                 + "TYPE '%s' \n"
                 + "LOCATION '%s' \n"
                 + "TBLPROPERTIES '{ "
+                + "\"protoClass\" : \"%s\", "
                 + "\"timestampAttributeKey\" : \"ts\" }'",
-            tableProvider.getTableType(), eventsTopic.topicPath());
+            tableProvider.getTableType(),
+            eventsTopic.topicPath(),
+            PayloadMessages.SimpleMessage.class.getName());
 
     String queryString = "SELECT message.payload AS some_bytes FROM message";
 
-    // Prepare messages to send later
-    List<PubsubMessage> messages =
-        ImmutableList.of(
-            objectsProvider.messageIdName(ts(1), 3, "foo"),
-            objectsProvider.messageIdName(ts(2), 5, "bar"),
-            objectsProvider.messageIdName(ts(3), 7, "baz"));
-
     // Initialize SQL environment and create the pubsub table
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
     sqlEnv.executeDdl(createTableString);
@@ -307,6 +320,7 @@
   @Test
   @SuppressWarnings("unchecked")
   public void testUsesDlq() throws Exception {
+
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE message (\n"
@@ -323,24 +337,17 @@
                 + "    '{ "
                 + "       %s"
                 + "       \"timestampAttributeKey\" : \"ts\", "
-                + "       \"deadLetterQueue\" : \"%s\""
+                + "       \"deadLetterQueue\" : \"%s\", "
+                + "       \"protoClass\" : \"%s\" "
                 + "     }'",
             tableProvider.getTableType(),
             eventsTopic.topicPath(),
             payloadFormatParam(),
-            dlqTopic.topicPath());
+            dlqTopic.topicPath(),
+            PayloadMessages.SimpleMessage.class.getName());
 
     String queryString = "SELECT message.payload.id, message.payload.name from message";
 
-    // Prepare messages to send later
-    List<PubsubMessage> messages =
-        ImmutableList.of(
-            objectsProvider.messageIdName(ts(1), 3, "foo"),
-            objectsProvider.messageIdName(ts(2), 5, "bar"),
-            objectsProvider.messageIdName(ts(3), 7, "baz"),
-            messagePayload(ts(4), "{ - }", ImmutableMap.of()), // invalid message, will go to DLQ
-            messagePayload(ts(5), "{ + }", ImmutableMap.of())); // invalid message, will go to DLQ
-
     // Initialize SQL environment and create the pubsub table
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
     sqlEnv.executeDdl(createTableString);
@@ -368,7 +375,13 @@
         pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
 
     // Start publishing the messages when main pipeline is started and signaling topics are ready
-    eventsTopic.publish(messages);
+    eventsTopic.publish(
+        ImmutableList.of(
+            objectsProvider.messageIdName(ts(1), 3, "foo"),
+            objectsProvider.messageIdName(ts(2), 5, "bar"),
+            objectsProvider.messageIdName(ts(3), 7, "baz"),
+            messagePayload(ts(4), "{ - }", ImmutableMap.of()), // invalid message, will go to DLQ
+            messagePayload(ts(5), "{ + }", ImmutableMap.of()))); // invalid message, will go to DLQ
 
     // Poll the signaling topic for success message
     resultSignal.waitForSuccess(Duration.standardMinutes(4));
@@ -381,6 +394,7 @@
   @Test
   @SuppressWarnings({"unchecked", "rawtypes"})
   public void testSQLLimit() throws Exception {
+
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE message (\n"
@@ -397,12 +411,14 @@
                 + "    '{ "
                 + "       %s"
                 + "       \"timestampAttributeKey\" : \"ts\", "
-                + "       \"deadLetterQueue\" : \"%s\""
+                + "       \"deadLetterQueue\" : \"%s\", "
+                + "       \"protoClass\" : \"%s\" "
                 + "     }'",
             tableProvider.getTableType(),
             eventsTopic.topicPath(),
             payloadFormatParam(),
-            dlqTopic.topicPath());
+            dlqTopic.topicPath(),
+            PayloadMessages.SimpleMessage.class.getName());
 
     List<PubsubMessage> messages =
         ImmutableList.of(
@@ -441,13 +457,16 @@
 
     eventsTopic.assertSubscriptionEventuallyCreated(
         pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
+
     eventsTopic.publish(messages);
+
     assertThat(queryResult.get(2, TimeUnit.MINUTES).size(), equalTo(3));
     pool.shutdown();
   }
 
   @Test
   public void testSQLSelectsPayloadContentFlat() throws Exception {
+
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE message (\n"
@@ -460,19 +479,16 @@
                 + "TBLPROPERTIES "
                 + "    '{ "
                 + "       %s"
+                + "       \"protoClass\" : \"%s\", "
                 + "       \"timestampAttributeKey\" : \"ts\" "
                 + "     }'",
-            tableProvider.getTableType(), eventsTopic.topicPath(), payloadFormatParam());
+            tableProvider.getTableType(),
+            eventsTopic.topicPath(),
+            payloadFormatParam(),
+            PayloadMessages.SimpleMessage.class.getName());
 
     String queryString = "SELECT message.id, message.name from message";
 
-    // Prepare messages to send later
-    List<PubsubMessage> messages =
-        ImmutableList.of(
-            objectsProvider.messageIdName(ts(1), 3, "foo"),
-            objectsProvider.messageIdName(ts(2), 5, "bar"),
-            objectsProvider.messageIdName(ts(3), 7, "baz"));
-
     // Initialize SQL environment and create the pubsub table
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
     sqlEnv.executeDdl(createTableString);
@@ -500,7 +516,11 @@
         pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(5));
 
     // Start publishing the messages when main pipeline is started and signaling topic is ready
-    eventsTopic.publish(messages);
+    eventsTopic.publish(
+        ImmutableList.of(
+            objectsProvider.messageIdName(ts(1), 3, "foo"),
+            objectsProvider.messageIdName(ts(2), 5, "bar"),
+            objectsProvider.messageIdName(ts(3), 7, "baz")));
 
     // Poll the signaling topic for success message
     resultSignal.waitForSuccess(Duration.standardMinutes(5));
@@ -509,24 +529,27 @@
   @Test
   @SuppressWarnings("unchecked")
   public void testSQLInsertRowsToPubsubFlat() throws Exception {
+
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE message (\n"
                 + "event_timestamp TIMESTAMP, \n"
                 + "name VARCHAR, \n"
                 + "height INTEGER, \n"
-                + "knowsJavascript BOOLEAN \n"
+                + "knows_javascript BOOLEAN \n"
                 + ") \n"
                 + "TYPE '%s' \n"
                 + "LOCATION '%s' \n"
                 + "TBLPROPERTIES "
                 + "    '{ "
                 + "       %s"
+                + "       \"protoClass\" : \"%s\", "
                 + "       \"deadLetterQueue\" : \"%s\""
                 + "     }'",
             tableProvider.getTableType(),
             eventsTopic.topicPath(),
             payloadFormatParam(),
+            PayloadMessages.NameHeightKnowsJSMessage.class.getName(),
             dlqTopic.topicPath());
 
     // Initialize SQL environment and create the pubsub table
@@ -536,7 +559,7 @@
     // TODO(BEAM-8741): Ideally we could write this query without specifying a column list, because
     //   it shouldn't be possible to write to event_timestamp when it's mapped to  publish time.
     String queryString =
-        "INSERT INTO message (name, height, knowsJavascript) \n"
+        "INSERT INTO message (name, height, knows_javascript) \n"
             + "VALUES \n"
             + "('person1', 80, TRUE), \n"
             + "('person2', 70, FALSE)";
@@ -556,25 +579,28 @@
   @Test
   @SuppressWarnings("unchecked")
   public void testSQLInsertRowsToPubsubWithTimestampAttributeFlat() throws Exception {
+
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE message (\n"
                 + "  event_timestamp TIMESTAMP, \n"
                 + "  name VARCHAR, \n"
                 + "  height INTEGER, \n"
-                + "  knowsJavascript BOOLEAN \n"
+                + "  knows_javascript BOOLEAN \n"
                 + ") \n"
                 + "TYPE '%s' \n"
                 + "LOCATION '%s' \n"
                 + "TBLPROPERTIES "
                 + "  '{ "
                 + "     %s "
+                + "     \"protoClass\" : \"%s\", "
                 + "     \"deadLetterQueue\" : \"%s\","
                 + "     \"timestampAttributeKey\" : \"ts\""
                 + "   }'",
             tableProvider.getTableType(),
             eventsTopic.topicPath(),
             payloadFormatParam(),
+            PayloadMessages.NameHeightKnowsJSMessage.class.getName(),
             dlqTopic.topicPath());
 
     // Initialize SQL environment and create the pubsub table
@@ -610,20 +636,31 @@
         objectsProvider.getPayloadFormat() == null
             ? ""
             : String.format(
-                "TBLPROPERTIES '{\"format\": \"%s\"}'", objectsProvider.getPayloadFormat());
+                "TBLPROPERTIES '{ \"protoClass\" : \"%s\", \"format\": \"%s\" }'",
+                PayloadMessages.NameHeightKnowsJSMessage.class.getName(),
+                objectsProvider.getPayloadFormat());
+
     String createTableString =
         String.format(
             "CREATE EXTERNAL TABLE people (\n"
                 + "event_timestamp TIMESTAMP, \n"
                 + "name VARCHAR, \n"
                 + "height INTEGER, \n"
-                + "knowsJavascript BOOLEAN \n"
+                + "knows_javascript BOOLEAN \n"
                 + ") \n"
                 + "TYPE '%s' \n"
                 + "LOCATION '%s' \n"
                 + "%s",
             tableProvider.getTableType(), eventsTopic.topicPath(), tblProperties);
 
+    String filteredTblProperties =
+        objectsProvider.getPayloadFormat() == null
+            ? ""
+            : String.format(
+                "TBLPROPERTIES '{ \"protoClass\" : \"%s\", \"format\": \"%s\" }'",
+                PayloadMessages.NameHeightMessage.class.getName(),
+                objectsProvider.getPayloadFormat());
+
     String createFilteredTableString =
         String.format(
             "CREATE EXTERNAL TABLE javascript_people (\n"
@@ -634,7 +671,7 @@
                 + "TYPE '%s' \n"
                 + "LOCATION '%s' \n"
                 + "%s",
-            tableProvider.getTableType(), filteredEventsTopic.topicPath(), tblProperties);
+            tableProvider.getTableType(), filteredEventsTopic.topicPath(), filteredTblProperties);
 
     // Initialize SQL environment and create the pubsub table
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
@@ -650,11 +687,11 @@
             + "    name, \n"
             + "    height \n"
             + "  FROM people \n"
-            + "  WHERE knowsJavascript \n"
+            + "  WHERE knows_javascript \n"
             + ")";
 
     String injectQueryString =
-        "INSERT INTO people (name, height, knowsJavascript) VALUES \n"
+        "INSERT INTO people (name, height, knows_javascript) VALUES \n"
             + "('person1', 80, TRUE),  \n"
             + "('person2', 70, FALSE), \n"
             + "('person3', 60, TRUE),  \n"
@@ -692,6 +729,17 @@
     Map<String, String> argsMap =
         ((Map<String, Object>) MAPPER.convertValue(pipeline.getOptions(), Map.class).get("options"))
             .entrySet().stream()
+                .filter(
+                    (entry) -> {
+                      if (entry.getValue() instanceof List) {
+                        if (!((List) entry.getValue()).isEmpty()) {
+                          throw new IllegalArgumentException("Cannot encode list arguments");
+                        }
+                        // We can encode empty lists, just omit them.
+                        return false;
+                      }
+                      return true;
+                    })
                 .collect(Collectors.toMap(Map.Entry::getKey, entry -> toArg(entry.getValue())));
 
     InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
@@ -781,6 +829,57 @@
         throws Exception;
   }
 
+  private static class PubsubProtoObjectProvider extends PubsubObjectProvider {
+
+    @Override
+    protected String getPayloadFormat() {
+      return "proto";
+    }
+
+    @Override
+    protected PubsubMessage messageIdName(Instant timestamp, int id, String name) {
+
+      PayloadMessages.SimpleMessage.Builder simpleMessage =
+          PayloadMessages.SimpleMessage.newBuilder().setId(id).setName(name);
+
+      return PubsubTableProviderIT.message(
+          timestamp,
+          simpleMessage.build().toByteArray(),
+          ImmutableMap.of(name, Integer.toString(id)));
+    }
+
+    @Override
+    protected Matcher<PubsubMessage> matcherNames(String name) throws IOException {
+
+      PayloadMessages.NameMessage.Builder nameMessage =
+          PayloadMessages.NameMessage.newBuilder().setName(name);
+
+      return hasProperty("payload", equalTo(nameMessage.build().toByteArray()));
+    }
+
+    @Override
+    protected Matcher<PubsubMessage> matcherNameHeightKnowsJS(
+        String name, int height, boolean knowsJS) throws IOException {
+
+      PayloadMessages.NameHeightKnowsJSMessage.Builder nameHeightKnowsJSMessage =
+          PayloadMessages.NameHeightKnowsJSMessage.newBuilder()
+              .setHeight(height)
+              .setName(name)
+              .setKnowsJavascript(knowsJS);
+
+      return hasProperty("payload", equalTo(nameHeightKnowsJSMessage.build().toByteArray()));
+    }
+
+    @Override
+    protected Matcher<PubsubMessage> matcherNameHeight(String name, int height) throws IOException {
+
+      PayloadMessages.NameHeightMessage.Builder nameHeightMessage =
+          PayloadMessages.NameHeightMessage.newBuilder().setName(name).setHeight(height);
+
+      return hasProperty("payload", equalTo(nameHeightMessage.build().toByteArray()));
+    }
+  }
+
   private static class PubsubJsonObjectProvider extends PubsubObjectProvider {
 
     // Pubsub table provider should default to json
@@ -805,7 +904,7 @@
         String name, int height, boolean knowsJS) throws IOException {
       String jsonString =
           String.format(
-              "{\"name\":\"%s\", \"height\": %s, \"knowsJavascript\": %s}", name, height, knowsJS);
+              "{\"name\":\"%s\", \"height\": %s, \"knows_javascript\": %s}", name, height, knowsJS);
 
       return hasProperty("payload", toJsonByteLike(jsonString));
     }
@@ -831,7 +930,7 @@
         Schema.builder()
             .addNullableField("name", Schema.FieldType.STRING)
             .addNullableField("height", Schema.FieldType.INT32)
-            .addNullableField("knowsJavascript", Schema.FieldType.BOOLEAN)
+            .addNullableField("knows_javascript", Schema.FieldType.BOOLEAN)
             .build();
 
     private static final Schema NAME_HEIGHT_SCHEMA =
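
For orientation, the TBLPROPERTIES pieces assembled by the proto-based tests above resolve to DDL roughly like the following. This is a hedged reconstruction: the column list, topic path, and the 'pubsub' table type string are illustrative rather than copied from this change, and the protoClass value is what PayloadMessages.SimpleMessage.class.getName() would produce for the nested message imported above.

public class PubsubProtoTableDdlExample {
  public static void main(String[] args) {
    // Placeholder project and topic; the properties mirror those formatted by the tests above.
    String createTableString =
        "CREATE EXTERNAL TABLE message (\n"
            + "  event_timestamp TIMESTAMP, \n"
            + "  attributes MAP<VARCHAR, VARCHAR>, \n"
            + "  payload ROW<id INTEGER, name VARCHAR> \n"
            + ") \n"
            + "TYPE 'pubsub' \n"
            + "LOCATION 'projects/my-project/topics/my-topic' \n"
            + "TBLPROPERTIES '{ "
            + "\"format\" : \"proto\", "
            + "\"protoClass\" : \"org.apache.beam.sdk.extensions.protobuf.PayloadMessages$SimpleMessage\", "
            + "\"timestampAttributeKey\" : \"ts\" }'";
    System.out.println(createTableString);
  }
}
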
diff --git a/sdks/java/extensions/sql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/provider/UdfTestProvider.java b/sdks/java/extensions/sql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/provider/UdfTestProvider.java
index 1d615a9..c0da4aa 100644
--- a/sdks/java/extensions/sql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/provider/UdfTestProvider.java
+++ b/sdks/java/extensions/sql/udf-test-provider/src/main/java/org/apache/beam/sdk/extensions/sql/provider/UdfTestProvider.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.provider;
 
 import com.google.auto.service.AutoService;
+import java.sql.Date;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.udf.AggregateFn;
 import org.apache.beam.sdk.extensions.sql.udf.ScalarFn;
@@ -37,7 +38,9 @@
         "increment",
         new IncrementFn(),
         "isNull",
-        new IsNullFn());
+        new IsNullFn(),
+        "dateIncrementAll",
+        new DateIncrementAllFn());
   }
 
   @Override
@@ -105,4 +108,11 @@
       return mutableAccumulator;
     }
   }
+
+  public static class DateIncrementAllFn extends ScalarFn {
+    @ApplyMethod
+    public Date incrementAll(Date date) {
+      return new Date(date.getYear() + 1, date.getMonth() + 1, date.getDate() + 1);
+    }
+  }
 }
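
DateIncrementAllFn above relies on the deprecated java.sql.Date(int, int, int) constructor, in which the year is an offset from 1900 and the month is zero-based, so adding 1 to each accessor advances the date by one year, one month, and one day (matching the 2020-04-04 to 2021-05-05 expectation in the ZetaSQL test further below). A hedged alternative for the same arithmetic that round-trips through java.time.LocalDate; only the date math is shown, and wiring it into a ScalarFn would mirror the class above:

import java.sql.Date;
import java.time.LocalDate;

public class DateIncrementAllViaLocalDate {
  public static Date incrementAll(Date date) {
    // plusYears/plusMonths/plusDays make the year, month, and day increments explicit.
    LocalDate incremented = date.toLocalDate().plusYears(1).plusMonths(1).plusDays(1);
    return Date.valueOf(incremented);
  }
}
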
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamCalcRelType.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamCalcRelType.java
index 9c4f396..dd1b56c 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamCalcRelType.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamCalcRelType.java
@@ -133,21 +133,21 @@
   private boolean supportsType(RelDataType type) {
     switch (type.getSqlTypeName()) {
       case BIGINT:
+      case BINARY:
       case BOOLEAN:
+      case CHAR:
+      case DATE:
       case DECIMAL:
       case DOUBLE:
+      case NULL:
       case TIMESTAMP:
       case VARBINARY:
       case VARCHAR:
-      case CHAR:
-      case BINARY:
-      case NULL:
         return true;
       case ARRAY:
         return supportsType(type.getComponentType());
       case ROW:
         return type.getFieldList().stream().allMatch((field) -> supportsType(field.getType()));
-      case DATE: // BEAM-11990
       case TIME: // BEAM-12086
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE: // BEAM-12087
       default:
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java
index 8a8b47e..9299473 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java
@@ -192,18 +192,21 @@
   /**
    * Throws {@link UnsupportedOperationException} if ZetaSQL type is not supported in Java UDF.
    * Supported types are a subset of the types supported by {@link BeamJavaUdfCalcRule}.
+   *
+   * <p>Supported types should be kept in sync with {@link
+   * #validateJavaUdfCalciteType(RelDataType)}.
    */
   void validateJavaUdfZetaSqlType(Type type) {
     switch (type.getKind()) {
-      case TYPE_INT64:
-      case TYPE_DOUBLE:
       case TYPE_BOOL:
-      case TYPE_STRING:
       case TYPE_BYTES:
+      case TYPE_DATE:
+      case TYPE_DOUBLE:
+      case TYPE_INT64:
+      case TYPE_STRING:
         // These types are supported.
         break;
       case TYPE_NUMERIC:
-      case TYPE_DATE:
       case TYPE_TIME:
       case TYPE_DATETIME:
       case TYPE_TIMESTAMP:
@@ -422,10 +425,13 @@
    * Throws {@link UnsupportedOperationException} if Calcite type is not supported in Java UDF.
    * Supported types are a subset of the corresponding Calcite types supported by {@link
    * BeamJavaUdfCalcRule}.
+   *
+   * <p>Supported types should be kept in sync with {@link #validateJavaUdfZetaSqlType(Type)}.
    */
   private void validateJavaUdfCalciteType(RelDataType type) {
     switch (type.getSqlTypeName()) {
       case BIGINT:
+      case DATE:
       case DOUBLE:
       case BOOLEAN:
       case VARCHAR:
@@ -433,7 +439,6 @@
         // These types are supported.
         break;
       case DECIMAL:
-      case DATE:
       case TIME:
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
       case TIMESTAMP:
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
index d6d459c..ee4e9ef 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
@@ -25,6 +25,7 @@
 
 import com.google.zetasql.SqlException;
 import java.lang.reflect.Method;
+import java.time.LocalDate;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
 import org.apache.beam.sdk.extensions.sql.SqlTransform;
@@ -36,6 +37,7 @@
 import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Sum;
@@ -434,4 +436,22 @@
     PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(6L).build());
     pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
+
+  @Test
+  public void testDateUdf() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION dateIncrementAll(d DATE) RETURNS DATE LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT dateIncrementAll('2020-04-04');",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    Schema singleField = Schema.builder().addLogicalTypeField("field1", SqlTypes.DATE).build();
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(singleField).addValues(LocalDate.of(2021, 5, 5)).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
 }
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java
index 498e1b8..98fb25f 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTypeTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.extensions.sql.zetasql;
 
+import java.sql.Date;
+import java.time.LocalDate;
 import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
@@ -27,6 +29,7 @@
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
@@ -73,6 +76,7 @@
                   .addDoubleField("float64_inf")
                   .addDoubleField("float64_neg_inf")
                   .addDoubleField("float64_nan")
+                  .addLogicalTypeField("f_date", SqlTypes.DATE)
                   .build())
           .addRows(
               true /* boolean_true */,
@@ -96,7 +100,8 @@
               2.2250738585072014e-308 /* float64_min_pos */,
               Double.POSITIVE_INFINITY /* float64_inf */,
               Double.NEGATIVE_INFINITY /* float64_neg_inf */,
-              Double.NaN /* float64_nan */);
+              Double.NaN /* float64_nan */,
+              LocalDate.of(2021, 4, 26) /* f_date */);
 
   @Before
   public void setUp() throws NoSuchMethodException {
@@ -125,6 +130,8 @@
     schema.add(
         "test_float64",
         ScalarFunctionImpl.create(DoubleIdentityFn.class.getMethod("eval", Double.class)));
+    schema.add(
+        "test_date", ScalarFunctionImpl.create(DateIdentityFn.class.getMethod("eval", Date.class)));
 
     this.config = Frameworks.newConfigBuilder(config).defaultSchema(schema).build();
   }
@@ -159,12 +166,26 @@
     }
   }
 
+  public static class DateIdentityFn implements BeamSqlUdf {
+    public Date eval(Date input) {
+      return input;
+    }
+  }
+
   private void runUdfTypeTest(String query, Object result, Schema.TypeName typeName) {
+    runUdfTypeTest(query, result, Schema.FieldType.of(typeName));
+  }
+
+  private void runUdfTypeTest(String query, Object result, Schema.LogicalType<?, ?> logicalType) {
+    runUdfTypeTest(query, result, Schema.FieldType.logicalType(logicalType));
+  }
+
+  private void runUdfTypeTest(String query, Object result, Schema.FieldType fieldType) {
     ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
     BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(query);
     PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
 
-    Schema outputSchema = Schema.builder().addField("res", Schema.FieldType.of(typeName)).build();
+    Schema outputSchema = Schema.builder().addField("res", fieldType).build();
     PAssert.that(stream).containsInAnyOrder(Row.withSchema(outputSchema).addValues(result).build());
     pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
@@ -428,4 +449,16 @@
     runUdfTypeTest(
         "SELECT test_float64(float64_nan) FROM table;", Double.NaN, Schema.TypeName.DOUBLE);
   }
+
+  @Test
+  public void testDateLiteral() {
+    runUdfTypeTest(
+        "SELECT test_date('2021-04-26') FROM table;", LocalDate.of(2021, 4, 26), SqlTypes.DATE);
+  }
+
+  @Test
+  public void testDateInput() {
+    runUdfTypeTest(
+        "SELECT test_date(f_date) FROM table;", LocalDate.of(2021, 4, 26), SqlTypes.DATE);
+  }
 }
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
index 9af7f58..296767b 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistry.java
@@ -48,8 +48,6 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ListMultimap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.CountingOutputStream;
 
 /**
  * The {@code PCollectionConsumerRegistry} is used to maintain a collection of consuming
@@ -216,7 +214,7 @@
     private final String pTransformId;
     private final SimpleExecutionState state;
     private final Counter unboundedElementCountCounter;
-    private final SampleByteSizeDistribution<T> unboundSampledByteSizeDistribution;
+    private final SampleByteSizeDistribution<T> unboundedSampledByteSizeDistribution;
     private final Coder<T> coder;
     private final MetricsContainer metricsContainer;
 
@@ -239,7 +237,7 @@
 
       MonitoringInfoMetricName sampledByteSizeMetricName =
           MonitoringInfoMetricName.named(Urns.SAMPLED_BYTE_SIZE, labels);
-      this.unboundSampledByteSizeDistribution =
+      this.unboundedSampledByteSizeDistribution =
           new SampleByteSizeDistribution<>(
               unboundMetricContainer.getDistribution(sampledByteSizeMetricName));
 
@@ -252,7 +250,7 @@
       // Increment the counter for each window the element occurs in.
       this.unboundedElementCountCounter.inc(input.getWindows().size());
       // TODO(BEAM-11879): Consider updating size per window when we have window optimization.
-      this.unboundSampledByteSizeDistribution.tryUpdate(input.getValue(), this.coder);
+      this.unboundedSampledByteSizeDistribution.tryUpdate(input.getValue(), this.coder);
       // Wrap the consumer with extra logic to set the metric container with the appropriate
       // PTransform context. This ensures that user metrics obtain the pTransform ID when they are
       // created. Also use the ExecutionStateTracker and enter an appropriate state to track the
@@ -262,6 +260,7 @@
           this.delegate.accept(input);
         }
       }
+      this.unboundedSampledByteSizeDistribution.finishLazyUpdate();
     }
   }
 
@@ -321,6 +320,7 @@
             consumerAndMetadata.getConsumer().accept(input);
           }
         }
+        this.unboundedSampledByteSizeDistribution.finishLazyUpdate();
       }
     }
   }
@@ -365,28 +365,33 @@
     }
 
     final Distribution distribution;
+    ByteSizeObserver byteCountObserver;
 
     public SampleByteSizeDistribution(Distribution distribution) {
       this.distribution = distribution;
+      this.byteCountObserver = null;
     }
 
     public void tryUpdate(T value, Coder<T> coder) throws Exception {
       if (shouldSampleElement()) {
         // First try using byte size observer
-        ByteSizeObserver observer = new ByteSizeObserver();
-        coder.registerByteSizeObserver(value, observer);
+        byteCountObserver = new ByteSizeObserver();
+        coder.registerByteSizeObserver(value, byteCountObserver);
 
-        if (!observer.getIsLazy()) {
-          observer.advance();
-          this.distribution.update(observer.observedSize);
-        } else {
-          // TODO(BEAM-11841): Optimize calculation of element size for iterables.
-          // Coder byte size observation is lazy (requires iteration for observation) so fall back
-          // to counting output stream
-          CountingOutputStream os = new CountingOutputStream(ByteStreams.nullOutputStream());
-          coder.encode(value, os);
-          this.distribution.update(os.getCount());
+        if (!byteCountObserver.getIsLazy()) {
+          byteCountObserver.advance();
+          this.distribution.update(byteCountObserver.observedSize);
         }
+      } else {
+        byteCountObserver = null;
+      }
+    }
+
+    public void finishLazyUpdate() {
+      // Advance lazy ElementByteSizeObservers, if any.
+      if (byteCountObserver != null && byteCountObserver.getIsLazy()) {
+        byteCountObserver.advance();
+        this.distribution.update(byteCountObserver.observedSize);
       }
     }
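
The net effect of the PCollectionConsumerRegistry change above: when a coder's byte-size observation is lazy (as with iterables), the sampled-byte-size distribution is no longer estimated by re-encoding the value to a counting stream; instead the observer is kept and finalized after the downstream consumer has iterated the element. A hypothetical, self-contained sketch of that two-phase flow; the SizeObserver interface below is a stand-in for illustration, not Beam's ElementByteSizeObserver:

import java.util.function.LongConsumer;

class SampledByteSizeRecorder {
  /** Stand-in observer: lazy observers only know their size after the element is iterated. */
  interface SizeObserver {
    boolean isLazy();
    long observedSize();
  }

  private SizeObserver pending; // retained only when the observation was lazy

  /** Called before the element is handed to the consumer. */
  void tryUpdate(SizeObserver observer, LongConsumer distribution) {
    if (!observer.isLazy()) {
      distribution.accept(observer.observedSize()); // eager coders: record immediately
      pending = null;
    } else {
      pending = observer; // lazy coders: wait until the consumer has iterated the element
    }
  }

  /** Called after the consumer returns, once iteration has driven the observer. */
  void finishLazyUpdate(LongConsumer distribution) {
    if (pending != null) {
      distribution.accept(pending.observedSize());
      pending = null;
    }
  }
}
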
 
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest.java
index 90baa5e..708ca8b 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/data/PCollectionConsumerRegistryTest.java
@@ -24,6 +24,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -32,7 +33,9 @@
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import org.apache.beam.fn.harness.HandlesSplits;
 import org.apache.beam.model.pipeline.v1.MetricsApi.MonitoringInfo;
@@ -44,15 +47,19 @@
 import org.apache.beam.runners.core.metrics.MonitoringInfoConstants.Urns;
 import org.apache.beam.runners.core.metrics.MonitoringInfoMetricName;
 import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterable;
+import org.apache.beam.sdk.util.common.ElementByteSizeObservableIterator;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
+import org.mockito.stubbing.Answer;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
@@ -329,5 +336,108 @@
     verify(consumerA1).trySplit(0.3);
   }
 
+  @Test
+  public void testLazyByteSizeEstimation() throws Exception {
+    final String pCollectionA = "pCollectionA";
+    final String pTransformIdA = "pTransformIdA";
+
+    MetricsContainerStepMap metricsContainerRegistry = new MetricsContainerStepMap();
+    PCollectionConsumerRegistry consumers =
+        new PCollectionConsumerRegistry(
+            metricsContainerRegistry, mock(ExecutionStateTracker.class));
+    FnDataReceiver<WindowedValue<Iterable<String>>> consumerA1 = mock(FnDataReceiver.class);
+
+    consumers.register(
+        pCollectionA, pTransformIdA, consumerA1, IterableCoder.of(StringUtf8Coder.of()));
+
+    FnDataReceiver<WindowedValue<Iterable<String>>> wrapperConsumer =
+        (FnDataReceiver<WindowedValue<Iterable<String>>>)
+            (FnDataReceiver) consumers.getMultiplexingConsumer(pCollectionA);
+    String elementValue = "elem";
+    long elementByteSize = StringUtf8Coder.of().getEncodedElementByteSize(elementValue);
+    WindowedValue<Iterable<String>> element =
+        valueInGlobalWindow(
+            new TestElementByteSizeObservableIterable<>(
+                Arrays.asList(elementValue, elementValue), elementByteSize));
+    int numElements = 10;
+    // Mock doing work on the iterable items
+    doAnswer(
+            (Answer<Void>)
+                invocation -> {
+                  Object[] args = invocation.getArguments();
+                  WindowedValue<Iterable<String>> arg = (WindowedValue<Iterable<String>>) args[0];
+                  Iterator it = arg.getValue().iterator();
+                  while (it.hasNext()) {
+                    it.next();
+                  }
+                  return null;
+                })
+        .when(consumerA1)
+        .accept(element);
+
+    for (int i = 0; i < numElements; i++) {
+      wrapperConsumer.accept(element);
+    }
+
+    // Check that the underlying consumers are each invoked per element.
+    verify(consumerA1, times(numElements)).accept(element);
+    assertThat(consumers.keySet(), contains(pCollectionA));
+
+    List<MonitoringInfo> expected = new ArrayList<>();
+
+    SimpleMonitoringInfoBuilder builder = new SimpleMonitoringInfoBuilder();
+    builder.setUrn(MonitoringInfoConstants.Urns.ELEMENT_COUNT);
+    builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, pCollectionA);
+    builder.setInt64SumValue(numElements);
+    expected.add(builder.build());
+
+    builder = new SimpleMonitoringInfoBuilder();
+    builder.setUrn(Urns.SAMPLED_BYTE_SIZE);
+    builder.setLabel(MonitoringInfoConstants.Labels.PCOLLECTION, pCollectionA);
+    long expectedBytes =
+        (elementByteSize + 1) * 2
+            + 5; // Additional 5 bytes: 4 for the encoded size and 1 for the final hasNext = false marker.
+    builder.setInt64DistributionValue(
+        DistributionData.create(
+            numElements * expectedBytes, numElements, expectedBytes, expectedBytes));
+    expected.add(builder.build());
+    // Only compare the monitoring infos that carry a PCOLLECTION label.
+    Iterable<MonitoringInfo> result =
+        Iterables.filter(
+            metricsContainerRegistry.getMonitoringInfos(),
+            monitoringInfo -> monitoringInfo.containsLabels(Labels.PCOLLECTION));
+
+    assertThat(result, containsInAnyOrder(expected.toArray()));
+  }
+
+  private class TestElementByteSizeObservableIterable<T>
+      extends ElementByteSizeObservableIterable<T, ElementByteSizeObservableIterator<T>> {
+    private List<T> elements;
+    private long elementByteSize;
+
+    public TestElementByteSizeObservableIterable(List<T> elements, long elementByteSize) {
+      this.elements = elements;
+      this.elementByteSize = elementByteSize;
+    }
+
+    @Override
+    protected ElementByteSizeObservableIterator createIterator() {
+      return new ElementByteSizeObservableIterator() {
+        private int index = 0;
+
+        @Override
+        public boolean hasNext() {
+          return index < elements.size();
+        }
+
+        @Override
+        public Object next() {
+          notifyValueReturned(elementByteSize);
+          return elements.get(index++);
+        }
+      };
+    }
+  }
+
   private abstract static class SplittingReceiver<T> implements FnDataReceiver<T>, HandlesSplits {}
 }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
index 4193ba6..fc1fe94 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryQueryHelper.java
@@ -109,7 +109,10 @@
               .getReferencedTables();
       if (referencedTables != null && !referencedTables.isEmpty()) {
         TableReference referencedTable = referencedTables.get(0);
-        effectiveLocation = tableService.getTable(referencedTable).getLocation();
+        effectiveLocation =
+            tableService
+                .getDataset(referencedTable.getProjectId(), referencedTable.getDatasetId())
+                .getLocation();
       }
     }
 
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
index 5bc73ce..7fd44ce 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java
@@ -28,6 +28,7 @@
 import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
@@ -58,6 +59,7 @@
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.fs.ResourceIdCoder;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.PatchResources.Input;
 import org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HealthcareHttpException;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.metrics.Counter;
@@ -65,6 +67,8 @@
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupIntoBatches;
@@ -101,19 +105,19 @@
  * <h3>Reading</h3>
  *
  * <p>FHIR resources can be read with {@link FhirIO.Read}, which supports use cases where you have a
- * ${@link PCollection} of message IDs. This is appropriate for reading the Fhir notifications from
+ * ${@link PCollection} of FHIR resource names in the format projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}. This is appropriate for reading FHIR notifications from
  * a Pub/Sub subscription with {@link PubsubIO#readStrings()} or in cases where you have a manually
  * prepared list of messages that you need to process (e.g. in a text file read with {@link
  * org.apache.beam.sdk.io.TextIO}*) .
  *
- * <p>Fetch Resource contents from Fhir Store based on the {@link PCollection} of message ID strings
+ * <p>Fetch resource contents from the FHIR store based on the {@link PCollection} of FHIR resource name strings
  * {@link FhirIO.Read.Result} where one can call {@link Read.Result#getResources()} to retrieve a
- * {@link PCollection} containing the successfully fetched {@link String}s and/or {@link
+ * {@link PCollection} containing the successfully fetched JSON resources as {@link String}s and/or {@link
  * FhirIO.Read.Result#getFailedReads()}* to retrieve a {@link PCollection} of {@link
- * HealthcareIOError}* containing the resource ID that could not be fetched and the exception as a
+ * HealthcareIOError}* containing the resource names that could not be fetched and the exception as a
  * {@link HealthcareIOError}, this can be used to write to the dead letter storage system of your
  * choosing. This error handling is mainly to transparently surface errors where the upstream {@link
- * PCollection}* contains IDs that are not valid or are not reachable due to permissions issues.
+ * PCollection}* contains FHIR resource names that are not valid or are not reachable due to permission issues.
  *
  * <h3>Writing</h3>
  *
@@ -383,6 +387,16 @@
   }
 
   /**
+   * Patch FHIR resources, see the <a
+   * href="https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/patch">fhir.patch</a>.
+   *
+   * @return the {@link PatchResources} transform
+   */
+  public static PatchResources patchResources() {
+    return new PatchResources();
+  }
+
+  /**
    * Increments success and failure counters for an LRO. To be used after the LRO has completed.
    * This function leverages the fact that the LRO metadata is always of the format: "counter": {
    * "success": "1", "failure": "1" }
@@ -1330,6 +1344,7 @@
 
   /** The type Execute bundles. */
   public static class ExecuteBundles extends PTransform<PCollection<String>, Write.Result> {
+
     private final ValueProvider<String> fhirStore;
 
     /**
@@ -1417,8 +1432,109 @@
     }
   }
 
+  /** Applies FHIR JSON Patch requests to existing resources in a FHIR store. */
+  public static class PatchResources extends PTransform<PCollection<Input>, Write.Result> {
+
+    private PatchResources() {}
+
+    /** Represents the input parameters for a single FHIR patch request. */
+    @DefaultSchema(AutoValueSchema.class)
+    @AutoValue
+    abstract static class Input implements Serializable {
+      abstract String getResourceName();
+
+      abstract String getPatch();
+
+      abstract @Nullable Map<String, String> getQuery();
+
+      static Builder builder() {
+        return new AutoValue_FhirIO_PatchResources_Input.Builder();
+      }
+
+      @AutoValue.Builder
+      abstract static class Builder {
+        abstract Builder setResourceName(String resourceName);
+
+        abstract Builder setPatch(String patch);
+
+        abstract Builder setQuery(Map<String, String> query);
+
+        abstract Input build();
+      }
+    }
+
+    @Override
+    public FhirIO.Write.Result expand(PCollection<Input> input) {
+      int numShards = 10;
+      int batchSize = 10000;
+      PCollectionTuple bodies =
+          // Shard input into batches to improve worker performance.
+          input
+              .apply(
+                  "Shard input",
+                  WithKeys.of(elm -> ThreadLocalRandom.current().nextInt(0, numShards)))
+              .setCoder(KvCoder.of(TextualIntegerCoder.of(), input.getCoder()))
+              .apply("Assemble batches", GroupIntoBatches.ofSize(batchSize))
+              .setCoder(KvCoder.of(TextualIntegerCoder.of(), IterableCoder.of(input.getCoder())))
+              .apply(
+                  ParDo.of(new PatchResourcesFn())
+                      .withOutputTags(Write.SUCCESSFUL_BODY, TupleTagList.of(Write.FAILED_BODY)));
+      bodies.get(Write.SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of());
+      bodies.get(Write.FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
+      return Write.Result.in(input.getPipeline(), bodies);
+    }
+
+    /** The {@link DoFn} that issues one FHIR patch request per {@link Input} in each batch. */
+    static class PatchResourcesFn extends DoFn<KV<Integer, Iterable<Input>>, String> {
+
+      private static final Counter PATCH_RESOURCES_ERRORS =
+          Metrics.counter(
+              PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_error_count");
+      private static final Counter PATCH_RESOURCES_SUCCESS =
+          Metrics.counter(
+              PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_success_count");
+      private static final Distribution PATCH_RESOURCES_LATENCY_MS =
+          Metrics.distribution(
+              PatchResourcesFn.class, BASE_METRIC_PREFIX + "patch_resources_latency_ms");
+
+      private transient HealthcareApiClient client;
+      private final ObjectMapper mapper = new ObjectMapper();
+
+      /**
+       * Initialize healthcare client.
+       *
+       * @throws IOException If the Healthcare client cannot be created.
+       */
+      @Setup
+      public void initClient() throws IOException {
+        this.client = new HttpHealthcareApiClient();
+      }
+
+      @ProcessElement
+      public void patchResources(ProcessContext context) {
+        Iterable<Input> batch = context.element().getValue();
+        for (Input patchParameter : batch) {
+          try {
+            long startTime = Instant.now().toEpochMilli();
+            client.patchFhirResource(
+                patchParameter.getResourceName(),
+                patchParameter.getPatch(),
+                patchParameter.getQuery());
+            PATCH_RESOURCES_LATENCY_MS.update(Instant.now().toEpochMilli() - startTime);
+            PATCH_RESOURCES_SUCCESS.inc();
+            context.output(Write.SUCCESSFUL_BODY, patchParameter.toString());
+          } catch (IOException | HealthcareHttpException e) {
+            PATCH_RESOURCES_ERRORS.inc();
+            context.output(Write.FAILED_BODY, HealthcareIOError.of(patchParameter.toString(), e));
+          }
+        }
+      }
+    }
+  }
+
   /** Export FHIR resources from a FHIR store to new line delimited json files on GCS. */
   public static class Export extends PTransform<PBegin, PCollection<String>> {
+
     private final ValueProvider<String> fhirStore;
     private final ValueProvider<String> exportGcsUriPrefix;
 
@@ -1481,6 +1597,7 @@
 
   /** Deidentify FHIR resources from a FHIR store to a destination FHIR store. */
   public static class Deidentify extends PTransform<PBegin, PCollection<String>> {
+
     private final ValueProvider<String> sourceFhirStore;
     private final ValueProvider<String> destinationFhirStore;
     private final ValueProvider<DeidentifyConfig> deidConfig;
@@ -1551,6 +1668,7 @@
   /** The type Search. */
   public static class Search<T>
       extends PTransform<PCollection<FhirSearchParameter<T>>, FhirIO.Search.Result> {
+
     private static final Logger LOG = LoggerFactory.getLogger(Search.class);
 
     private final ValueProvider<String> fhirStore;
@@ -1564,6 +1682,7 @@
     }
 
     public static class Result implements POutput, PInput {
+
       private PCollection<KV<String, JsonArray>> keyedResources;
       private PCollection<JsonArray> resources;
 
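For context, a minimal pipeline sketch for the new transform, modelled on the FhirIOPatchIT test added later in this change. The store path and patch document are placeholders, and PatchResources.Input and its builder are package-private here, so the fragment assumes it lives in org.apache.beam.sdk.io.gcp.healthcare:

// Fragment; assumes an existing Pipeline `pipeline` and the imports used in FhirIOPatchIT.
Input patch =
    Input.builder()
        .setResourceName("projects/p/locations/l/datasets/d/fhirStores/f/fhir/Patient/123")
        .setPatch("[{\"op\": \"replace\", \"path\": \"/birthDate\", \"value\": \"1997-05-23\"}]")
        // Adding .setQuery(ImmutableMap.of("birthDate", "1990-01-01")) would make this a conditional patch.
        .build();
FhirIO.Write.Result result =
    pipeline.apply(Create.of(patch)).apply(FhirIO.patchResources());
result.getSuccessfulBodies(); // PCollection<String>: Input.toString() of each applied patch.
result.getFailedBodies();     // PCollection<HealthcareIOError<String>> for failed patch requests.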
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
index 6f80d86..db34d33 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirSearchParameter.java
@@ -46,11 +46,26 @@
     this.queries = queries;
   }
 
+  /**
+   * Creates a FhirSearchParameter to represent a FHIR Search request.
+   *
+   * @param resourceType the FHIR resource type to search; leave empty to search all resource types
+   * @param key optional key to index searches by
+   * @param queries search queries, mapping each search field to its query value
+   * @return FhirSearchParameter
+   */
   public static <T> FhirSearchParameter<T> of(
       String resourceType, @Nullable String key, @Nullable Map<String, T> queries) {
     return new FhirSearchParameter<>(resourceType, key, queries);
   }
 
+  /**
+   * Creates a FhirSearchParameter to represent a FHIR Search request.
+   *
+   * @param resourceType the FHIR resource type to search; leave empty to search all resource types
+   * @param queries search queries, mapping each search field to its query value
+   * @return FhirSearchParameter
+   */
   public static <T> FhirSearchParameter<T> of(
       String resourceType, @Nullable Map<String, T> queries) {
     return new FhirSearchParameter<>(resourceType, null, queries);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
index f4e38c4..81b2a75 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HealthcareApiClient.java
@@ -162,13 +162,27 @@
       throws IOException, HealthcareHttpException;
 
   /**
+   * Patch fhir resource http body.
+   *
+   * @param resourceName the resource name, in the format
+   *     projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}[/{id}]; the id
+   *     is omitted when query is specified
+   * @param patch the JSON patch document to apply
+   * @param query optional search query for a conditional patch
+   * @return the http body
+   */
+  HttpBody patchFhirResource(String resourceName, String patch, @Nullable Map<String, String> query)
+      throws IOException, HealthcareHttpException;
+
+  /**
    * Read fhir resource http body.
    *
-   * @param resourceId the resource
+   * @param resourceName the resource name, in format
+   *     projects/{p}/locations/{l}/datasets/{d}/fhirStores/{f}/fhir/{resourceType}/{id}
    * @return the http body
    * @throws IOException the io exception
    */
-  HttpBody readFhirResource(String resourceId) throws IOException;
+  HttpBody readFhirResource(String resourceName) throws IOException;
 
   /**
    * Search fhir resource http body.
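A direct-client sketch of the new patchFhirResource method in its conditional form, where the resource id is omitted and the query selects the target resource. Paths and values are illustrative, and IOException and HealthcareHttpException must be handled by the caller:

// Fragment; HttpHealthcareApiClient, HttpBody and ImmutableMap are imported as elsewhere in this module.
HealthcareApiClient client = new HttpHealthcareApiClient();
HttpBody response =
    client.patchFhirResource(
        "projects/p/locations/l/datasets/d/fhirStores/f/fhir/Patient",
        "[{\"op\": \"replace\", \"path\": \"/birthDate\", \"value\": \"1997-06-23\"}]",
        ImmutableMap.of("birthDate", "1990-01-01"));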
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
index 283d011..929fe2a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
@@ -100,6 +100,7 @@
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
 public class HttpHealthcareApiClient implements HealthcareApiClient, Serializable {
+
   private static final String USER_AGENT =
       String.format(
           "apache-beam-io-google-cloud-platform-healthcare/%s",
@@ -107,6 +108,7 @@
   private static final String FHIRSTORE_HEADER_CONTENT_TYPE = "application/fhir+json";
   private static final String FHIRSTORE_HEADER_ACCEPT = "application/fhir+json; charset=utf-8";
   private static final String FHIRSTORE_HEADER_ACCEPT_CHARSET = "utf-8";
+  private static final String FHIRSTORE_PATCH_CONTENT_TYPE = "application/json-patch+json";
   private static final Logger LOG = LoggerFactory.getLogger(HttpHealthcareApiClient.class);
   private transient CloudHealthcare client;
   private transient HttpClient httpClient;
@@ -594,11 +596,61 @@
     return responseModel;
   }
 
+  @Override
+  public HttpBody patchFhirResource(
+      String resourceName, String patch, @Nullable Map<String, String> query)
+      throws IOException, HealthcareHttpException {
+    if (httpClient == null || client == null) {
+      initClient();
+    }
+
+    credentials.refreshIfExpired();
+    StringEntity requestEntity = new StringEntity(patch, ContentType.APPLICATION_JSON);
+    URI uri;
+    try {
+      URIBuilder uriBuilder = new URIBuilder(client.getRootUrl() + "v1beta1/" + resourceName);
+      if (query != null) {
+        for (Map.Entry<String, String> q : query.entrySet()) {
+          uriBuilder.addParameter(q.getKey(), q.getValue());
+        }
+      }
+      uri = uriBuilder.build();
+    } catch (URISyntaxException e) {
+      LOG.error("URL error when making patch request to FHIR API. " + e.getMessage());
+      throw new IllegalArgumentException(e);
+    }
+
+    RequestBuilder requestBuilder =
+        RequestBuilder.patch()
+            .setUri(uri)
+            .setEntity(requestEntity)
+            .addHeader("Authorization", "Bearer " + credentials.getAccessToken().getTokenValue())
+            .addHeader("User-Agent", USER_AGENT)
+            .addHeader("Content-Type", FHIRSTORE_PATCH_CONTENT_TYPE)
+            .addHeader("Accept", FHIRSTORE_HEADER_ACCEPT)
+            .addHeader("Accept-Charset", FHIRSTORE_HEADER_ACCEPT_CHARSET);
+
+    HttpUriRequest request = requestBuilder.build();
+    HttpResponse response = httpClient.execute(request);
+    HttpEntity responseEntity = response.getEntity();
+    String content = EntityUtils.toString(responseEntity);
+
+    // Check 2XX code.
+    int statusCode = response.getStatusLine().getStatusCode();
+    if (!(statusCode / 100 == 2)) {
+      throw HealthcareHttpException.of(statusCode, content);
+    }
+    HttpBody responseModel = new HttpBody();
+    responseModel.setData(content);
+    return responseModel;
+  }
+
   /**
    * Wraps {@link HttpResponse} in an exception with a statusCode field for use with {@link
    * HealthcareIOError}.
    */
   public static class HealthcareHttpException extends Exception {
+
     private final int statusCode;
 
     private HealthcareHttpException(int statusCode, String message) {
@@ -630,8 +682,15 @@
   }
 
   @Override
-  public HttpBody readFhirResource(String resourceId) throws IOException {
-    return client.projects().locations().datasets().fhirStores().fhir().read(resourceId).execute();
+  public HttpBody readFhirResource(String resourceName) throws IOException {
+    return client
+        .projects()
+        .locations()
+        .datasets()
+        .fhirStores()
+        .fhir()
+        .read(resourceName)
+        .execute();
   }
 
   @Override
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
index f0fbab6..daa4877 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
@@ -414,7 +414,7 @@
     DateTime startTime = new DateTime();
     int sizeOfSubscriptionList = 0;
     while (sizeOfSubscriptionList == 0
-        && Seconds.secondsBetween(new DateTime(), startTime).getSeconds()
+        && Seconds.secondsBetween(startTime, new DateTime()).getSeconds()
             < timeoutDuration.toStandardSeconds().getSeconds()) {
       // Sleep 1 sec
       Thread.sleep(1000);
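The argument swap above matters because Joda-Time's Seconds.secondsBetween(start, end) is signed: it is positive only when end is after start. With the old order the elapsed value was never positive, so the timeout condition could never trip and the loop would wait indefinitely for a subscription. A minimal illustration:

// Joda-Time semantics assumed here: secondsBetween(start, end) == end - start in whole seconds.
DateTime start = new DateTime();
DateTime later = start.plusSeconds(5);
Seconds.secondsBetween(start, later).getSeconds(); // 5
Seconds.secondsBetween(later, start).getSeconds(); // -5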
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatchIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatchIT.java
new file mode 100644
index 0000000..835d717
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOPatchIT.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.HEALTHCARE_DATASET_TEMPLATE;
+
+import com.google.api.services.healthcare.v1beta1.model.HttpBody;
+import java.io.IOException;
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.PatchResources.Input;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class FhirIOPatchIT {
+
+  public String version;
+
+  @Parameters(name = "{0}")
+  public static Collection<String> versions() {
+    return Arrays.asList("R4");
+  }
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  private final String project;
+  private transient HealthcareApiClient client;
+  private static String healthcareDataset;
+  private String fhirStoreId;
+  private String resourceName;
+  private static final String BASE_STORE_ID =
+      "FHIR_store_patch_it_" + System.currentTimeMillis() + "_" + (new SecureRandom().nextInt(32));
+
+  public FhirIOPatchIT(String version) {
+    this.version = version;
+    this.fhirStoreId = BASE_STORE_ID + version;
+    this.project =
+        TestPipeline.testingPipelineOptions()
+            .as(HealthcareStoreTestPipelineOptions.class)
+            .getStoreProjectId();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    healthcareDataset = String.format(HEALTHCARE_DATASET_TEMPLATE, project);
+    if (client == null) {
+      this.client = new HttpHealthcareApiClient();
+    }
+    client.createFhirStore(healthcareDataset, fhirStoreId, version, "");
+
+    resourceName = healthcareDataset + "/fhirStores/" + fhirStoreId + "/fhir/Patient/123";
+    String bundle =
+        "{\"resourceType\":\"Bundle\","
+            + "\"type\":\"transaction\","
+            + "\"entry\": [{"
+            + "\"request\":{\"method\":\"PUT\",\"url\":\"Patient/123\"},"
+            + "\"resource\":{\"resourceType\":\"Patient\",\"id\":\"123\",\"birthDate\": \"1990-01-01\"}"
+            + "}]}";
+    FhirIOTestUtil.executeFhirBundles(
+        client, healthcareDataset + "/fhirStores/" + fhirStoreId, ImmutableList.of(bundle));
+  }
+
+  @After
+  public void teardown() throws IOException {
+    HealthcareApiClient client = new HttpHealthcareApiClient();
+    for (String version : versions()) {
+      client.deleteFhirStore(healthcareDataset + "/fhirStores/" + BASE_STORE_ID + version);
+    }
+  }
+
+  @Test
+  public void testFhirIOPatch() throws IOException {
+    pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
+
+    Input patchParameter =
+        Input.builder()
+            .setResourceName(resourceName)
+            .setPatch(
+                "[{\"op\": \"replace\", \"path\": \"/birthDate\", \"value\": \"1997-05-23\"}]")
+            .build();
+    String expectedSuccessBody = patchParameter.toString();
+
+    // Execute patch.
+    PCollection<Input> patches = pipeline.apply(Create.of(patchParameter));
+    FhirIO.Write.Result result = patches.apply(FhirIO.patchResources());
+
+    // Validate beam results.
+    PAssert.that(result.getFailedBodies()).empty();
+    PCollection<String> successfulBodies = result.getSuccessfulBodies();
+    PAssert.that(successfulBodies)
+        .satisfies(
+            input -> {
+              for (String body : input) {
+                Assert.assertEquals(expectedSuccessBody, body);
+              }
+              return null;
+            });
+
+    pipeline.run().waitUntilFinish();
+
+    // Validate FHIR store contents.
+    HttpBody readResult = client.readFhirResource(resourceName);
+    Assert.assertEquals("1997-05-23", readResult.get("birthDate"));
+  }
+
+  @Test
+  public void testFhirIOPatch_ifMatch() throws IOException {
+    pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
+
+    Input patchParameter =
+        Input.builder()
+            .setResourceName(healthcareDataset + "/fhirStores/" + fhirStoreId + "/fhir/Patient")
+            .setPatch(
+                "[{\"op\": \"replace\", \"path\": \"/birthDate\", \"value\": \"1997-06-23\"}]")
+            .setQuery(ImmutableMap.of("birthDate", "1990-01-01"))
+            .build();
+    String expectedSuccessBody = patchParameter.toString();
+
+    // Execute patch.
+    PCollection<Input> patches = pipeline.apply(Create.of(patchParameter));
+    FhirIO.Write.Result result = patches.apply(FhirIO.patchResources());
+
+    // Validate beam results.
+    PAssert.that(result.getFailedBodies()).empty();
+    PCollection<String> successfulBodies = result.getSuccessfulBodies();
+    PAssert.that(successfulBodies)
+        .satisfies(
+            input -> {
+              for (String body : input) {
+                Assert.assertEquals(expectedSuccessBody, body);
+              }
+              return null;
+            });
+
+    pipeline.run().waitUntilFinish();
+
+    // Validate FHIR store contents.
+    HttpBody readResult = client.readFhirResource(resourceName);
+    Assert.assertEquals("1997-06-23", readResult.get("birthDate"));
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
index 8313a01..eecd371 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIOTest.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.gcp.healthcare.FhirIO.PatchResources.Input;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
@@ -31,6 +32,7 @@
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -133,6 +135,27 @@
     pipeline.run();
   }
 
+  @Test
+  public void test_FhirIO_failedPatch() {
+    Input badPatch = Input.builder().setPatch("").setResourceName("").build();
+    PCollection<Input> patches = pipeline.apply(Create.of(ImmutableList.of(badPatch)));
+    FhirIO.Write.Result patchResult = patches.apply(FhirIO.patchResources());
+
+    PCollection<HealthcareIOError<String>> failedInserts = patchResult.getFailedBodies();
+
+    PAssert.thatSingleton(failedInserts)
+        .satisfies(
+            (HealthcareIOError<String> err) -> {
+              Assert.assertEquals(badPatch.toString(), err.getDataResource());
+              return null;
+            });
+    PCollection<Long> numFailedInserts = failedInserts.apply(Count.globally());
+
+    PAssert.thatSingleton(numFailedInserts).isEqualTo(1L);
+
+    pipeline.run();
+  }
+
   private static final long NUM_ELEMENTS = 11;
 
   private static ArrayList<KV<String, String>> createTestData() {
diff --git a/sdks/java/io/hcatalog/build.gradle b/sdks/java/io/hcatalog/build.gradle
index 84743f8c..820418c 100644
--- a/sdks/java/io/hcatalog/build.gradle
+++ b/sdks/java/io/hcatalog/build.gradle
@@ -43,7 +43,7 @@
 
 configurations.testRuntimeClasspath {
   resolutionStrategy {
-    def log4j_version = "2.4.1"
+    def log4j_version = "2.8.2"
     // Beam's build system forces a uniform log4j version resolution for all modules, however for
     // the HCatalog case the current version of log4j produces NoClassDefFoundError so we need to
     // force an old version on the tests runtime classpath
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 29d8bac..499a1f8 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -2197,10 +2197,8 @@
      * transform ties checkpointing semantics in compatible Beam runners and transactions in Kafka
      * (version 0.11+) to ensure a record is written only once. As the implementation relies on
      * runners checkpoint semantics, not all the runners are compatible. The sink throws an
-     * exception during initialization if the runner is not explicitly allowed. Flink runner is one
-     * of the runners whose checkpoint semantics are not compatible with current implementation
-     * (hope to provide a solution in near future). Dataflow runner and Spark runners are
-     * compatible.
+     * exception during initialization if the runner is not explicitly allowed. The Dataflow, Flink,
+     * and Spark runners are compatible.
      *
      * <p>Note on performance: Exactly-once sink involves two shuffles of the records. In addition
      * to cost of shuffling the records among workers, the records go through 2
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
index ec328d8..0a48131 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
@@ -98,7 +98,7 @@
      * Current backlog, as estimated number of event bytes we are behind, or null if unknown.
      * Reported to callers.
      */
-    private @Nullable Long backlogBytes;
+    private long backlogBytes;
 
     /** Wallclock time (ms since epoch) we last reported the backlog, or -1 if never reported. */
     private long lastReportedBacklogWallclock;
@@ -127,6 +127,7 @@
       lastReportedBacklogWallclock = -1;
       pendingEventWallclockTime = -1;
       timestampAtLastReportedBacklogMs = -1;
+      updateBacklog(System.currentTimeMillis(), 0);
     }
 
     public EventReader(GeneratorConfig config) {
@@ -146,9 +147,7 @@
       while (pendingEvent == null) {
         if (!generator.hasNext() && heldBackEvents.isEmpty()) {
           // No more events, EVER.
-          if (isRateLimited) {
-            updateBacklog(System.currentTimeMillis(), 0);
-          }
+          updateBacklog(System.currentTimeMillis(), 0);
           if (watermark < BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
             watermark = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
             LOG.trace("stopped unbounded generator {}", generator);
@@ -177,9 +176,7 @@
           }
         } else {
           // Waiting for held-back event to fire.
-          if (isRateLimited) {
-            updateBacklog(now, 0);
-          }
+          updateBacklog(now, 0);
           return false;
         }
 
@@ -199,6 +196,8 @@
           return false;
         }
         updateBacklog(now, now - pendingEventWallclockTime);
+      } else {
+        updateBacklog(now, 0);
       }
 
       // This event is ready to fire.
@@ -210,20 +209,26 @@
     private void updateBacklog(long now, long newBacklogDurationMs) {
       backlogDurationMs = newBacklogDurationMs;
       long interEventDelayUs = generator.currentInterEventDelayUs();
-      if (interEventDelayUs != 0) {
+      if (isRateLimited && interEventDelayUs > 0) {
         long backlogEvents = (backlogDurationMs * 1000 + interEventDelayUs - 1) / interEventDelayUs;
         backlogBytes = generator.getCurrentConfig().estimatedBytesForEvents(backlogEvents);
+      } else {
+        double fractionRemaining = 1.0 - generator.getFractionConsumed();
+        backlogBytes =
+            Math.max(
+                0L,
+                (long) (generator.getCurrentConfig().getEstimatedSizeBytes() * fractionRemaining));
       }
       if (lastReportedBacklogWallclock < 0
           || now - lastReportedBacklogWallclock > BACKLOG_PERIOD.getMillis()) {
-        double timeDialation = Double.NaN;
+        double timeDilation = Double.NaN;
         if (pendingEvent != null
             && lastReportedBacklogWallclock >= 0
             && timestampAtLastReportedBacklogMs >= 0) {
           long wallclockProgressionMs = now - lastReportedBacklogWallclock;
           long eventTimeProgressionMs =
               pendingEvent.getTimestamp().getMillis() - timestampAtLastReportedBacklogMs;
-          timeDialation = (double) eventTimeProgressionMs / (double) wallclockProgressionMs;
+          timeDilation = (double) eventTimeProgressionMs / (double) wallclockProgressionMs;
         }
         LOG.debug(
             "unbounded generator backlog now {}ms ({} bytes) at {}us interEventDelay "
@@ -231,7 +236,7 @@
             backlogDurationMs,
             backlogBytes,
             interEventDelayUs,
-            timeDialation);
+            timeDilation);
         lastReportedBacklogWallclock = now;
         if (pendingEvent != null) {
           timestampAtLastReportedBacklogMs = pendingEvent.getTimestamp().getMillis();
@@ -277,7 +282,7 @@
 
     @Override
     public long getSplitBacklogBytes() {
-      return backlogBytes == null ? BACKLOG_UNKNOWN : backlogBytes;
+      return backlogBytes;
     }
 
     @Override
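For reference, the updateBacklog change above converts the backlog duration into a whole number of events with a ceiling division before estimating bytes, and falls back to scaling the generator's estimated total size by the unconsumed fraction when the source is not rate limited or the inter-event delay is zero. A small worked example with illustrative numbers:

// 500ms of backlog at 150us per event is 3,334 events (rounded up), not 3,333.
long backlogDurationMs = 500;
long interEventDelayUs = 150;
long backlogEvents = (backlogDurationMs * 1000 + interEventDelayUs - 1) / interEventDelayUs;
// backlogEvents == 3334; backlogBytes is then
// generator.getCurrentConfig().estimatedBytesForEvents(backlogEvents).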
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index f615290..498f877 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -329,6 +329,7 @@
 DATACLASS_TYPE = 101
 NAMED_TUPLE_TYPE = 102
 ENUM_TYPE = 103
+NESTED_STATE_TYPE = 104
 
 # Types that can be encoded as iterables, but are not literally
 # lists, etc. due to being lazy.  The actual type is not preserved
@@ -442,24 +443,50 @@
             "for the input of '%s'" %
             (value, type(value), self.requires_deterministic_step_label))
       self.encode_type(type(value), stream)
-      self.iterable_coder_impl.encode_to_stream(
-          [getattr(value, field.name) for field in dataclasses.fields(value)],
-          stream,
-          True)
+      values = [
+          getattr(value, field.name) for field in dataclasses.fields(value)
+      ]
+      try:
+        self.iterable_coder_impl.encode_to_stream(values, stream, True)
+      except Exception as e:
+        raise TypeError(self._deterministic_encoding_error_msg(value)) from e
     elif isinstance(value, tuple) and hasattr(type(value), '_fields'):
       stream.write_byte(NAMED_TUPLE_TYPE)
       self.encode_type(type(value), stream)
-      self.iterable_coder_impl.encode_to_stream(value, stream, True)
+      try:
+        self.iterable_coder_impl.encode_to_stream(value, stream, True)
+      except Exception as e:
+        raise TypeError(self._deterministic_encoding_error_msg(value)) from e
     elif isinstance(value, enum.Enum):
       stream.write_byte(ENUM_TYPE)
       self.encode_type(type(value), stream)
       # Enum values can be of any type.
-      self.encode_to_stream(value.value, stream, True)
+      try:
+        self.encode_to_stream(value.value, stream, True)
+      except Exception as e:
+        raise TypeError(self._deterministic_encoding_error_msg(value)) from e
+    elif hasattr(value, "__getstate__"):
+      if not hasattr(value, "__setstate__"):
+        raise TypeError(
+            "Unable to deterministically encode '%s' of type '%s', "
+            "for the input of '%s'. The object defines __getstate__ but not "
+            "__setstate__." %
+            (value, type(value), self.requires_deterministic_step_label))
+      stream.write_byte(NESTED_STATE_TYPE)
+      self.encode_type(type(value), stream)
+      state_value = value.__getstate__()
+      try:
+        self.encode_to_stream(state_value, stream, True)
+      except Exception as e:
+        raise TypeError(self._deterministic_encoding_error_msg(value)) from e
     else:
-      raise TypeError(
-          "Unable to deterministically encode '%s' of type '%s', "
-          "please provide a type hint for the input of '%s'" %
-          (value, type(value), self.requires_deterministic_step_label))
+      raise TypeError(self._deterministic_encoding_error_msg(value))
+
+  def _deterministic_encoding_error_msg(self, value):
+    return (
+        "Unable to deterministically encode '%s' of type '%s', "
+        "please provide a type hint for the input of '%s'" %
+        (value, type(value), self.requires_deterministic_step_label))
 
   def encode_type(self, t, stream):
     stream.write(dill.dumps(t), True)
@@ -510,6 +537,12 @@
     elif t == ENUM_TYPE:
       cls = self.decode_type(stream)
       return cls(self.decode_from_stream(stream, True))
+    elif t == NESTED_STATE_TYPE:
+      cls = self.decode_type(stream)
+      state = self.decode_from_stream(stream, True)
+      value = cls.__new__(cls)
+      value.__setstate__(state)
+      return value
     elif t == UNKNOWN_TYPE:
       return self.fallback_coder_impl.decode_from_stream(stream, nested)
     else:
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 17825a9..0a18e27 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -69,6 +69,22 @@
 MyFlag = enum.Flag('MyFlag', 'F1 F2 F3')  # pylint: disable=too-many-function-args
 
 
+class DefinesGetState:
+  def __init__(self, value):
+    self.value = value
+
+  def __getstate__(self):
+    return self.value
+
+  def __eq__(self, other):
+    return type(other) is type(self) and other.value == self.value
+
+
+class DefinesGetAndSetState(DefinesGetState):
+  def __setstate__(self, value):
+    self.value = value
+
+
 # Defined out of line for picklability.
 class CustomCoder(coders.Coder):
   def encode(self, x):
@@ -236,6 +252,15 @@
     self.check_coder(deterministic_coder, list(MyIntFlag))
     self.check_coder(deterministic_coder, list(MyFlag))
 
+    self.check_coder(
+        deterministic_coder,
+        [DefinesGetAndSetState(1), DefinesGetAndSetState((1, 2, 3))])
+
+    with self.assertRaises(TypeError):
+      self.check_coder(deterministic_coder, DefinesGetState(1))
+    with self.assertRaises(TypeError):
+      self.check_coder(deterministic_coder, DefinesGetAndSetState(dict()))
+
   def test_dill_coder(self):
     cell_value = (lambda x: lambda: x)(0).__closure__[0]
     self.check_coder(coders.DillCoder(), 'a', 1, cell_value)
diff --git a/sdks/python/apache_beam/dataframe/expressions.py b/sdks/python/apache_beam/dataframe/expressions.py
index 80ee149..54a15c5 100644
--- a/sdks/python/apache_beam/dataframe/expressions.py
+++ b/sdks/python/apache_beam/dataframe/expressions.py
@@ -348,10 +348,16 @@
       preserves_partition_by: The level of partitioning preserved.
     """
     if (not _get_allow_non_parallel() and
-        requires_partition_by == partitionings.Singleton()):
+        isinstance(requires_partition_by, partitionings.Singleton)):
+      reason = requires_partition_by.reason or (
+          f"Encountered non-parallelizable form of {name!r}.")
+
       raise NonParallelOperation(
-          "Using non-parallel form of %s "
-          "outside of allow_non_parallel_operations block." % name)
+          f"{reason}\n"
+          "Consider using an allow_non_parallel_operations block if you're "
+          "sure you want to do this. See "
+          "https://s.apache.org/dataframe-non-parallel-operations for more "
+          "information.")
     args = tuple(args)
     if proxy is None:
       proxy = func(*(arg.proxy() for arg in args))
@@ -406,4 +412,6 @@
 
 
 class NonParallelOperation(Exception):
-  pass
+  def __init__(self, msg):
+    super(NonParallelOperation, self).__init__(msg)
+    self.msg = msg
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 1f892f4..25355fe 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -109,7 +109,11 @@
     if index is not None and errors == 'raise':
       # In order to raise an error about missing index values, we'll
       # need to collect the entire dataframe.
-      requires = partitionings.Singleton()
+      requires = partitionings.Singleton(
+          reason=(
+              "drop(errors='raise', axis='index') is not currently "
+              "parallelizable. This requires collecting all data on a single "
+              f"node in order to detect if one of {index!r} is missing."))
     else:
       requires = partitionings.Arbitrary()
 
@@ -142,24 +146,26 @@
   def fillna(self, value, method, axis, limit, **kwargs):
     # Default value is None, but is overridden with 'index'.
     axis = axis or 'index'
-    if method is not None and axis in (0, 'index'):
-      raise frame_base.WontImplementError(
-          f"fillna(method={method!r}) is not supported because it is "
-          "order-sensitive. Only fillna(method=None) is supported.",
-          reason="order-sensitive")
+
+    if axis in (0, 'index'):
+      if method is not None:
+        raise frame_base.WontImplementError(
+            f"fillna(method={method!r}, axis={axis!r}) is not supported "
+            "because it is order-sensitive. Only fillna(method=None) is "
+            f"supported with axis={axis!r}.",
+            reason="order-sensitive")
+      if limit is not None:
+        raise frame_base.WontImplementError(
+            f"fillna(limit={method!r}, axis={axis!r}) is not supported because "
+            "it is order-sensitive. Only fillna(limit=None) is supported with "
+            f"axis={axis!r}.",
+            reason="order-sensitive")
+
     if isinstance(value, frame_base.DeferredBase):
       value_expr = value._expr
     else:
       value_expr = expressions.ConstantExpression(value)
 
-    if limit is not None and method is None:
-      # If method is not None (and axis is 'columns'), we can do limit in
-      # a distributed way. Otherwise the limit is global, so it requires
-      # Singleton partitioning.
-      requires = partitionings.Singleton()
-    else:
-      requires = partitionings.Arbitrary()
-
     return frame_base.DeferredFrame.wrap(
         # yapf: disable
         expressions.ComputedExpression(
@@ -169,7 +175,7 @@
                 value, method=method, axis=axis, limit=limit, **kwargs),
             [self._expr, value_expr],
             preserves_partition_by=partitionings.Arbitrary(),
-            requires_partition_by=requires))
+            requires_partition_by=partitionings.Arbitrary()))
 
   @frame_base.args_to_kwargs(pd.DataFrame)
   @frame_base.populate_defaults(pd.DataFrame)
@@ -523,7 +529,11 @@
     if errors == "ignore":
       # We need all data in order to ignore errors and propagate the original
       # data.
-      requires = partitionings.Singleton()
+      requires = partitionings.Singleton(
+          reason=(
+              f"where(errors={errors!r}) is currently not parallelizable, "
+              "because all data must be collected on one node to determine if "
+              "the original data should be propagated instead."))
 
     actual_args['errors'] = errors
 
@@ -668,10 +678,8 @@
           reason="order-sensitive")
 
     if verify_integrity:
-      # verifying output has a unique index requires global index.
-      # TODO(BEAM-11839): Attach an explanation to the Singleton partitioning
-      # requirement, and include it in raised errors.
-      requires = partitionings.Singleton()
+      # We can verify index uniqueness within index-partitioned data.
+      requires = partitionings.Index()
     else:
       requires = partitionings.Arbitrary()
 
@@ -750,7 +758,12 @@
       right = other._expr
       right_is_series = False
     else:
-      raise frame_base.WontImplementError('non-deferred result')
+      raise frame_base.WontImplementError(
+          "other must be a DeferredDataFrame or DeferredSeries instance. "
+          "Passing a concrete list or numpy array is not supported. Those "
+          "types have no index and must be joined based on the order of the "
+          "data.",
+          reason="order-sensitive")
 
     dots = expressions.ComputedExpression(
         'dot',
@@ -838,6 +851,10 @@
       return x._corr_aligned(y, min_periods)
 
     else:
+      reason = (
+          f"Encountered corr(method={method!r}) which cannot be "
+          "parallelized. Only corr(method='pearson') is currently "
+          "parallelizable.")
       # The rank-based correlations are not obviously parallelizable, though
       # perhaps an approximation could be done with a knowledge of quantiles
       # and custom partitioning.
@@ -847,9 +864,7 @@
               lambda df,
               other: df.corr(other, method=method, min_periods=min_periods),
               [self._expr, other._expr],
-              # TODO(BEAM-11839): Attach an explanation to the Singleton
-              # partitioning requirement, and include it in raised errors.
-              requires_partition_by=partitionings.Singleton()))
+              requires_partition_by=partitionings.Singleton(reason=reason)))
 
   def _corr_aligned(self, other, min_periods):
     std_x = self.std()
@@ -958,9 +973,16 @@
         return frame_base.DeferredFrame.wrap(
             expressions.ComputedExpression(
                 'aggregate',
-                lambda s: s.agg(func, *args, **kwargs), [intermediate],
+                lambda s: s.agg(func, *args, **kwargs),
+                [intermediate],
                 preserves_partition_by=partitionings.Arbitrary(),
-                requires_partition_by=partitionings.Singleton()))
+                # TODO(BEAM-11839): This reason should be more specific. It's
+                # actually incorrect for the args/kwargs case above.
+                requires_partition_by=partitionings.Singleton(
+                    reason=(
+                        f"Aggregation function {func!r} cannot currently be "
+                        "parallelized, it requires collecting all data for "
+                        "this Series on a single node."))))
 
   agg = aggregate
 
@@ -1119,7 +1141,10 @@
     if limit is None:
       requires_partition_by = partitionings.Arbitrary()
     else:
-      requires_partition_by = partitionings.Singleton()
+      requires_partition_by = partitionings.Singleton(
+          reason=(
+              f"replace(limit={limit!r}) cannot currently be parallelized, it "
+              "requires collecting all data on a single node."))
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'replace',
@@ -1154,7 +1179,8 @@
             'unique',
             lambda df: pd.Series(df.unique()), [self._expr],
             preserves_partition_by=partitionings.Singleton(),
-            requires_partition_by=partitionings.Singleton()))
+            requires_partition_by=partitionings.Singleton(
+                reason="unique() cannot currently be parallelized.")))
 
   def update(self, other):
     self._expr = expressions.ComputedExpression(
@@ -1242,7 +1268,8 @@
       elif _is_integer_slice(key):
         # This depends on the contents of the index.
         raise frame_base.WontImplementError(
-            'Use iloc or loc with integer slices.')
+            "Integer slices are not supported as they are ambiguous. Please "
+            "use iloc or loc with integer slices.")
       else:
         return self.loc[key]
 
@@ -1278,7 +1305,10 @@
   @frame_base.populate_defaults(pd.DataFrame)
   def align(self, other, join, axis, copy, level, method, **kwargs):
     if not copy:
-      raise frame_base.WontImplementError('align(copy=False)')
+      raise frame_base.WontImplementError(
+          "align(copy=False) is not supported because it might be an inplace "
+          "operation depending on the data. Please prefer the default "
+          "align(copy=True).")
     if method is not None:
       raise frame_base.WontImplementError(
           f"align(method={method!r}) is not supported because it is "
@@ -1289,7 +1319,9 @@
 
     if level is not None:
       # Could probably get by partitioning on the used levels.
-      requires_partition_by = partitionings.Singleton()
+      requires_partition_by = partitionings.Singleton(reason=(
+          f"align(level={level}) is not currently parallelizable. Only "
+          "align(level=None) can be parallelized."))
     elif axis in ('columns', 1):
       requires_partition_by = partitionings.Arbitrary()
     else:
@@ -1314,16 +1346,21 @@
           "append(ignore_index=True) is order sensitive because it requires "
           "generating a new index based on the order of the data.",
           reason="order-sensitive")
+
     if verify_integrity:
-      raise frame_base.WontImplementError(
-          "append(verify_integrity=True) produces an execution time error")
+      # We can verify index uniqueness within index-partitioned data.
+      requires = partitionings.Index()
+    else:
+      requires = partitionings.Arbitrary()
 
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'append',
-            lambda s, other: s.append(other, sort=sort, **kwargs),
+            lambda s, other: s.append(other, sort=sort,
+                                      verify_integrity=verify_integrity,
+                                      **kwargs),
             [self._expr, other._expr],
-            requires_partition_by=partitionings.Arbitrary(),
+            requires_partition_by=requires,
             preserves_partition_by=partitionings.Arbitrary()
         )
     )
@@ -1391,8 +1428,6 @@
             preserves_partition_by=preserves,
             requires_partition_by=partitionings.Arbitrary()))
 
-
-
   def aggregate(self, func, axis=0, *args, **kwargs):
     if axis is None:
       # Aggregate across all elements by first aggregating across columns,
@@ -1414,6 +1449,7 @@
             'aggregate',
             lambda df: df.agg(func, *args, **kwargs),
             [self._expr],
+            # TODO(BEAM-11839): Provide a reason for this Singleton
             requires_partition_by=partitionings.Singleton()))
     else:
       # In the general case, compute the aggregation of each column separately,
@@ -1499,12 +1535,15 @@
                 proxy=proxy))
 
     else:
+      reason = (f"Encountered corr(method={method!r}) which cannot be "
+                "parallelized. Only corr(method='pearson') is currently "
+                "parallelizable.")
       return frame_base.DeferredFrame.wrap(
           expressions.ComputedExpression(
               'corr',
               lambda df: df.corr(method=method, min_periods=min_periods),
               [self._expr],
-              requires_partition_by=partitionings.Singleton()))
+              requires_partition_by=partitionings.Singleton(reason=reason)))
 
   @frame_base.args_to_kwargs(pd.DataFrame)
   @frame_base.populate_defaults(pd.DataFrame)
@@ -1653,8 +1692,12 @@
             'mode',
             lambda df: df.mode(*args, **kwargs),
             [self._expr],
-            #TODO(robertwb): Approximate?
-            requires_partition_by=partitionings.Singleton(),
+            #TODO(BEAM-12181): Can we add an approximate implementation?
+            requires_partition_by=partitionings.Singleton(reason=(
+                "mode(axis='index') cannot currently be parallelized. See "
+                "BEAM-12181 tracking the possble addition of an approximate, "
+                "parallelizable implementation of mode."
+            )),
             preserves_partition_by=partitionings.Singleton()))
 
   @frame_base.args_to_kwargs(pd.DataFrame)
@@ -1662,8 +1705,12 @@
   @frame_base.maybe_inplace
   def dropna(self, axis, **kwargs):
     # TODO(robertwb): This is a common pattern. Generalize?
-    if axis == 1 or axis == 'columns':
-      requires_partition_by = partitionings.Singleton()
+    if axis in (1, 'columns'):
+      requires_partition_by = partitionings.Singleton(reason=(
+          "dropna(axis=1) cannot currently be parallelized. It requires "
+          "checking all values in each column for NaN values, to determine "
+          "if that column should be dropped."
+      ))
     else:
       requires_partition_by = partitionings.Arbitrary()
     return frame_base.DeferredFrame.wrap(
@@ -1913,8 +1960,11 @@
       requires_partition_by = partitionings.Arbitrary()
       preserves_partition_by = partitionings.Index()
     else:
-      # TODO: This could be implemented in a distributed fashion
-      requires_partition_by = partitionings.Singleton()
+      # TODO(BEAM-9547): This could be implemented in a distributed fashion,
+      # perhaps by deferring to a distributed drop_duplicates
+      requires_partition_by = partitionings.Singleton(reason=(
+         "nunique(axis='index') is not currently parallelizable."
+      ))
       preserves_partition_by = partitionings.Singleton()
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
@@ -1941,22 +1991,31 @@
   @frame_base.args_to_kwargs(pd.DataFrame)
   @frame_base.populate_defaults(pd.DataFrame)
   def quantile(self, q, axis, **kwargs):
-    if axis in (1, 'columns') and isinstance(q, list):
-      raise frame_base.WontImplementError(
-          "quantile(axis=columns) with multiple q values is not supported "
-          "because it transposes the input DataFrame. Note computing "
-          "an individual quantile across columns (e.g. "
-          f"df.quantile(q={q[0]!r}, axis={axis!r}) is supported.",
-          reason="non-deferred-columns")
+    if axis in (1, 'columns'):
+      if isinstance(q, list):
+        raise frame_base.WontImplementError(
+            "quantile(axis=columns) with multiple q values is not supported "
+            "because it transposes the input DataFrame. Note computing "
+            "an individual quantile across columns (e.g. "
+            f"df.quantile(q={q[0]!r}, axis={axis!r}) is supported.",
+            reason="non-deferred-columns")
+      else:
+        requires = partitionings.Arbitrary()
+    else:  # axis='index'
+      # TODO(BEAM-12167): Provide an option for approximate distributed
+      # quantiles
+      requires = partitionings.Singleton(reason=(
+          "Computing quantiles across index cannot currently be parallelized. "
+          "See BEAM-12167 tracking the possible addition of an approximate, "
+          "parallelizable implementation of quantile."
+      ))
 
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'quantile',
             lambda df: df.quantile(q=q, axis=axis, **kwargs),
             [self._expr],
-            # TODO(BEAM-12167): Provide an option for approximate distributed
-            # quantiles
-            requires_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires,
             preserves_partition_by=partitionings.Singleton()))
 
   @frame_base.args_to_kwargs(pd.DataFrame)
@@ -1978,8 +2037,15 @@
       preserves_partition_by = partitionings.Index()
 
     if kwargs.get('errors', None) == 'raise' and rename_index:
-      # Renaming index with checking requires global index.
-      requires_partition_by = partitionings.Singleton()
+      # TODO: We could do this in parallel by creating a ConstantExpression
+      # with a series created from the mapper dict. Then Index() partitioning
+      # would co-locate the necessary index values and we could raise
+      # individually within each partition. Execution time errors are
+      # discouraged anyway so probably not worth the effort.
+      requires_partition_by = partitionings.Singleton(reason=(
+          "rename(errors='raise', axis='index') requires collecting all "
+          "data on a single node in order to detect missing index values."
+      ))
     else:
       requires_partition_by = partitionings.Arbitrary()
 
@@ -2014,7 +2080,9 @@
     if limit is None:
       requires_partition_by = partitionings.Arbitrary()
     else:
-      requires_partition_by = partitionings.Singleton()
+      requires_partition_by = partitionings.Singleton(reason=(
+         f"replace(limit={limit!r}) cannot currently be parallelized, it "
+         "requires collecting all data on a single node."))
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'replace',
@@ -2032,8 +2100,11 @@
     if level is not None and not isinstance(level, (tuple, list)):
       level = [level]
     if level is None or len(level) == self._expr.proxy().index.nlevels:
-      # TODO: Could do distributed re-index with offsets.
-      requires_partition_by = partitionings.Singleton()
+      # TODO(BEAM-12182): Could do distributed re-index with offsets.
+      requires_partition_by = partitionings.Singleton(reason=(
+          "reset_index(level={level!r}) drops the entire index and creates a "
+          "new one, so it cannot currently be parallelized (BEAM-12182)."
+      ))
     else:
       requires_partition_by = partitionings.Arbitrary()
     return frame_base.DeferredFrame.wrap(
@@ -2070,20 +2141,37 @@
 
   @frame_base.args_to_kwargs(pd.DataFrame)
   @frame_base.populate_defaults(pd.DataFrame)
-  def shift(self, axis, **kwargs):
-    if 'freq' in kwargs:
-      raise frame_base.WontImplementError('data-dependent')
-    if axis == 1 or axis == 'columns':
-      requires_partition_by = partitionings.Arbitrary()
+  def shift(self, axis, freq, **kwargs):
+    if axis in (1, 'columns'):
+      preserves = partitionings.Arbitrary()
+      proxy = None
     else:
-      requires_partition_by = partitionings.Singleton()
+      if freq is None or 'fill_value' in kwargs:
+        fill_value = kwargs.get('fill_value', 'NOT SET')
+        raise frame_base.WontImplementError(
+            f"shift(axis={axis!r}) is only supported with freq defined, and "
+            f"fill_value undefined (got freq={freq!r},"
+            f"fill_value={fill_value!r}). Other configurations are sensitive "
+            "to the order of the data because they require populating shifted "
+            "rows with `fill_value`.",
+            reason="order-sensitive")
+      # proxy generation fails in pandas <1.2
+      # Seems due to https://github.com/pandas-dev/pandas/issues/14811,
+      # bug with shift on empty indexes.
+      # Fortunately the proxy should be identical to the input.
+      proxy = self._expr.proxy().copy()
+
+      # index is modified, so no partitioning is preserved.
+      preserves = partitionings.Singleton()
+
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'shift',
-            lambda df: df.shift(axis=axis, **kwargs),
+            lambda df: df.shift(axis=axis, freq=freq, **kwargs),
             [self._expr],
-            preserves_partition_by=partitionings.Singleton(),
-            requires_partition_by=requires_partition_by))
+            proxy=proxy,
+            preserves_partition_by=preserves,
+            requires_partition_by=partitionings.Arbitrary()))
 
   shape = property(frame_base.wont_implement_method(
       pd.DataFrame, 'shape', reason="non-deferred-result"))
@@ -2388,7 +2476,10 @@
             df.groupby(level=list(range(df.index.nlevels)), **groupby_kwargs),
             **kwargs),
         [pre_agg],
-        requires_partition_by=(partitionings.Singleton()
+        requires_partition_by=(partitionings.Singleton(reason=(
+            "Aggregations grouped by a categorical column are not currently "
+            "parallelizable (BEAM-11190)."
+        ))
                                if is_categorical_grouping
                                else partitionings.Index()),
         preserves_partition_by=partitionings.Arbitrary())
@@ -2416,7 +2507,10 @@
                        **groupby_kwargs),
             ), **kwargs),
         [self._ungrouped],
-        requires_partition_by=(partitionings.Singleton()
+        requires_partition_by=(partitionings.Singleton(reason=(
+            "Aggregations grouped by a categorical column are not currently "
+            "parallelizable (BEAM-11190)."
+        ))
                                if is_categorical_grouping
                                else partitionings.Index()),
         preserves_partition_by=partitionings.Arbitrary())
@@ -2633,7 +2727,10 @@
   def cat(self, others, join, **kwargs):
     if others is None:
       # Concatenate series into a single String
-      requires = partitionings.Singleton()
+      requires = partitionings.Singleton(reason=(
+          "cat(others=None) concatenates all data in a Series into a single "
+          "string, so it requires collecting all data on a single node."
+      ))
       func = lambda df: df.str.cat(join=join, **kwargs)
       args = [self._expr]
 
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index b692f08..1cf1dfb 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -45,12 +45,15 @@
 
 
 class DeferredFrameTest(unittest.TestCase):
-  def _run_error_test(self, func, *args):
+  def _run_error_test(
+      self, func, *args, construction_time=True, distributed=True):
     """Verify that func(*args) raises the same exception in pandas and in Beam.
 
-    Note that for Beam this only checks for exceptions that are raised during
-    expression generation (i.e. construction time). Execution time exceptions
-    are not helpful."""
+    Note that by default this only checks for exceptions that the Beam DataFrame
+    API raises during expression generation (i.e. construction time).
+    Exceptions raised while the pipeline is executing are less helpful, but
+    are sometimes unavoidable (e.g. data validation exceptions); to check for
+    these exceptions, pass construction_time=False."""
     deferred_args = _get_deferred_args(*args)
 
     # Get expected error
@@ -64,14 +67,29 @@
           f"returned:\n{expected}")
 
     # Get actual error
-    try:
-      _ = func(*deferred_args)._expr
-    except Exception as e:
-      actual = e
-    else:
-      raise AssertionError(
-          "Expected an error:\n{expected_error}\nbut Beam successfully "
-          "generated an expression.")
+    if construction_time:
+      try:
+        _ = func(*deferred_args)._expr
+      except Exception as e:
+        actual = e
+      else:
+        raise AssertionError(
+            f"Expected an error:\n{expected_error}\nbut Beam successfully "
+            f"generated an expression.")
+    else:  # not construction_time
+      # Check for an error raised during pipeline execution
+      expr = func(*deferred_args)._expr
+      session_type = (
+          expressions.PartitioningSession
+          if distributed else expressions.Session)
+      try:
+        result = session_type({}).evaluate(expr)
+      except Exception as e:
+        actual = e
+      else:
+        raise AssertionError(
+            f"Expected an error:\n{expected_error}\nbut Beam successfully "
+            f"Computed the result:\n{result}.")
 
     # Verify
     if (not isinstance(actual, type(expected_error)) or
@@ -99,8 +117,15 @@
     deferred_args = _get_deferred_args(*args)
     if nonparallel:
       # First run outside a nonparallel block to confirm this raises as expected
-      with self.assertRaises(expressions.NonParallelOperation):
-        _ = func(*deferred_args)
+      with self.assertRaises(expressions.NonParallelOperation) as raised:
+        func(*deferred_args)
+
+      if raised.exception.msg.startswith(
+          "Encountered non-parallelizable form of"):
+        raise AssertionError(
+            "Default NonParallelOperation raised, please specify a reason in "
+            "the Singleton() partitioning requirement for this operation."
+        ) from raised.exception
 
       # Re-run in an allow non parallel block to get an expression to verify
       with beam.dataframe.allow_non_parallel_operations():
@@ -722,13 +747,14 @@
             lambda x: (x.foo + x.bar).median()),
         df)
 
-  def test_quantile_axis_columns(self):
+  def test_quantile(self):
     df = pd.DataFrame(
         np.array([[1, 1], [2, 10], [3, 100], [4, 100]]), columns=['a', 'b'])
 
-    with beam.dataframe.allow_non_parallel_operations():
-      self._run_test(lambda df: df.quantile(0.1, axis='columns'), df)
+    self._run_test(lambda df: df.quantile(0.1), df, nonparallel=True)
+    self._run_test(lambda df: df.quantile([0.1, 0.9]), df, nonparallel=True)
 
+    self._run_test(lambda df: df.quantile(0.1, axis='columns'), df)
     with self.assertRaisesRegex(frame_base.WontImplementError,
                                 r"df\.quantile\(q=0\.1, axis='columns'\)"):
       self._run_test(lambda df: df.quantile([0.1, 0.5], axis='columns'), df)
@@ -742,6 +768,7 @@
         lambda df: df.groupby('foo', dropna=False).bar.count(), GROUPBY_DF)
 
   def test_dataframe_melt(self):
+
     df = pd.DataFrame({
         'A': {
             0: 'a', 1: 'b', 2: 'c'
@@ -784,6 +811,40 @@
             id_vars=[('A', 'D')], value_vars=[('B', 'E')], ignore_index=False),
         df)
 
+  def test_fillna_columns(self):
+    df = pd.DataFrame(
+        [[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1], [np.nan, np.nan, np.nan, 5],
+         [np.nan, 3, np.nan, 4], [3, np.nan, np.nan, 4]],
+        columns=list('ABCD'))
+
+    self._run_test(lambda df: df.fillna(method='ffill', axis='columns'), df)
+    self._run_test(
+        lambda df: df.fillna(method='ffill', axis='columns', limit=1), df)
+    self._run_test(
+        lambda df: df.fillna(method='bfill', axis='columns', limit=1), df)
+
+    # Intended behavior is unclear here. See
+    # https://github.com/pandas-dev/pandas/issues/40989
+    # self._run_test(lambda df: df.fillna(axis='columns', value=100,
+    #                                     limit=2), df)
+
+  def test_append_verify_integrity(self):
+    df1 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10))
+    df2 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(9, 19))
+
+    self._run_error_test(
+        lambda s1,
+        s2: s1.append(s2, verify_integrity=True),
+        df1['A'],
+        df2['A'],
+        construction_time=False)
+    self._run_error_test(
+        lambda df1,
+        df2: df1.append(df2, verify_integrity=True),
+        df1,
+        df2,
+        construction_time=False)
+
 
 class AllowNonParallelTest(unittest.TestCase):
   def _use_non_parallel_operation(self):
diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
index f720112..fcf18fa 100644
--- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
+++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
@@ -40,7 +40,10 @@
             'pandas.core.generic.NDFrame.first': ['*'],
             'pandas.core.generic.NDFrame.head': ['*'],
             'pandas.core.generic.NDFrame.last': ['*'],
-            'pandas.core.generic.NDFrame.shift': ['*'],
+            'pandas.core.generic.NDFrame.shift': [
+                'df.shift(periods=3)',
+                'df.shift(periods=3, fill_value=0)',
+            ],
             'pandas.core.generic.NDFrame.tail': ['*'],
             'pandas.core.generic.NDFrame.take': ['*'],
             'pandas.core.generic.NDFrame.values': ['*'],
@@ -189,8 +192,8 @@
             'pandas.core.frame.DataFrame.transpose': ['*'],
             'pandas.core.frame.DataFrame.shape': ['*'],
             'pandas.core.frame.DataFrame.shift': [
-                'df.shift(periods=3, freq="D")',
-                'df.shift(periods=3, freq="infer")'
+                'df.shift(periods=3)',
+                'df.shift(periods=3, fill_value=0)',
             ],
             'pandas.core.frame.DataFrame.unstack': ['*'],
             'pandas.core.frame.DataFrame.memory_usage': ['*'],
@@ -395,7 +398,10 @@
             ],
             'pandas.core.series.Series.pop': ['*'],
             'pandas.core.series.Series.searchsorted': ['*'],
-            'pandas.core.series.Series.shift': ['*'],
+            'pandas.core.series.Series.shift': [
+                'df.shift(periods=3)',
+                'df.shift(periods=3, fill_value=0)',
+            ],
             'pandas.core.series.Series.take': ['*'],
             'pandas.core.series.Series.to_dict': ['*'],
             'pandas.core.series.Series.unique': ['*'],
diff --git a/sdks/python/apache_beam/dataframe/partitionings.py b/sdks/python/apache_beam/dataframe/partitionings.py
index ef58023..afb71ba 100644
--- a/sdks/python/apache_beam/dataframe/partitionings.py
+++ b/sdks/python/apache_beam/dataframe/partitionings.py
@@ -151,6 +151,13 @@
 class Singleton(Partitioning):
   """A partitioning of all the data into a single partition.
   """
+  def __init__(self, reason=None):
+    self._reason = reason
+
+  @property
+  def reason(self):
+    return self._reason
+
   def __eq__(self, other):
     return type(self) == type(other)
 
diff --git a/sdks/python/apache_beam/dataframe/schemas.py b/sdks/python/apache_beam/dataframe/schemas.py
index cc30cec..ee3c2a3 100644
--- a/sdks/python/apache_beam/dataframe/schemas.py
+++ b/sdks/python/apache_beam/dataframe/schemas.py
@@ -281,7 +281,8 @@
     ctor = element_type_from_dataframe(proxy, include_indexes=include_indexes)
 
     return beam.ParDo(
-        _UnbatchWithIndex(ctor) if include_indexes else _UnbatchNoIndex(ctor))
+        _UnbatchWithIndex(ctor) if include_indexes else _UnbatchNoIndex(ctor)
+    ).with_output_types(ctor)
   elif isinstance(proxy, pd.Series):
     # Raise a TypeError if proxy has an unknown type
     output_type = _dtype_to_fieldtype(proxy.dtype)
diff --git a/sdks/python/apache_beam/dataframe/schemas_test.py b/sdks/python/apache_beam/dataframe/schemas_test.py
index 8b1159c..af30e25 100644
--- a/sdks/python/apache_beam/dataframe/schemas_test.py
+++ b/sdks/python/apache_beam/dataframe/schemas_test.py
@@ -36,6 +36,8 @@
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
+from apache_beam.typehints import typehints
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
 
 Simple = typing.NamedTuple(
     'Simple', [('name', unicode), ('id', int), ('height', float)])
@@ -65,41 +67,59 @@
 # dtype. For example:
 #   pd.Series([b'abc'], dtype=bytes).dtype != 'S'
 #   pd.Series([b'abc'], dtype=bytes).astype(bytes).dtype == 'S'
+# (test data, pandas_type, column_name, beam_type)
 COLUMNS = [
-    ([375, 24, 0, 10, 16], np.int32, 'i32'),
-    ([375, 24, 0, 10, 16], np.int64, 'i64'),
-    ([375, 24, None, 10, 16], pd.Int32Dtype(), 'i32_nullable'),
-    ([375, 24, None, 10, 16], pd.Int64Dtype(), 'i64_nullable'),
-    ([375., 24., None, 10., 16.], np.float64, 'f64'),
-    ([375., 24., None, 10., 16.], np.float32, 'f32'),
-    ([True, False, True, True, False], bool, 'bool'),
-    (['Falcon', 'Ostrich', None, 3.14, 0], object, 'any'),
-    ([True, False, True, None, False], pd.BooleanDtype(), 'bool_nullable'),
+    ([375, 24, 0, 10, 16], np.int32, 'i32', np.int32),
+    ([375, 24, 0, 10, 16], np.int64, 'i64', np.int64),
+    ([375, 24, None, 10, 16],
+     pd.Int32Dtype(),
+     'i32_nullable',
+     typing.Optional[np.int32]),
+    ([375, 24, None, 10, 16],
+     pd.Int64Dtype(),
+     'i64_nullable',
+     typing.Optional[np.int64]),
+    ([375., 24., None, 10., 16.],
+     np.float64,
+     'f64',
+     typing.Optional[np.float64]),
+    ([375., 24., None, 10., 16.],
+     np.float32,
+     'f32',
+     typing.Optional[np.float32]),
+    ([True, False, True, True, False], bool, 'bool', bool),
+    (['Falcon', 'Ostrich', None, 3.14, 0], object, 'any', typing.Any),
+    ([True, False, True, None, False],
+     pd.BooleanDtype(),
+     'bool_nullable',
+     typing.Optional[bool]),
     (['Falcon', 'Ostrich', None, 'Aardvark', 'Elephant'],
      pd.StringDtype(),
-     'strdtype'),
-]  # type: typing.List[typing.Tuple[typing.List[typing.Any], typing.Any, str]]
+     'strdtype',
+     typing.Optional[str]),
+]  # type: typing.List[typing.Tuple[typing.List[typing.Any], typing.Any, str, typing.Any]]
 
-NICE_TYPES_DF = pd.DataFrame(columns=[name for _, _, name in COLUMNS])
-for arr, dtype, name in COLUMNS:
+NICE_TYPES_DF = pd.DataFrame(columns=[name for _, _, name, _ in COLUMNS])
+for arr, dtype, name, _ in COLUMNS:
   NICE_TYPES_DF[name] = pd.Series(arr, dtype=dtype, name=name).astype(dtype)
 
 NICE_TYPES_PROXY = NICE_TYPES_DF[:0]
 
-SERIES_TESTS = [(pd.Series(arr, dtype=dtype, name=name), arr) for arr,
-                dtype,
-                name in COLUMNS]
+SERIES_TESTS = [(pd.Series(arr, dtype=dtype, name=name), arr, beam_type)
+                for (arr, dtype, name, beam_type) in COLUMNS]
 
 _TEST_ARRAYS = [
-    arr for arr, _, _ in COLUMNS
+    arr for (arr, _, _, _) in COLUMNS
 ]  # type: typing.List[typing.List[typing.Any]]
 DF_RESULT = list(zip(*_TEST_ARRAYS))
-INDEX_DF_TESTS = [
-    (NICE_TYPES_DF.set_index([name for _, _, name in COLUMNS[:i]]), DF_RESULT)
-    for i in range(1, len(COLUMNS) + 1)
-]
+BEAM_SCHEMA = typing.NamedTuple(  # type: ignore
+    'BEAM_SCHEMA', [(name, beam_type) for _, _, name, beam_type in COLUMNS])
+INDEX_DF_TESTS = [(
+    NICE_TYPES_DF.set_index([name for _, _, name, _ in COLUMNS[:i]]),
+    DF_RESULT,
+    BEAM_SCHEMA) for i in range(1, len(COLUMNS) + 1)]
 
-NOINDEX_DF_TESTS = [(NICE_TYPES_DF, DF_RESULT)]
+NOINDEX_DF_TESTS = [(NICE_TYPES_DF, DF_RESULT, BEAM_SCHEMA)]
 
 PD_VERSION = tuple(int(n) for n in pd.__version__.split('.'))
 
@@ -203,8 +223,18 @@
                 proxy=schemas.generate_proxy(Animal)))
         assert_that(res, equal_to([('Falcon', 375.), ('Parrot', 25.)]))
 
+  def assert_typehints_equal(self, left, right):
+    left = typehints.normalize(left)
+    right = typehints.normalize(right)
+
+    if match_is_named_tuple(left):
+      self.assertTrue(match_is_named_tuple(right))
+      self.assertEqual(left.__annotations__, right.__annotations__)
+    else:
+      self.assertEqual(left, right)
+
   @parameterized.expand(SERIES_TESTS + NOINDEX_DF_TESTS)
-  def test_unbatch_no_index(self, df_or_series, rows):
+  def test_unbatch_no_index(self, df_or_series, rows, beam_type):
     proxy = df_or_series[:0]
 
     with TestPipeline() as p:
@@ -212,10 +242,15 @@
           p | beam.Create([df_or_series[::2], df_or_series[1::2]])
           | schemas.UnbatchPandas(proxy))
 
+      # Verify that the unbatched PCollection has the expected typehint
+      # TODO(BEAM-8538): typehints should support NamedTuple so we can use
+      # typehints.is_consistent_with here instead
+      self.assert_typehints_equal(res.element_type, beam_type)
+
       assert_that(res, equal_to(rows))
 
   @parameterized.expand(SERIES_TESTS + INDEX_DF_TESTS)
-  def test_unbatch_with_index(self, df_or_series, rows):
+  def test_unbatch_with_index(self, df_or_series, rows, _):
     proxy = df_or_series[:0]
 
     with TestPipeline() as p:
diff --git a/sdks/python/apache_beam/ml/gcp/recommendations_ai.py b/sdks/python/apache_beam/ml/gcp/recommendations_ai.py
deleted file mode 100644
index 6e50cd2..0000000
--- a/sdks/python/apache_beam/ml/gcp/recommendations_ai.py
+++ /dev/null
@@ -1,585 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A connector for sending API requests to the GCP Recommendations AI
-API (https://cloud.google.com/recommendations).
-"""
-
-from __future__ import absolute_import
-
-from typing import Sequence
-from typing import Tuple
-
-from google.api_core.retry import Retry
-
-from apache_beam import pvalue
-from apache_beam.metrics import Metrics
-from apache_beam.options.pipeline_options import GoogleCloudOptions
-from apache_beam.transforms import DoFn
-from apache_beam.transforms import ParDo
-from apache_beam.transforms import PTransform
-from apache_beam.transforms.util import GroupIntoBatches
-from cachetools.func import ttl_cache
-
-# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
-try:
-  from google.cloud import recommendationengine
-except ImportError:
-  raise ImportError(
-      'Google Cloud Recommendation AI not supported for this execution '
-      'environment (could not import google.cloud.recommendationengine).')
-# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
-
-__all__ = [
-    'CreateCatalogItem',
-    'WriteUserEvent',
-    'ImportCatalogItems',
-    'ImportUserEvents',
-    'PredictUserEvent'
-]
-
-FAILED_CATALOG_ITEMS = "failed_catalog_items"
-
-
-@ttl_cache(maxsize=128, ttl=3600)
-def get_recommendation_prediction_client():
-  """Returns a Recommendation AI - Prediction Service client."""
-  _client = recommendationengine.PredictionServiceClient()
-  return _client
-
-
-@ttl_cache(maxsize=128, ttl=3600)
-def get_recommendation_catalog_client():
-  """Returns a Recommendation AI - Catalog Service client."""
-  _client = recommendationengine.CatalogServiceClient()
-  return _client
-
-
-@ttl_cache(maxsize=128, ttl=3600)
-def get_recommendation_user_event_client():
-  """Returns a Recommendation AI - UserEvent Service client."""
-  _client = recommendationengine.UserEventServiceClient()
-  return _client
-
-
-class CreateCatalogItem(PTransform):
-  """Creates catalogitem information.
-    The ``PTranform`` returns a PCollectionTuple with a PCollections of
-    successfully and failed created CatalogItems.
-
-    Example usage::
-
-      pipeline | CreateCatalogItem(
-        project='example-gcp-project',
-        catalog_name='my-catalog')
-    """
-  def __init__(
-      self,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog"):
-    """Initializes a :class:`CreateCatalogItem` transform.
-
-        Args:
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-        """
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.catalog_name = catalog_name
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          """GCP project name needs to be specified in "project" pipeline
-            option""")
-    return pcoll | ParDo(
-        _CreateCatalogItemFn(
-            self.project,
-            self.retry,
-            self.timeout,
-            self.metadata,
-            self.catalog_name))
-
-
-class _CreateCatalogItemFn(DoFn):
-  def __init__(
-      self,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.parent = f"projects/{project}/locations/global/catalogs/{catalog_name}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self._client = get_recommendation_catalog_client()
-
-  def process(self, element):
-    catalog_item = recommendationengine.CatalogItem(element)
-    request = recommendationengine.CreateCatalogItemRequest(
-        parent=self.parent, catalog_item=catalog_item)
-
-    try:
-      created_catalog_item = self._client.create_catalog_item(
-          request=request,
-          retry=self.retry,
-          timeout=self.timeout,
-          metadata=self.metadata)
-
-      self.counter.inc()
-      yield recommendationengine.CatalogItem.to_dict(created_catalog_item)
-    except Exception:
-      yield pvalue.TaggedOutput(
-          FAILED_CATALOG_ITEMS,
-          recommendationengine.CatalogItem.to_dict(catalog_item))
-
-
-class ImportCatalogItems(PTransform):
-  """Imports catalogitems in bulk.
-    The `PTransform` returns a PCollectionTuple with PCollections of
-    successfully and failed imported CatalogItems.
-
-    Example usage::
-
-      pipeline
-      | ImportCatalogItems(
-          project='example-gcp-project',
-          catalog_name='my-catalog')
-    """
-  def __init__(
-      self,
-      max_batch_size: int = 5000,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog"):
-    """Initializes a :class:`ImportCatalogItems` transform
-
-        Args:
-            batch_size (int): Required. Maximum number of catalogitems per
-              request.
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-        """
-    self.max_batch_size = max_batch_size
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.catalog_name = catalog_name
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          'GCP project name needs to be specified in "project" pipeline option')
-    return (
-        pcoll | GroupIntoBatches.WithShardedKey(self.max_batch_size) | ParDo(
-            _ImportCatalogItemsFn(
-                self.project,
-                self.retry,
-                self.timeout,
-                self.metadata,
-                self.catalog_name)))
-
-
-class _ImportCatalogItemsFn(DoFn):
-  def __init__(
-      self,
-      project=None,
-      retry=None,
-      timeout=120,
-      metadata=None,
-      catalog_name=None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.parent = f"projects/{project}/locations/global/catalogs/{catalog_name}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self.client = get_recommendation_catalog_client()
-
-  def process(self, element):
-    catalog_items = [recommendationengine.CatalogItem(e) for e in element[1]]
-    catalog_inline_source = recommendationengine.CatalogInlineSource(
-        {"catalog_items": catalog_items})
-    input_config = recommendationengine.InputConfig(
-        catalog_inline_source=catalog_inline_source)
-
-    request = recommendationengine.ImportCatalogItemsRequest(
-        parent=self.parent, input_config=input_config)
-
-    try:
-      operation = self._client.import_catalog_items(
-          request=request,
-          retry=self.retry,
-          timeout=self.timeout,
-          metadata=self.metadata)
-      self.counter.inc(len(catalog_items))
-      yield operation.result()
-    except Exception:
-      yield pvalue.TaggedOutput(FAILED_CATALOG_ITEMS, catalog_items)
-
-
-class WriteUserEvent(PTransform):
-  """Write user event information.
-    The `PTransform` returns a PCollectionTuple with PCollections of
-    successfully and failed written UserEvents.
-
-    Example usage::
-
-      pipeline
-      | WriteUserEvent(
-          project='example-gcp-project',
-          catalog_name='my-catalog',
-          event_store='my_event_store')
-    """
-  def __init__(
-      self,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog",
-      event_store: str = "default_event_store"):
-    """Initializes a :class:`WriteUserEvent` transform.
-
-        Args:
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-            event_store (str): Optional. Name of the event store.
-              Default: 'default_event_store'
-        """
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.catalog_name = catalog_name
-    self.event_store = event_store
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          'GCP project name needs to be specified in "project" pipeline option')
-    return pcoll | ParDo(
-        _WriteUserEventFn(
-            self.project,
-            self.retry,
-            self.timeout,
-            self.metadata,
-            self.catalog_name,
-            self.event_store))
-
-
-class _WriteUserEventFn(DoFn):
-  FAILED_USER_EVENTS = "failed_user_events"
-
-  def __init__(
-      self,
-      project=None,
-      retry=None,
-      timeout=120,
-      metadata=None,
-      catalog_name=None,
-      event_store=None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.parent = f"projects/{project}/locations/global/catalogs/"\
-                  f"{catalog_name}/eventStores/{event_store}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self._client = get_recommendation_user_event_client()
-
-  def process(self, element):
-    user_event = recommendationengine.UserEvent(element)
-    request = recommendationengine.WriteUserEventRequest(
-        parent=self.parent, user_event=user_event)
-
-    try:
-      created_user_event = self._client.write_user_event(request)
-      self.counter.inc()
-      yield recommendationengine.UserEvent.to_dict(created_user_event)
-    except Exception:
-      yield pvalue.TaggedOutput(
-          self.FAILED_USER_EVENTS,
-          recommendationengine.UserEvent.to_dict(user_event))
-
-
-class ImportUserEvents(PTransform):
-  """Imports userevents in bulk.
-    The `PTransform` returns a PCollectionTuple with PCollections of
-    successfully and failed imported UserEvents.
-
-    Example usage::
-
-      pipeline
-      | ImportUserEvents(
-          project='example-gcp-project',
-          catalog_name='my-catalog',
-          event_store='my_event_store')
-    """
-  def __init__(
-      self,
-      max_batch_size: int = 5000,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog",
-      event_store: str = "default_event_store"):
-    """Initializes a :class:`WriteUserEvent` transform.
-
-        Args:
-            batch_size (int): Required. Maximum number of catalogitems
-              per request.
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-            event_store (str): Optional. Name of the event store.
-              Default: 'default_event_store'
-        """
-    self.max_batch_size = max_batch_size
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.catalog_name = catalog_name
-    self.event_store = event_store
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          'GCP project name needs to be specified in "project" pipeline option')
-    return (
-        pcoll | GroupIntoBatches.WithShardedKey(self.max_batch_size) | ParDo(
-            _ImportUserEventsFn(
-                self.project,
-                self.retry,
-                self.timeout,
-                self.metadata,
-                self.catalog_name,
-                self.event_store)))
-
-
-class _ImportUserEventsFn(DoFn):
-  FAILED_USER_EVENTS = "failed_user_events"
-
-  def __init__(
-      self,
-      project=None,
-      retry=None,
-      timeout=120,
-      metadata=None,
-      catalog_name=None,
-      event_store=None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.parent = f"projects/{project}/locations/global/catalogs/"\
-                  f"{catalog_name}/eventStores/{event_store}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self.client = get_recommendation_user_event_client()
-
-  def process(self, element):
-
-    user_events = [recommendationengine.UserEvent(e) for e in element[1]]
-    user_event_inline_source = recommendationengine.UserEventInlineSource(
-        {"user_events": user_events})
-    input_config = recommendationengine.InputConfig(
-        user_event_inline_source=user_event_inline_source)
-
-    request = recommendationengine.ImportUserEventsRequest(
-        parent=self.parent, input_config=input_config)
-
-    try:
-      operation = self._client.write_user_event(request)
-      self.counter.inc(len(user_events))
-      yield recommendationengine.PredictResponse.to_dict(operation.result())
-    except Exception:
-      yield pvalue.TaggedOutput(self.FAILED_USER_EVENTS, user_events)
-
-
-class PredictUserEvent(PTransform):
-  """Make a recommendation prediction.
-    The `PTransform` returns a PCollection
-
-    Example usage::
-
-      pipeline
-      | PredictUserEvent(
-          project='example-gcp-project',
-          catalog_name='my-catalog',
-          event_store='my_event_store',
-          placement_id='recently_viewed_default')
-    """
-  def __init__(
-      self,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog",
-      event_store: str = "default_event_store",
-      placement_id: str = None):
-    """Initializes a :class:`PredictUserEvent` transform.
-
-        Args:
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-            event_store (str): Optional. Name of the event store.
-              Default: 'default_event_store'
-            placement_id (str): Required. ID of the recommendation engine
-              placement. This id is used to identify the set of models that
-              will be used to make the prediction.
-        """
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.placement_id = placement_id
-    self.catalog_name = catalog_name
-    self.event_store = event_store
-    if placement_id is None:
-      raise ValueError('placement_id must be specified')
-    else:
-      self.placement_id = placement_id
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          'GCP project name needs to be specified in "project" pipeline option')
-    return pcoll | ParDo(
-        _PredictUserEventFn(
-            self.project,
-            self.retry,
-            self.timeout,
-            self.metadata,
-            self.catalog_name,
-            self.event_store,
-            self.placement_id))
-
-
-class _PredictUserEventFn(DoFn):
-  FAILED_PREDICTIONS = "failed_predictions"
-
-  def __init__(
-      self,
-      project=None,
-      retry=None,
-      timeout=120,
-      metadata=None,
-      catalog_name=None,
-      event_store=None,
-      placement_id=None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.name = f"projects/{project}/locations/global/catalogs/"\
-                f"{catalog_name}/eventStores/{event_store}/placements/"\
-                f"{placement_id}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self._client = get_recommendation_prediction_client()
-
-  def process(self, element):
-    user_event = recommendationengine.UserEvent(element)
-    request = recommendationengine.PredictRequest(
-        name=self.name, user_event=user_event)
-
-    try:
-      prediction = self._client.predict(request)
-      self.counter.inc()
-      yield [
-          recommendationengine.PredictResponse.to_dict(p)
-          for p in prediction.pages
-      ]
-    except Exception:
-      yield pvalue.TaggedOutput(self.FAILED_PREDICTIONS, user_event)
diff --git a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test.py b/sdks/python/apache_beam/ml/gcp/recommendations_ai_test.py
deleted file mode 100644
index 2f688d9..0000000
--- a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test.py
+++ /dev/null
@@ -1,207 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for Recommendations AI transforms."""
-
-from __future__ import absolute_import
-
-import unittest
-
-import mock
-
-import apache_beam as beam
-from apache_beam.metrics import MetricsFilter
-
-# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
-try:
-  from google.cloud import recommendationengine
-  from apache_beam.ml.gcp import recommendations_ai
-except ImportError:
-  recommendationengine = None
-# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
-
-
-@unittest.skipIf(
-    recommendationengine is None,
-    "Recommendations AI dependencies not installed.")
-class RecommendationsAICatalogItemTest(unittest.TestCase):
-  def setUp(self):
-    self._mock_client = mock.Mock()
-    self._mock_client.create_catalog_item.return_value = (
-        recommendationengine.CatalogItem())
-    self.m2 = mock.Mock()
-    self.m2.result.return_value = None
-    self._mock_client.import_catalog_items.return_value = self.m2
-
-    self._catalog_item = {
-        "id": "12345",
-        "title": "Sample laptop",
-        "description": "Indisputably the most fantastic laptop ever created.",
-        "language_code": "en",
-        "category_hierarchies": [{
-            "categories": ["Electronic", "Computers"]
-        }]
-    }
-
-  def test_CreateCatalogItem(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_catalog_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([self._catalog_item])
-          | "Create CatalogItem" >>
-          recommendations_ai.CreateCatalogItem(project="test"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertTrue(read_counter.result == expected_counter)
-
-  def test_ImportCatalogItems(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_catalog_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([
-              (self._catalog_item["id"], self._catalog_item),
-              (self._catalog_item["id"], self._catalog_item)
-          ]) | "Create CatalogItems" >>
-          recommendations_ai.ImportCatalogItems(project="test"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertTrue(read_counter.result == expected_counter)
-
-
-@unittest.skipIf(
-    recommendationengine is None,
-    "Recommendations AI dependencies not installed.")
-class RecommendationsAIUserEventTest(unittest.TestCase):
-  def setUp(self):
-    self._mock_client = mock.Mock()
-    self._mock_client.write_user_event.return_value = (
-        recommendationengine.UserEvent())
-    self.m2 = mock.Mock()
-    self.m2.result.return_value = None
-    self._mock_client.import_user_events.return_value = self.m2
-
-    self._user_event = {
-        "event_type": "page-visit", "user_info": {
-            "visitor_id": "1"
-        }
-    }
-
-  def test_CreateUserEvent(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_user_event_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([self._user_event])
-          | "Create UserEvent" >>
-          recommendations_ai.WriteUserEvent(project="test"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertTrue(read_counter.result == expected_counter)
-
-  def test_ImportUserEvents(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_user_event_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([
-              (self._user_event["user_info"]["visitor_id"], self._user_event),
-              (self._user_event["user_info"]["visitor_id"], self._user_event)
-          ]) | "Create UserEvents" >>
-          recommendations_ai.ImportUserEvents(project="test"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertTrue(read_counter.result == expected_counter)
-
-
-@unittest.skipIf(
-    recommendationengine is None,
-    "Recommendations AI dependencies not installed.")
-class RecommendationsAIPredictTest(unittest.TestCase):
-  def setUp(self):
-    self._mock_client = mock.Mock()
-    self._mock_client.predict.return_value = [
-        recommendationengine.PredictResponse()
-    ]
-
-    self._user_event = {
-        "event_type": "page-visit", "user_info": {
-            "visitor_id": "1"
-        }
-    }
-
-  def test_Predict(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_prediction_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([self._user_event])
-          | "Prediction UserEvents" >> recommendations_ai.PredictUserEvent(
-              project="test", placement_id="recently_viewed_default"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertTrue(read_counter.result == expected_counter)
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py b/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py
deleted file mode 100644
index 19e6b9e..0000000
--- a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py
+++ /dev/null
@@ -1,107 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Integration tests for Recommendations AI transforms."""
-
-from __future__ import absolute_import
-
-import random
-import unittest
-
-from nose.plugins.attrib import attr
-
-import apache_beam as beam
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-from apache_beam.testing.util import is_not_empty
-
-# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
-try:
-  from google.cloud import recommendationengine
-  from apache_beam.ml.gcp import recommendations_ai
-except ImportError:
-  recommendationengine = None
-# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
-
-
-def extract_id(response):
-  yield response["id"]
-
-
-def extract_event_type(response):
-  yield response["event_type"]
-
-
-def extract_prediction(response):
-  yield response[0]["results"]
-
-
-@attr('IT')
-@unittest.skipIf(
-    recommendationengine is None,
-    "Recommendations AI dependencies not installed.")
-class RecommendationAIIT(unittest.TestCase):
-  def test_create_catalog_item(self):
-
-    CATALOG_ITEM = {
-        "id": str(int(random.randrange(100000))),
-        "title": "Sample laptop",
-        "description": "Indisputably the most fantastic laptop ever created.",
-        "language_code": "en",
-        "category_hierarchies": [{
-            "categories": ["Electronic", "Computers"]
-        }]
-    }
-
-    with TestPipeline(is_integration_test=True) as p:
-      output = (
-          p | 'Create data' >> beam.Create([CATALOG_ITEM])
-          | 'Create CatalogItem' >>
-          recommendations_ai.CreateCatalogItem(project=p.get_option('project'))
-          | beam.ParDo(extract_id) | beam.combiners.ToList())
-
-      assert_that(output, equal_to([[CATALOG_ITEM["id"]]]))
-
-  def test_create_user_event(self):
-    USER_EVENT = {"event_type": "page-visit", "user_info": {"visitor_id": "1"}}
-
-    with TestPipeline(is_integration_test=True) as p:
-      output = (
-          p | 'Create data' >> beam.Create([USER_EVENT]) | 'Create UserEvent' >>
-          recommendations_ai.WriteUserEvent(project=p.get_option('project'))
-          | beam.ParDo(extract_event_type) | beam.combiners.ToList())
-
-      assert_that(output, equal_to([[USER_EVENT["event_type"]]]))
-
-  def test_create_predict(self):
-    USER_EVENT = {"event_type": "page-visit", "user_info": {"visitor_id": "1"}}
-
-    with TestPipeline(is_integration_test=True) as p:
-      output = (
-          p | 'Create data' >> beam.Create([USER_EVENT])
-          | 'Predict UserEvent' >> recommendations_ai.PredictUserEvent(
-              project=p.get_option('project'),
-              placement_id="recently_viewed_default")
-          | beam.ParDo(extract_prediction))
-
-      assert_that(output, is_not_empty())
-
-
-if __name__ == '__main__':
-  print(recommendationengine.CatalogItem.__module__)
-  unittest.main()
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index c6d816a..e1f6635 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -123,6 +123,11 @@
 
   try:
     _load_main_session(semi_persistent_directory)
+  except CorruptMainSessionException:
+    exception_details = traceback.format_exc()
+    _LOGGER.error(
+        'Could not load main session: %s', exception_details, exc_info=True)
+    raise
   except Exception:  # pylint: disable=broad-except
     exception_details = traceback.format_exc()
     _LOGGER.error(
@@ -245,12 +250,29 @@
   return 0
 
 
+class CorruptMainSessionException(Exception):
+  """
+  Used to crash this worker if a main session file was provided but
+  is not valid.
+  """
+  pass
+
+
 def _load_main_session(semi_persistent_directory):
   """Loads a pickled main session from the path specified."""
   if semi_persistent_directory:
     session_file = os.path.join(
         semi_persistent_directory, 'staged', names.PICKLED_MAIN_SESSION_FILE)
     if os.path.isfile(session_file):
+      # If the expected session file is present but empty, it's likely that
+      # the user code run by this worker will crash at runtime.
+      # This can happen if the worker fails to download the main session.
+      # Raise a fatal error and crash this worker, forcing a restart.
+      if os.path.getsize(session_file) == 0:
+        raise CorruptMainSessionException(
+            'Session file found, but empty: %s. Functions defined in __main__ '
+            '(interactive session) will almost certainly fail.' %
+            (session_file, ))
       pickler.load_session(session_file)
     else:
       _LOGGER.warning(
diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py
index 2b738bf..d77727e 100644
--- a/sdks/python/apache_beam/typehints/native_type_compatibility.py
+++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py
@@ -107,7 +107,7 @@
   return getattr(user_type, '__origin__', None) is expected_origin
 
 
-def _match_is_named_tuple(user_type):
+def match_is_named_tuple(user_type):
   return (
       _safe_issubclass(user_type, typing.Tuple) and
       hasattr(user_type, '_field_types'))
@@ -234,7 +234,7 @@
       # We just convert it to Any for now.
       # This MUST appear before the entry for the normal Tuple.
       _TypeMapEntry(
-          match=_match_is_named_tuple, arity=0, beam_type=typehints.Any),
+          match=match_is_named_tuple, arity=0, beam_type=typehints.Any),
       _TypeMapEntry(
           match=_match_issubclass(typing.Tuple),
           arity=-1,
diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py
index 6a299bd..5daf68a 100644
--- a/sdks/python/apache_beam/typehints/schemas.py
+++ b/sdks/python/apache_beam/typehints/schemas.py
@@ -70,10 +70,10 @@
 from apache_beam.typehints import row_type
 from apache_beam.typehints.native_type_compatibility import _get_args
 from apache_beam.typehints.native_type_compatibility import _match_is_exactly_mapping
-from apache_beam.typehints.native_type_compatibility import _match_is_named_tuple
 from apache_beam.typehints.native_type_compatibility import _match_is_optional
 from apache_beam.typehints.native_type_compatibility import _safe_issubclass
 from apache_beam.typehints.native_type_compatibility import extract_optional_type
+from apache_beam.typehints.native_type_compatibility import match_is_named_tuple
 from apache_beam.utils import proto_utils
 from apache_beam.utils.timestamp import Timestamp
 
@@ -148,7 +148,7 @@
 
 
 def typing_to_runner_api(type_):
-  if _match_is_named_tuple(type_):
+  if match_is_named_tuple(type_):
     schema = None
     if hasattr(type_, _BEAM_SCHEMA_ID):
       schema = SCHEMA_REGISTRY.get_schema_by_id(getattr(type_, _BEAM_SCHEMA_ID))
@@ -287,7 +287,7 @@
     # TODO(BEAM-10722): Make sure beam.Row generated schemas are registered and
     # de-duped
     return named_fields_to_schema(element_type._fields)
-  elif _match_is_named_tuple(element_type):
+  elif match_is_named_tuple(element_type):
     return named_tuple_to_schema(element_type)
   else:
     raise TypeError(
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 7117a55..775f321 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -200,7 +200,6 @@
     'google-cloud-language>=1.3.0,<2',
     'google-cloud-videointelligence>=1.8.0,<2',
     'google-cloud-vision>=0.38.0,<2',
-    'google-cloud-recommendations-ai>=0.1.0,<=0.2.0'
 ]
 
 INTERACTIVE_BEAM = [
diff --git a/website/www/site/assets/scss/_global.sass b/website/www/site/assets/scss/_global.sass
index 773b404..f7f7b5a 100644
--- a/website/www/site/assets/scss/_global.sass
+++ b/website/www/site/assets/scss/_global.sass
@@ -115,7 +115,7 @@
   line-height: 1.63
   letter-spacing: 0.43px
   padding: 24px
-  max-width: 848px
+  max-width: 100%
   position: relative
 
   pre
@@ -133,7 +133,7 @@
         cursor: pointer
 
 .snippet
-  max-width: 848px
+  max-width: 100%
   margin-bottom: 40px
   .git-link
     float: right
diff --git a/website/www/site/content/en/documentation/_index.md b/website/www/site/content/en/documentation/_index.md
index 36812d5..80d111b 100644
--- a/website/www/site/content/en/documentation/_index.md
+++ b/website/www/site/content/en/documentation/_index.md
@@ -30,6 +30,7 @@
 * Read the [Programming Guide](/documentation/programming-guide/), which introduces all the key Beam concepts.
 * Learn about Beam's [execution model](/documentation/runtime/model) to better understand how pipelines execute.
 * Visit [Learning Resources](/documentation/resources/learning-resources) for some of our favorite articles and talks about Beam.
+* Visit the [glossary](/documentation/glossary) to learn the terminology of the Beam programming model.
 
 ## Pipeline Fundamentals
 
diff --git a/website/www/site/content/en/documentation/dsls/sql/calcite/query-syntax.md b/website/www/site/content/en/documentation/dsls/sql/calcite/query-syntax.md
index 37a53f6..782da8a 100644
--- a/website/www/site/content/en/documentation/dsls/sql/calcite/query-syntax.md
+++ b/website/www/site/content/en/documentation/dsls/sql/calcite/query-syntax.md
@@ -52,7 +52,7 @@
             | expression [ [ AS ] alias ] } [, ...]
         [ FROM from_item  [, ...] ]
         [ WHERE bool_expression ]
-        [ GROUP BY { expression [, ...] | ROLLUP ( expression [, ...] ) } ]
+        [ GROUP BY { expression [, ...] } ]
         [ HAVING bool_expression ]
 
     set_op:
@@ -454,7 +454,7 @@
 
 ### Syntax {#syntax_3}
 
-    GROUP BY { expression [, ...] | ROLLUP ( expression [, ...] ) }
+    GROUP BY { expression [, ...] }
 
 The `GROUP BY` clause groups together rows in a table with non-distinct values
 for the `expression` in the `GROUP BY` clause. For multiple rows in the source
diff --git a/website/www/site/content/en/documentation/glossary.md b/website/www/site/content/en/documentation/glossary.md
new file mode 100644
index 0000000..3993f53
--- /dev/null
+++ b/website/www/site/content/en/documentation/glossary.md
@@ -0,0 +1,464 @@
+---
+title: "Beam glossary"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Apache Beam glossary
+
+## Aggregation
+
+A transform pattern for computing a value from multiple input elements. Aggregation is similar to the reduce operation in the [MapReduce](https://en.wikipedia.org/wiki/MapReduce) model. Aggregation transforms include Count (computes the count of all elements in the aggregation), Max (computes the maximum element in the aggregation), and Sum (computes the sum of all elements in the aggregation).
+
+For a complete list of aggregation transforms, see:
+
+* [Java Transform catalog](/documentation/transforms/java/overview/#aggregation)
+* [Python Transform catalog](/documentation/transforms/python/overview/#aggregation)
+
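+For illustration, a minimal Python SDK sketch of an aggregation (the element values here are made up):
+
+```python
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  total = (
+      p
+      | beam.Create([1, 2, 3, 4])
+      | beam.CombineGlobally(sum))  # aggregates all elements into one value: 10
+```
+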
+## Apply
+
+A method for invoking a transform on a PCollection. Each transform in the Beam SDKs has a generic `apply` method (or pipe operator `|`). Invoking multiple Beam transforms is similar to method chaining, but with a difference: You apply the transform to the input PCollection, passing the transform itself as an argument, and the operation returns the output PCollection. Because of Beam’s deferred execution model, applying a transform does not immediately execute that transform.
+
+To learn more, see:
+
+* [Applying transforms](/documentation/programming-guide/#applying-transforms)
+
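+A rough sketch of applying transforms with the pipe operator in the Python SDK (labels and data are arbitrary):
+
+```python
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  lines = p | 'Create' >> beam.Create(['hello', 'world'])
+  # Applying a transform returns a new PCollection; nothing runs until the
+  # pipeline itself is executed.
+  upper = lines | 'Upper' >> beam.Map(str.upper)
+```
+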
+## Batch processing
+
+A data processing paradigm for working with finite, or bounded, datasets. A bounded PCollection represents a dataset of a known, fixed size. Reading from a batch data source, such as a file or a database, creates a bounded PCollection. A batch processing job eventually ends, in contrast to a streaming job, which runs until cancelled.
+
+To learn more, see:
+
+* [Size and boundedness](/documentation/programming-guide/#size-and-boundedness)
+
+## Bounded data
+
+A dataset of a known, fixed size. A PCollection can be bounded or unbounded, depending on the source of the data that it represents. Reading from a batch data source, such as a file or a database, creates a bounded PCollection. Beam also supports reading a bounded amount of data from an unbounded source.
+
+To learn more, see:
+
+* [Size and boundedness](/documentation/programming-guide/#size-and-boundedness)
+
+## Bundle
+
+The processing unit for elements in a PCollection. Instead of processing all elements in a PCollection simultaneously, Beam processes the elements in bundles. The runner handles the division of the collection into bundles, and in doing so it may optimize the bundle size for the use case. For example, a streaming runner might process smaller bundles than a batch runner.
+
+To learn more, see:
+
+* [Bundling and persistence](/documentation/runtime/model/#bundling-and-persistence)
+
+## Coder
+
+A component that describes how the elements of a PCollection can be encoded and decoded. To support distributed processing and cross-language portability, Beam needs to be able to encode each element of a PCollection as bytes. The Beam SDKs provide built-in coders for common types and language-specific mechanisms for specifying the encoding of a PCollection.
+
+To learn more, see:
+
+* [Data encoding and type safety](/documentation/programming-guide/#data-encoding-and-type-safety)
+
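+As a hedged sketch, a custom coder can be registered for a user-defined type in the Python SDK (`MyRecord` and `MyRecordCoder` are hypothetical names):
+
+```python
+import apache_beam as beam
+
+class MyRecord(object):
+  def __init__(self, value):
+    self.value = value
+
+class MyRecordCoder(beam.coders.Coder):
+  """Describes how MyRecord elements are encoded to and decoded from bytes."""
+  def encode(self, record):
+    return record.value.encode('utf-8')
+
+  def decode(self, encoded):
+    return MyRecord(encoded.decode('utf-8'))
+
+  def is_deterministic(self):
+    return True
+
+# Use this coder whenever a PCollection holds MyRecord elements.
+beam.coders.registry.register_coder(MyRecord, MyRecordCoder)
+```
+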
+## CoGroupByKey
+
+A PTransform that takes two or more PCollections and aggregates the elements by key. In effect, CoGroupByKey performs a relational join of two or more key/value PCollections that have the same key type. While GroupByKey performs this operation over a single input collection, CoGroupByKey operates over multiple input collections.
+
+To learn more, see:
+
+* [CoGroupByKey](/documentation/programming-guide/#cogroupbykey)
+* [CoGroupByKey (Java)](/documentation/transforms/java/aggregation/cogroupbykey/)
+* [CoGroupByKey (Python)](/documentation/transforms/python/aggregation/cogroupbykey/)
+
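+A small Python SDK sketch of a CoGroupByKey join (keys and values are made up):
+
+```python
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  emails = p | 'Emails' >> beam.Create([('amy', 'amy@example.com')])
+  phones = p | 'Phones' >> beam.Create([('amy', '555-1234')])
+
+  # Each output is (key, {'emails': [...], 'phones': [...]}).
+  joined = {'emails': emails, 'phones': phones} | beam.CoGroupByKey()
+```
+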
+## Collection
+
+See [PCollection](/documentation/glossary/#pcollection).
+
+## Combine
+
+A PTransform for combining all elements of a PCollection or all values associated with a key. When you apply a Combine transform, you have to provide a user-defined function (UDF) that contains the logic for combining the elements or values. The combining function should be [commutative](https://en.wikipedia.org/wiki/Commutative_property) and [associative](https://en.wikipedia.org/wiki/Associative_property), because the function is not necessarily invoked exactly once on all values with a given key.
+
+To learn more, see:
+
+* [Combine](/documentation/programming-guide/#combine)
+* [Combine (Java)](/documentation/transforms/java/aggregation/combine/)
+* [CombineGlobally (Python)](/documentation/transforms/python/aggregation/combineglobally/)
+* [CombinePerKey (Python)](/documentation/transforms/python/aggregation/combineperkey/)
+* [CombineValues (Python)](/documentation/transforms/python/aggregation/combinevalues/)
+
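+For example, a minimal sketch of combining values per key in the Python SDK:
+
+```python
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  totals = (
+      p
+      | beam.Create([('a', 1), ('a', 2), ('b', 3)])
+      # sum is commutative and associative, as a combining function should be.
+      | beam.CombinePerKey(sum))  # ('a', 3), ('b', 3)
+```
+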
+## Composite transform
+
+A PTransform that expands into many PTransforms. Composite transforms have a nested structure, in which a complex transform applies one or more simpler transforms. These simpler transforms could be existing Beam operations like ParDo, Combine, or GroupByKey, or they could be other composite transforms. Nesting multiple transforms inside a single composite transform can make your pipeline more modular and easier to understand.
+
+To learn more, see:
+
+* [Composite transforms](/documentation/programming-guide/#composite-transforms)
+
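+A sketch of a composite transform in the Python SDK (`CountWords` is a hypothetical name):
+
+```python
+import apache_beam as beam
+
+class CountWords(beam.PTransform):
+  """A composite transform built by nesting simpler Beam transforms."""
+  def expand(self, pcoll):
+    return (
+        pcoll
+        | 'Split' >> beam.FlatMap(lambda line: line.split())
+        | 'Count' >> beam.combiners.Count.PerElement())
+```
+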
+## Counter (metric)
+
+A metric that reports a single long value and can be incremented. In the Beam model, metrics provide insight into the state of a pipeline, potentially while the pipeline is running.
+
+To learn more, see:
+
+* [Types of metrics](/documentation/programming-guide/#types-of-metrics)
+
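+As a rough sketch, a counter metric can be incremented from a DoFn in the Python SDK (the names here are illustrative):
+
+```python
+import apache_beam as beam
+from apache_beam.metrics import Metrics
+
+class CountEmptyLines(beam.DoFn):
+  def __init__(self):
+    self.empty_lines = Metrics.counter(self.__class__, 'empty_lines')
+
+  def process(self, element):
+    if not element.strip():
+      self.empty_lines.inc()  # reports a single long value that only grows
+    yield element
+```
+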
+## Cross-language transforms
+
+Transforms that can be shared across Beam SDKs. With cross-language transforms, you can use transforms written in any supported SDK language (currently, Java and Python) in a pipeline written in a different SDK language. For example, you could use the Apache Kafka connector from the Java SDK in a Python streaming pipeline. Cross-language transforms make it possible to provide new functionality simultaneously in different SDKs.
+
+To learn more, see:
+
+* [Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines)
+
+## Deferred execution
+
+A feature of the Beam execution model. Beam operations are deferred: applying a transform builds up the pipeline graph, but results are not computed until the pipeline runs, so the result of a given operation may not be available for control flow in your driver program. Deferred execution allows the Beam API to support parallel processing of data.
+
+## Distribution (metric)
+
+A metric that reports information about the distribution of reported values. In the Beam model, metrics provide insight into the state of a pipeline, potentially while the pipeline is running.
+
+To learn more, see:
+
+* [Types of metrics](/documentation/programming-guide/#types-of-metrics)
+
+## DoFn
+
+A function object used by ParDo (or some other transform) to process the elements of a PCollection. A DoFn is a user-defined function, meaning that it contains custom code that defines a data processing task in your pipeline. The Beam system invokes a DoFn one or more times to process some arbitrary bundle of elements, but Beam doesn’t guarantee an exact number of invocations.
+
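+For example, a minimal Python SDK sketch of a DoFn (the name `SplitWords` is illustrative) applied with ParDo:
+
+```python
+import apache_beam as beam
+
+class SplitWords(beam.DoFn):
+  def process(self, element):
+    # A DoFn may emit zero, one, or many outputs per input element.
+    for word in element.split():
+      yield word
+
+with beam.Pipeline() as p:
+  words = p | beam.Create(['to be or not to be']) | beam.ParDo(SplitWords())
+```
+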
+To learn more, see:
+
+* [ParDo](/documentation/programming-guide/#pardo)
+
+## Driver
+
+A program that defines your pipeline, including all of the inputs, transforms, and outputs. To use Beam, you need to create a driver program using classes from one of the Beam SDKs. The driver program creates a pipeline and specifies the execution options that tell the pipeline where and how to run. These options include the runner, which determines what backend your pipeline will run on.
+
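+For example, a minimal Python SDK sketch of a driver program that builds a pipeline, sets the runner, and runs it:
+
+```python
+import apache_beam as beam
+from apache_beam.options.pipeline_options import PipelineOptions
+
+options = PipelineOptions(runner='DirectRunner')  # choose where the pipeline runs
+with beam.Pipeline(options=options) as p:
+  (p
+   | beam.Create([1, 2, 3])
+   | beam.Map(lambda x: x * x)
+   | beam.Map(print))
+```
+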
+To learn more, see:
+
+* [Overview](/documentation/programming-guide/#overview)
+
+## Element
+
+The unit of data in a PCollection. Elements in a PCollection can be of any type, but they must all have the same type. This allows parallel computations to operate uniformly across the entire collection. Some element types have a structure that can be introspected (for example, JSON, Protocol Buffer, Avro, and database records).
+
+To learn more, see:
+
+* [PCollection characteristics](/documentation/programming-guide/#pcollection-characteristics)
+
+## Element-wise
+
+A type of transform that independently processes each element in an input PCollection. Element-wise is similar to the map operation in the [MapReduce](https://en.wikipedia.org/wiki/MapReduce) model. An element-wise transform might output 0, 1, or multiple values for each input element. This is in contrast to aggregation transforms, which compute a single value from multiple input elements. Element-wise operations include Filter, FlatMap, and ParDo.
+
+For a complete list of element-wise transforms, see:
+
+* [Java Transform catalog](/documentation/transforms/java/overview/#element-wise)
+* [Python Transform catalog](/documentation/transforms/python/overview/#element-wise)
+
+## Engine
+
+A data-processing system, such as Dataflow, Spark, or Flink. A Beam runner for an engine executes a Beam pipeline on that engine.
+
+## Event time
+
+The time a data event occurs, determined by a timestamp on an element. This is in contrast to processing time, which is when an element is processed in a pipeline. An event could be, for example, a user interaction or a write to an error log. There’s no guarantee that events will appear in a pipeline in order of event time.
+
+To learn more, see:
+
+* [Watermarks and late data](/documentation/programming-guide/#watermarks-and-late-data)
+* [Triggers](/documentation/programming-guide/#triggers)
+
+## Expansion Service
+
+A service that enables a pipeline to apply (expand) cross-language transforms defined in other SDKs. For example, by connecting to a Java expansion service, the Python SDK can apply transforms implemented in Java. Currently SDKs define expansion services as local processes, but in the future Beam may support long-running expansion services. The development of expansion services is part of the ongoing effort to support multi-language pipelines.
+
+## Flatten
+
+One of the core PTransforms. Flatten merges multiple PCollections into a single logical PCollection.
+
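+For example, a minimal Python SDK sketch that merges two PCollections:
+
+```python
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  first = p | 'First' >> beam.Create(['a', 'b'])
+  second = p | 'Second' >> beam.Create(['c'])
+  merged = (first, second) | beam.Flatten()  # -> 'a', 'b', 'c'
+```
+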
+To learn more, see:
+
+* [Flatten](/documentation/programming-guide/#flatten)
+* [Flatten (Java)](/documentation/transforms/java/other/flatten/)
+* [Flatten (Python)](/documentation/transforms/python/other/flatten/)
+
+## Fusion
+
+An optimization that Beam runners can apply before running a pipeline. When one transform outputs a PCollection that’s consumed by another transform, or when two or more transforms take the same PCollection as input, a runner may be able to fuse the transforms together into a single processing unit (a *stage* in Dataflow). Fusion can make pipeline execution more efficient by avoiding the I/O needed to materialize intermediate PCollections.
+
+## Gauge (metric)
+
+A metric that reports the most recently reported value. In the Beam model, metrics provide insight into the state of a pipeline, potentially while the pipeline is running. Because metrics are collected from many workers, the gauge value may not be the absolute last value, but it will be one of the latest values produced by one of the workers.
+
+To learn more, see:
+
+* [Types of metrics](/documentation/programming-guide/#types-of-metrics)
+
+## GroupByKey
+
+A PTransform for processing collections of key/value pairs. GroupByKey is a parallel reduction operation, similar to the shuffle of a map/shuffle/reduce algorithm. The input to GroupByKey is a collection of key/value pairs in which multiple pairs have the same key but different values (i.e. a multimap). You can use GroupByKey to collect all of the values associated with each unique key.
+
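+For example, a minimal Python SDK sketch that collects all values associated with each key:
+
+```python
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  grouped = (
+      p
+      | beam.Create([('cat', 1), ('dog', 5), ('cat', 9)])
+      # -> ('cat', [1, 9]) and ('dog', [5]), each value an iterable
+      | beam.GroupByKey())
+```
+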
+To learn more, see:
+
+* [GroupByKey](/documentation/programming-guide/#groupbykey)
+* [GroupByKey (Java)](/documentation/transforms/java/aggregation/groupbykey/)
+* [GroupByKey (Python)](/documentation/transforms/python/aggregation/groupbykey/)
+
+## I/O connector
+
+A set of PTransforms for working with external data storage systems. When you create a pipeline, you often need to read from or write to external data systems such as files or databases. Beam provides read and write transforms for a number of common data storage types.
+
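+For example, a minimal Python SDK sketch (the file paths are made up) using the built-in text connectors:
+
+```python
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  (p
+   | beam.io.ReadFromText('gs://my-bucket/input*.txt')   # read transform (source)
+   | beam.Map(str.strip)
+   | beam.io.WriteToText('gs://my-bucket/output'))       # write transform (sink)
+```
+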
+To learn more, see:
+
+* [Pipeline I/O](/documentation/programming-guide/#pipeline-io)
+* [Built-in I/O Transforms](/documentation/io/built-in/)
+
+## Map
+
+An element-wise PTransform that applies a user-defined function (UDF) to each element in a PCollection. Using Map, you can transform each individual element, but you can't change the number of elements.
+
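+For example, a minimal Python SDK sketch that maps each word to its length:
+
+```python
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  lengths = (
+      p
+      | beam.Create(['apple', 'fig'])
+      | beam.Map(len))  # -> 5, 3
+```
+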
+To learn more, see:
+
+* [Map (Python)](/documentation/transforms/python/elementwise/map/)
+* [MapElements (Java)](/documentation/transforms/java/elementwise/mapelements/)
+
+## Metrics
+
+Data on the state of a pipeline, potentially while the pipeline is running. You can use the built-in Beam metrics to gain insight into the functioning of your pipeline. For example, you might use Beam metrics to track errors, calls to a backend service, or the number of elements processed. Beam currently supports three types of metric: Counter, Distribution, and Gauge.
+
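+For example, a minimal Python SDK sketch of a DoFn (the namespace and metric names are made up) that reports all three metric types:
+
+```python
+import apache_beam as beam
+from apache_beam.metrics import Metrics
+
+class TrackedFn(beam.DoFn):
+  def __init__(self):
+    self.processed = Metrics.counter('example', 'elements_processed')
+    self.sizes = Metrics.distribution('example', 'element_sizes')
+    self.latest = Metrics.gauge('example', 'latest_size')
+
+  def process(self, element):
+    self.processed.inc()             # Counter
+    self.sizes.update(len(element))  # Distribution
+    self.latest.set(len(element))    # Gauge
+    yield element
+```
+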
+To learn more, see:
+
+* [Metrics](/documentation/programming-guide/#metrics)
+
+## Multi-language pipeline
+
+A pipeline that uses cross-language transforms. You can combine transforms written in any supported SDK language (currently, Java and Python) and use them in one multi-language pipeline.
+
+To learn more, see:
+
+* [Multi-language pipelines](/documentation/programming-guide/#multi-language-pipelines)
+
+## ParDo
+
+The lowest-level element-wise PTransform. For each element in an input PCollection, ParDo applies a function and emits zero, one, or multiple elements to an output PCollection. “ParDo” is short for “Parallel Do.” It’s similar to the map operation in a [MapReduce](https://en.wikipedia.org/wiki/MapReduce) algorithm, the `apply` method from a DataFrame, or the `UPDATE` keyword from SQL.
+
+To learn more, see:
+
+* [ParDo](/documentation/programming-guide/#pardo)
+* [ParDo (Java)](/documentation/transforms/java/elementwise/pardo/)
+* [ParDo (Python)](/documentation/transforms/python/elementwise/pardo/)
+
+## Partition
+
+An element-wise PTransform that splits a single PCollection into a fixed number of smaller PCollections. Partition requires a user-defined function (UDF) to determine how to split up the elements of the input collection into the resulting output collections. The number of partitions must be determined at graph construction time, meaning that you can’t determine the number of partitions using data calculated by the running pipeline.
+
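+For example, a minimal Python SDK sketch (the passing/failing split is made up) with the number of partitions fixed at two:
+
+```python
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  scores = p | beam.Create([('amy', 92), ('bob', 67), ('cho', 78)])
+  # The partition function returns an index in [0, num_partitions).
+  parts = scores | beam.Partition(lambda kv, n: 0 if kv[1] >= 70 else 1, 2)
+  passing, failing = parts[0], parts[1]
+```
+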
+To learn more, see:
+
+* [Partition](/documentation/programming-guide/#partition)
+* [Partition (Java)](/documentation/transforms/java/elementwise/partition/)
+* [Partition (Python)](/documentation/transforms/python/elementwise/partition/)
+
+## PCollection
+
+A potentially distributed, homogeneous dataset or data stream. PCollections represent data in a Beam pipeline, and Beam transforms (PTransforms) use PCollection objects as inputs and outputs. PCollections are intended to be immutable, meaning that once a PCollection is created, you can’t add, remove, or change individual elements. The “P” stands for “parallel.”
+
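+For example, a minimal Python SDK sketch that creates a PCollection from an in-memory list and derives a new one from it:
+
+```python
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  words = p | beam.Create(['hello', 'world'])   # a bounded PCollection
+  upper = words | beam.Map(str.upper)           # a new, derived PCollection
+```
+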
+To learn more, see:
+
+* [PCollections](/documentation/programming-guide/#pcollections)
+
+## Pipe operator (`|`)
+
+Delimits a step in a Python pipeline. For example: `[Final Output PCollection] = ([Initial Input PCollection] | [First Transform] | [Second Transform] | [Third Transform])`. The output of each transform is passed from left to right as input to the next transform. The pipe operator in Python is equivalent to the `apply` method in Java (in other words, the pipe applies a transform to a PCollection).
+
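+For example, a minimal runnable sketch (the `'Label' >>` prefixes are optional step names):
+
+```python
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  result = (
+      p
+      | 'Create' >> beam.Create([1, 2, 3])
+      | 'Square' >> beam.Map(lambda x: x * x)
+      | 'Format' >> beam.Map(str))
+```
+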
+To learn more, see:
+
+* [Applying transforms](/documentation/programming-guide/#applying-transforms)
+
+## Pipeline
+
+An encapsulation of your entire data processing task, including reading input data from a source, transforming that data, and writing output data to a sink. You can think of a pipeline as a Beam program that uses PTransforms to process PCollections. The transforms in a pipeline can be represented as a directed acyclic graph (DAG). All Beam driver programs must create a pipeline.
+
+To learn more, see:
+
+* [Overview](/documentation/programming-guide/#overview)
+* [Creating a pipeline](/documentation/programming-guide/#creating-a-pipeline)
+* [Design your pipeline](/documentation/pipelines/design-your-pipeline/)
+* [Create your pipeline](/documentation/pipelines/create-your-pipeline/)
+
+## Processing time
+
+The time at which an element is processed at some stage in a pipeline. Processing time is not the same as event time, which is the time at which a data event occurs. Processing time is determined by the clock on the system processing the element. There’s no guarantee that elements will be processed in order of event time.
+
+To learn more, see:
+
+* [Watermarks and late data](/documentation/programming-guide/#watermarks-and-late-data)
+* [Triggers](/documentation/programming-guide/#triggers)
+
+## PTransform
+
+A data processing operation, or a step, in your pipeline. A PTransform takes zero or more PCollections as input, applies a processing function to the elements of that PCollection, and produces zero or more output PCollections. Some PTransforms accept user-defined functions that apply custom logic. The “P” stands for “parallel.”
+
+To learn more, see:
+
+* [Overview](/documentation/programming-guide/#overview)
+* [Transforms](/documentation/programming-guide/#transforms)
+
+## Runner
+
+A runner runs a pipeline on a specific platform. Most runners are translators or adapters to massively parallel big data processing systems. Other runners exist for local testing and debugging. Among the supported runners are Google Cloud Dataflow, Apache Spark, Apache Samza, Apache Flink, the Interactive Runner, and the Direct Runner.
+
+To learn more, see:
+
+* [Choosing a Runner](/documentation/#choosing-a-runner)
+* [Beam Capability Matrix](/documentation/runners/capability-matrix/)
+
+## Schema
+
+A language-independent type definition for a PCollection. The schema for a PCollection defines elements of that PCollection as an ordered list of named fields. Each field has a name, a type, and possibly a set of user options. Schemas provide a way to reason about types across different programming-language APIs.
+
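+For example, a minimal Python SDK sketch (the `Purchase` type is made up) that declares a schema by registering `RowCoder` for a `NamedTuple`:
+
+```python
+import typing
+import apache_beam as beam
+
+class Purchase(typing.NamedTuple):
+  user: str
+  amount: float
+
+# Elements of type Purchase are now treated as rows with named, typed fields.
+beam.coders.registry.register_coder(Purchase, beam.coders.RowCoder)
+```
+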
+To learn more, see:
+
+* [Schemas](/documentation/programming-guide/#schemas)
+* [Schema Patterns](/documentation/patterns/schema/)
+
+## Session
+
+A time interval for grouping data events. A session is defined by some minimum gap duration between events. For example, a data stream representing user mouse activity may have periods with high concentrations of clicks followed by periods of inactivity. A session can represent such a pattern of activity followed by inactivity.
+
+To learn more, see:
+
+* [Session windows](/documentation/programming-guide/#session-windows)
+* [Analyzing Usage Patterns](/get-started/mobile-gaming-example/#analyzing-usage-patterns)
+
+## Side input
+
+Additional input to a PTransform. Side input is input that you provide in addition to the main input PCollection. A DoFn can access side input each time it processes an element in the PCollection. Side inputs are useful if your transform needs to inject additional data at runtime.
+
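+For example, a minimal Python SDK sketch that passes a singleton side input to a Filter transform:
+
+```python
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  words = p | beam.Create(['a', 'bb', 'ccc'])
+  max_len = words | beam.Map(len) | beam.CombineGlobally(max)
+  # The side input is available to every invocation of the filter function.
+  longest = words | beam.Filter(
+      lambda word, limit: len(word) == limit,
+      limit=beam.pvalue.AsSingleton(max_len))
+```
+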
+To learn more, see:
+
+* [Side inputs](/documentation/programming-guide/#side-inputs)
+* [Side input patterns](/documentation/patterns/side-inputs/)
+
+## Sink
+
+A transform that writes to an external data storage system, like a file or database.
+
+To learn more, see:
+
+* [Developing new I/O connectors](/documentation/io/developing-io-overview/)
+* [Pipeline I/O](/documentation/programming-guide/#pipeline-io)
+* [Built-in I/O transforms](/documentation/io/built-in/)
+
+## Source
+
+A transform that reads from an external storage system. A pipeline typically reads input data from a source. The source has a type, which may be different from the sink type, so you can change the format of data as it moves through your pipeline.
+
+To learn more, see:
+
+* [Developing new I/O connectors](/documentation/io/developing-io-overview/)
+* [Pipeline I/O](/documentation/programming-guide/#pipeline-io)
+* [Built-in I/O transforms](/documentation/io/built-in/)
+
+## Splittable DoFn
+
+A generalization of DoFn that makes it easier to create complex, modular I/O connectors. A Splittable DoFn (SDF) can process elements in a non-monolithic way, meaning that the processing can be decomposed into smaller tasks. With SDF, you can checkpoint the processing of an element, and you can split the remaining work to yield additional parallelism. SDF is recommended for building new I/O connectors.
+
+To learn more, see:
+
+* [Splittable DoFns](/documentation/programming-guide/#splittable-dofns)
+* [Splittable DoFn in Apache Beam is Ready to Use](/blog/splittable-do-fn-is-available/)
+
+## State
+
+Persistent values that a PTransform can access. The state API lets you augment element-wise operations (for example, ParDo or Map) with mutable state. Using the state API, you can read from, and write to, state as you process each element of a PCollection. You can use the state API together with the timer API to create processing tasks that give you fine-grained control over the workflow.
+
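+For example, a minimal Python SDK sketch (the name `IndexFn` is illustrative; state requires keyed input) that keeps a running per-key count:
+
+```python
+import apache_beam as beam
+from apache_beam.transforms.userstate import CombiningValueStateSpec
+
+class IndexFn(beam.DoFn):
+  COUNT = CombiningValueStateSpec('count', sum)
+
+  def process(self, element, count=beam.DoFn.StateParam(COUNT)):
+    key, value = element
+    count.add(1)                    # update the per-key state
+    yield key, value, count.read()  # read it back while processing
+```
+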
+To learn more, see:
+
+* [State and Timers](/documentation/programming-guide/#state-and-timers)
+* [Stateful processing with Apache Beam](/blog/stateful-processing/)
+
+## Streaming
+
+A data processing paradigm for working with infinite, or unbounded, datasets. Reading from a streaming data source, such as Pub/Sub or Kafka, creates an unbounded PCollection. An unbounded PCollection must be processed using a job that runs continuously, because the entire collection can never be available for processing at any one time.
+
+To learn more, see:
+
+* [Size and boundedness](/documentation/programming-guide/#size-and-boundedness)
+* [Python Streaming Pipelines](/documentation/sdks/python-streaming/)
+
+## Timer
+
+A Beam feature that enables delayed processing of data stored using the state API. The timer API lets you set timers to call back at either an event-time or a processing-time timestamp. You can use the timer API together with the state API to create processing tasks that give you fine-grained control over the workflow.
+
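+For example, a minimal Python SDK sketch (the names and the flush-at-window-end behavior are illustrative) that buffers values in state and flushes them when an event-time timer fires:
+
+```python
+import apache_beam as beam
+from apache_beam.coders import StrUtf8Coder
+from apache_beam.transforms.timeutil import TimeDomain
+from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer
+
+class BufferThenFlushFn(beam.DoFn):
+  BUFFER = BagStateSpec('buffer', StrUtf8Coder())
+  FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)
+
+  def process(self,
+              element,
+              window=beam.DoFn.WindowParam,
+              buffer=beam.DoFn.StateParam(BUFFER),
+              flush=beam.DoFn.TimerParam(FLUSH)):
+    key, value = element
+    buffer.add(value)
+    flush.set(window.end)  # fire when the watermark reaches the window end
+
+  @on_timer(FLUSH)
+  def on_flush(self, buffer=beam.DoFn.StateParam(BUFFER)):
+    yield list(buffer.read())
+    buffer.clear()
+```
+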
+To learn more, see:
+
+* [State and Timers](/documentation/programming-guide/#state-and-timers)
+* [Stateful processing with Apache Beam](/blog/stateful-processing/)
+* [Timely (and Stateful) Processing with Apache Beam](/blog/timely-processing/)
+
+## Timestamp
+
+A point in time associated with an element in a PCollection and used to assign a window to the element. The source that creates the PCollection assigns each element an initial timestamp, often corresponding to when the element was read or added. But you can also manually assign timestamps. This can be useful if elements have an inherent timestamp, but the timestamp is somewhere in the structure of the element itself (for example, a time field in a server log entry).
+
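+For example, a minimal Python SDK sketch (the field names are made up) that assigns timestamps from a field inside each element:
+
+```python
+import apache_beam as beam
+from apache_beam.transforms.window import TimestampedValue
+
+with beam.Pipeline() as p:
+  stamped = (
+      p
+      | beam.Create([{'user': 'amy', 'ts': 1620000000}])
+      # Re-emit each element with its embedded event-time timestamp.
+      | beam.Map(lambda e: TimestampedValue(e, e['ts'])))
+```
+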
+To learn more, see:
+
+* [Element timestamps](/documentation/programming-guide/#element-timestamps)
+* [Adding timestamps to a PCollection’s elements](/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements)
+
+## Transform
+
+See PTransform.
+
+## Trigger
+
+Determines when to emit aggregated result data from a window. You can use triggers to refine the windowing strategy for your pipeline. If you use the default windowing configuration and default trigger, Beam outputs an aggregated result when it estimates that all data for a window has arrived, and it discards all subsequent data for that window. But you can also use triggers to emit early results, before all the data in a given window has arrived, or to process late data by triggering after the event time watermark passes the end of the window.
+
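+For example, a minimal Python SDK sketch (window size and delays are made up) that emits early results every 30 seconds of processing time and a final result when the watermark passes the end of each 60-second window:
+
+```python
+import apache_beam as beam
+from apache_beam.transforms.trigger import (
+    AccumulationMode, AfterProcessingTime, AfterWatermark)
+from apache_beam.transforms.window import FixedWindows
+
+with beam.Pipeline() as p:
+  counts = (
+      p
+      | beam.Create([('amy', 1)])
+      | beam.WindowInto(
+          FixedWindows(60),
+          trigger=AfterWatermark(early=AfterProcessingTime(30)),
+          accumulation_mode=AccumulationMode.DISCARDING)
+      | beam.CombinePerKey(sum))
+```
+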
+To learn more, see:
+
+* [Triggers](/documentation/programming-guide/#triggers)
+
+## Unbounded data
+
+A dataset of unlimited size. A PCollection can be bounded or unbounded, depending on the source of the data that it represents. Reading from a streaming or continuously updating data source, such as Pub/Sub or Kafka, typically creates an unbounded PCollection.
+
+To learn more, see:
+
+* [Size and boundedness](/documentation/programming-guide/#size-and-boundedness)
+
+## User-defined function
+
+Custom logic that a PTransform applies to your data. Some PTransforms accept a user-defined function (UDF) as a way to configure the transform. For example, ParDo expects user code in the form of a DoFn object. Each language SDK has its own idiomatic way of expressing user-defined functions, but there are some common requirements, like serializability and thread compatibility.
+
+To learn more, see:
+
+* [User-Defined Functions (UDFs)](/documentation/basics/#user-defined-functions-udfs)
+* [ParDo](/documentation/programming-guide/#pardo)
+* [Requirements for writing user code for Beam transforms](/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms)
+
+## Watermark
+
+The point in event time when all data in a window can be expected to have arrived in the pipeline. Watermarks provide a way to estimate the completeness of input data. Every PCollection has an associated watermark. Once the watermark progresses past the end of a window, any element that arrives with a timestamp in that window is considered late data.
+
+To learn more, see:
+
+* [Watermarks and late data](/documentation/programming-guide/#watermarks-and-late-data)
+
+## Windowing
+
+Partitioning a PCollection into bounded subsets grouped by the timestamps of individual elements. In the Beam model, any PCollection – including unbounded PCollections – can be subdivided into logical windows. Each element in a PCollection is assigned to one or more windows according to the PCollection's windowing function, and each individual window contains a finite number of elements. Transforms that aggregate multiple elements, such as GroupByKey and Combine, work implicitly on a per-window basis.
+
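+For example, a minimal Python SDK sketch (sample data and window size are made up) that assigns timestamped elements to 60-second fixed windows and counts them per window:
+
+```python
+import apache_beam as beam
+from apache_beam.transforms.window import FixedWindows, TimestampedValue
+
+with beam.Pipeline() as p:
+  counts = (
+      p
+      | beam.Create([('amy', 1620000000), ('bob', 1620000075)])
+      | beam.Map(lambda kv: TimestampedValue(kv[0], kv[1]))
+      | beam.WindowInto(FixedWindows(60))      # 60-second fixed windows
+      | beam.combiners.Count.PerElement())     # counted per window
+```
+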
+To learn more, see:
+
+* [Windowing](/documentation/programming-guide/#windowing)
+
+## Worker
+
+A container, process, or virtual machine (VM) that handles some part of the parallel processing of a pipeline. The Beam model doesn’t support synchronizing shared state across worker machines. Instead, each worker node has its own independent copy of state. A Beam runner might serialize elements between machines for communication purposes and for other reasons such as persistence.
+
+To learn more, see:
+
+* [Execution model](/documentation/runtime/model/)
diff --git a/website/www/site/content/en/get-started/downloads.md b/website/www/site/content/en/get-started/downloads.md
index c67343d..0026b72 100644
--- a/website/www/site/content/en/get-started/downloads.md
+++ b/website/www/site/content/en/get-started/downloads.md
@@ -88,7 +88,7 @@
 
 ## Releases
 
-### 2.29.0 (2021-04-15)
+### 2.29.0 (2021-04-27)
 Official [source code download](http://www.apache.org/dyn/closer.cgi/beam/2.29.0/apache-beam-2.29.0-source-release.zip).
 [SHA-512](https://downloads.apache.org/beam/2.29.0/apache-beam-2.29.0-source-release.zip.sha512).
 [signature](https://downloads.apache.org/beam/2.29.0/apache-beam-2.29.0-source-release.zip.asc).
diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html
index 0718fc5..94aa1d1 100644
--- a/website/www/site/layouts/partials/section-menu/en/documentation.html
+++ b/website/www/site/layouts/partials/section-menu/en/documentation.html
@@ -342,6 +342,8 @@
   </ul>
 </li>
 
+<li><a href="/documentation/glossary/">Glossary</a></li>
+
 <li class="section-nav-item--collapsible">
   <span class="section-nav-list-title">Runtime systems</span>