Merge pull request #14705: [BEAM-11754] Update docs about EOS support

diff --git a/.test-infra/jenkins/CommonJobProperties.groovy b/.test-infra/jenkins/CommonJobProperties.groovy
index 40f143a..851fc0b 100644
--- a/.test-infra/jenkins/CommonJobProperties.groovy
+++ b/.test-infra/jenkins/CommonJobProperties.groovy
@@ -49,7 +49,7 @@
 
     // Discard old builds. Build records are only kept up to this number of days.
     context.logRotator {
-      daysToKeep(14)
+      daysToKeep(30)
     }
 
     // Source code management.
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/MorePipelineTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/MorePipelineTest.java
index 267b4d5..369aad9 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/MorePipelineTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/MorePipelineTest.java
@@ -25,18 +25,14 @@
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -103,22 +99,13 @@
 
     @Override
     public PCollectionView<List<T>> expand(PCollection<T> input) {
+      PCollection<KV<Void, T>> materializationInput =
+          input.apply(new View.VoidKeyToMultimapMaterialization<>());
       Coder<T> inputCoder = input.getCoder();
-      PCollection<KV<Long, PCollectionViews.ValueOrMetadata<T, OffsetRange>>> materializationInput =
-          input
-              .apply("IndexElements", ParDo.of(new View.ToListViewDoFn<>()))
-              .setCoder(
-                  KvCoder.of(
-                      BigEndianLongCoder.of(),
-                      PCollectionViews.ValueOrMetadataCoder.create(
-                          inputCoder, OffsetRange.Coder.of())));
       PCollectionView<List<T>> view =
-          PCollectionViews.listView(
+          PCollectionViews.listViewUsingVoidKey(
               materializationInput,
-              (TupleTag<
-                      Materializations.MultimapView<
-                          Long, PCollectionViews.ValueOrMetadata<T, OffsetRange>>>)
-                  originalView.getTagInternal(),
+              (TupleTag<Materializations.MultimapView<Void, T>>) originalView.getTagInternal(),
               (PCollectionViews.TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
               materializationInput.getWindowingStrategy());
       materializationInput.apply(View.CreatePCollectionView.of(view));
diff --git a/runners/flink/1.12/build.gradle b/runners/flink/1.12/build.gradle
index 6db1ccb..93ba0c3 100644
--- a/runners/flink/1.12/build.gradle
+++ b/runners/flink/1.12/build.gradle
@@ -20,7 +20,7 @@
 /* All properties required for loading the Flink build script */
 project.ext {
   // Set the version of all Flink-related dependencies here.
-  flink_version = '1.12.2'
+  flink_version = '1.12.3'
   // Version specific code overrides.
   main_source_overrides = ["${basePath}/1.10/src/main/java", "${basePath}/1.11/src/main/java", './src/main/java']
   test_source_overrides = ["${basePath}/1.10/src/test/java", "${basePath}/1.11/src/test/java", './src/test/java']
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
index 35d865f..37fa6fc 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
@@ -69,17 +69,11 @@
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PCollectionViews.HasDefaultValue;
 import org.apache.beam.sdk.values.PCollectionViews.IterableViewFn;
-import org.apache.beam.sdk.values.PCollectionViews.IterableViewFn2;
 import org.apache.beam.sdk.values.PCollectionViews.ListViewFn;
-import org.apache.beam.sdk.values.PCollectionViews.ListViewFn2;
 import org.apache.beam.sdk.values.PCollectionViews.MapViewFn;
-import org.apache.beam.sdk.values.PCollectionViews.MapViewFn2;
 import org.apache.beam.sdk.values.PCollectionViews.MultimapViewFn;
-import org.apache.beam.sdk.values.PCollectionViews.MultimapViewFn2;
 import org.apache.beam.sdk.values.PCollectionViews.SingletonViewFn;
-import org.apache.beam.sdk.values.PCollectionViews.SingletonViewFn2;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
@@ -110,13 +104,7 @@
   private static final Object NULL_PLACE_HOLDER = new Object();
 
   private static final ImmutableList<Class<? extends ViewFn>> KNOWN_SINGLETON_VIEW_TYPES =
-      ImmutableList.of(
-          SingletonViewFn.class,
-          SingletonViewFn2.class,
-          MapViewFn.class,
-          MapViewFn2.class,
-          MultimapViewFn.class,
-          MultimapViewFn2.class);
+      ImmutableList.of(SingletonViewFn.class, MapViewFn.class, MultimapViewFn.class);
 
   /**
    * Limit the number of concurrent initializations.
@@ -314,7 +302,7 @@
       // We handle the singleton case separately since a null value may be returned.
       // We use a null place holder to represent this, and when we detect it, we translate
       // back to null for the user.
-      if (viewFn instanceof SingletonViewFn || viewFn instanceof SingletonViewFn2) {
+      if (viewFn instanceof SingletonViewFn) {
         ViewT rval =
             executionContext
                 .<PCollectionViewWindow<ViewT>, ViewT>getLogicalReferenceCache()
@@ -323,7 +311,7 @@
                     () -> {
                       @SuppressWarnings("unchecked")
                       ViewT viewT =
-                          getSingletonForWindow(tag, (HasDefaultValue<ViewT>) viewFn, window);
+                          getSingletonForWindow(tag, (SingletonViewFn<ViewT>) viewFn, window);
                       @SuppressWarnings("unchecked")
                       ViewT nullPlaceHolder = (ViewT) NULL_PLACE_HOLDER;
                       return viewT == null ? nullPlaceHolder : viewT;
@@ -331,10 +319,7 @@
         return rval == NULL_PLACE_HOLDER ? null : rval;
       } else if (singletonMaterializedTags.contains(tag)) {
         checkArgument(
-            viewFn instanceof MapViewFn
-                || viewFn instanceof MapViewFn2
-                || viewFn instanceof MultimapViewFn
-                || viewFn instanceof MultimapViewFn2,
+            viewFn instanceof MapViewFn || viewFn instanceof MultimapViewFn,
             "Unknown view type stored as singleton. Expected one of %s, got %s",
             KNOWN_SINGLETON_VIEW_TYPES,
             viewFn.getClass().getName());
@@ -351,19 +336,15 @@
             .get(
                 PCollectionViewWindow.of(view, window),
                 () -> {
-                  if (viewFn instanceof IterableViewFn
-                      || viewFn instanceof IterableViewFn2
-                      || viewFn instanceof ListViewFn
-                      || viewFn instanceof ListViewFn2) {
+                  if (viewFn instanceof IterableViewFn || viewFn instanceof ListViewFn) {
                     @SuppressWarnings("unchecked")
                     ViewT viewT = (ViewT) getListForWindow(tag, window);
                     return viewT;
-                  } else if (viewFn instanceof MapViewFn || viewFn instanceof MapViewFn2) {
+                  } else if (viewFn instanceof MapViewFn) {
                     @SuppressWarnings("unchecked")
                     ViewT viewT = (ViewT) getMapForWindow(tag, window);
                     return viewT;
-                  } else if (viewFn instanceof MultimapViewFn
-                      || viewFn instanceof MultimapViewFn2) {
+                  } else if (viewFn instanceof MultimapViewFn) {
                     @SuppressWarnings("unchecked")
                     ViewT viewT = (ViewT) getMultimapForWindow(tag, window);
                     return viewT;
@@ -394,7 +375,7 @@
    * </ul>
    */
   private <T, W extends BoundedWindow> T getSingletonForWindow(
-      TupleTag<?> viewTag, HasDefaultValue<T> viewFn, W window) throws IOException {
+      TupleTag<?> viewTag, SingletonViewFn<T> viewFn, W window) throws IOException {
     @SuppressWarnings({
       "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
       "unchecked"
diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
index a8c1771..9bc32fc 100644
--- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
@@ -22,11 +22,14 @@
 import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
 import org.apache.beam.runners.twister2.Twister2BatchTranslationContext;
 import org.apache.beam.runners.twister2.translators.BatchTransformTranslator;
+import org.apache.beam.runners.twister2.translators.functions.ByteToElemFunction;
 import org.apache.beam.runners.twister2.translators.functions.ByteToWindowFunctionPrimitive;
+import org.apache.beam.runners.twister2.translators.functions.ElemToBytesFunction;
 import org.apache.beam.runners.twister2.translators.functions.MapToTupleFunction;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -58,23 +61,50 @@
                 context.getCurrentTransform();
     org.apache.beam.sdk.values.PCollectionView<ViewT> input;
     PCollection<ElemT> inputPCol = context.getInput(transform);
-    final KvCoder coder = (KvCoder) inputPCol.getCoder();
-    Coder inputKeyCoder = coder.getKeyCoder();
+    final Coder coder = inputPCol.getCoder();
     WindowingStrategy windowingStrategy = inputPCol.getWindowingStrategy();
     WindowFn windowFn = windowingStrategy.getWindowFn();
-    final WindowedValue.WindowedValueCoder wvCoder =
-        WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder());
-    BatchTSet<WindowedValue<ElemT>> inputGathered =
-        inputDataSet
-            .direct()
-            .map(new MapToTupleFunction<>(inputKeyCoder, wvCoder))
-            .allGather()
-            .map(new ByteToWindowFunctionPrimitive(inputKeyCoder, wvCoder));
     try {
       input = CreatePCollectionViewTranslation.getView(application);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    context.setSideInputDataSet(input.getTagInternal().getId(), inputGathered);
+
+    switch (input.getViewFn().getMaterialization().getUrn()) {
+      case Materializations.MULTIMAP_MATERIALIZATION_URN:
+        KvCoder kvCoder = (KvCoder<?, ?>) coder;
+        final Coder keyCoder = kvCoder.getKeyCoder();
+        final WindowedValue.WindowedValueCoder kvwvCoder =
+            WindowedValue.FullWindowedValueCoder.of(
+                kvCoder.getValueCoder(), windowFn.windowCoder());
+        BatchTSet<WindowedValue<ElemT>> multimapMaterialization =
+            inputDataSet
+                .direct()
+                .map(new MapToTupleFunction<>(keyCoder, kvwvCoder))
+                .allGather()
+                .map(new ByteToWindowFunctionPrimitive(keyCoder, kvwvCoder));
+        context.setSideInputDataSet(input.getTagInternal().getId(), multimapMaterialization);
+        break;
+      case Materializations.ITERABLE_MATERIALIZATION_URN:
+        final WindowedValue.WindowedValueCoder wvCoder =
+            WindowedValue.FullWindowedValueCoder.of(coder, windowFn.windowCoder());
+        BatchTSet<WindowedValue<ElemT>> iterableMaterialization =
+            inputDataSet
+                .direct()
+                .map(new ElemToBytesFunction<>(wvCoder))
+                .allGather()
+                .map(new ByteToElemFunction(wvCoder));
+        try {
+          input = CreatePCollectionViewTranslation.getView(application);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        context.setSideInputDataSet(input.getTagInternal().getId(), iterableMaterialization);
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "Unknown side input materialization "
+                + input.getViewFn().getMaterialization().getUrn());
+    }
   }
 }
diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToElemFunction.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToElemFunction.java
new file mode 100644
index 0000000..578225f
--- /dev/null
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToElemFunction.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2.translators.functions;
+
+import edu.iu.dsc.tws.api.tset.TSetContext;
+import edu.iu.dsc.tws.api.tset.fn.MapFunc;
+import java.io.ObjectStreamException;
+import java.util.logging.Logger;
+import org.apache.beam.runners.twister2.utils.TranslationUtils;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+
+/** ByteToWindow function. */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class ByteToElemFunction<V> implements MapFunc<WindowedValue<V>, byte[]> {
+  private transient WindowedValueCoder<V> wvCoder;
+  private static final Logger LOG = Logger.getLogger(ByteToElemFunction.class.getName());
+
+  private transient boolean isInitialized = false;
+  private byte[] wvCoderBytes;
+
+  public ByteToElemFunction() {
+    // non arg constructor needed for kryo
+    isInitialized = false;
+  }
+
+  public ByteToElemFunction(final WindowedValueCoder<V> wvCoder) {
+    this.wvCoder = wvCoder;
+
+    wvCoderBytes = SerializableUtils.serializeToByteArray(wvCoder);
+  }
+
+  @Override
+  public WindowedValue<V> map(byte[] input) {
+    return TranslationUtils.fromByteArray(input, wvCoder);
+  }
+
+  @Override
+  public void prepare(TSetContext context) {
+    initTransient();
+  }
+
+  /**
+   * Method used to initialize the transient variables that were sent over as byte arrays or proto
+   * buffers.
+   */
+  private void initTransient() {
+    if (isInitialized) {
+      return;
+    }
+
+    wvCoder =
+        (WindowedValueCoder<V>)
+            SerializableUtils.deserializeFromByteArray(wvCoderBytes, "Custom Coder Bytes");
+    this.isInitialized = true;
+  }
+
+  protected Object readResolve() throws ObjectStreamException {
+    return this;
+  }
+}
diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ElemToBytesFunction.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ElemToBytesFunction.java
new file mode 100644
index 0000000..c83acdd
--- /dev/null
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ElemToBytesFunction.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2.translators.functions;
+
+import edu.iu.dsc.tws.api.tset.TSetContext;
+import edu.iu.dsc.tws.api.tset.fn.MapFunc;
+import java.io.ObjectStreamException;
+import java.util.logging.Logger;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Map to tuple function. */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class ElemToBytesFunction<V> implements MapFunc<byte[], WindowedValue<V>> {
+
+  private transient WindowedValue.WindowedValueCoder<V> wvCoder;
+  private static final Logger LOG = Logger.getLogger(ElemToBytesFunction.class.getName());
+
+  private transient boolean isInitialized = false;
+  private byte[] wvCoderBytes;
+
+  public ElemToBytesFunction() {
+    // non arg constructor needed for kryo
+    this.isInitialized = false;
+  }
+
+  public ElemToBytesFunction(WindowedValue.WindowedValueCoder<V> wvCoder) {
+    this.wvCoder = wvCoder;
+    wvCoderBytes = SerializableUtils.serializeToByteArray(wvCoder);
+  }
+
+  @Override
+  public @Nullable byte[] map(WindowedValue<V> input) {
+    try {
+      return CoderUtils.encodeToByteArray(wvCoder, input);
+    } catch (CoderException e) {
+      LOG.info(e.getMessage());
+    }
+    return null;
+  }
+
+  @Override
+  public void prepare(TSetContext context) {
+    initTransient();
+  }
+
+  /**
+   * Method used to initialize the transient variables that were sent over as byte arrays or proto
+   * buffers.
+   */
+  private void initTransient() {
+    if (isInitialized) {
+      return;
+    }
+    wvCoder =
+        (WindowedValue.WindowedValueCoder<V>)
+            SerializableUtils.deserializeFromByteArray(wvCoderBytes, "Coder");
+    this.isInitialized = true;
+  }
+
+  protected Object readResolve() throws ObjectStreamException {
+    return this;
+  }
+}
diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java
index bbcd392..6ed77c7 100644
--- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java
@@ -23,6 +23,7 @@
 import edu.iu.dsc.tws.api.dataset.DataPartitionConsumer;
 import edu.iu.dsc.tws.api.tset.TSetContext;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,11 +32,11 @@
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.Materializations.MultimapView;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -75,40 +76,79 @@
   }
 
   private <T> T getSideInput(PCollectionView<T> view, BoundedWindow window) {
-    Map<BoundedWindow, List<WindowedValue<KV<?, ?>>>> partitionedElements = new HashMap<>();
+    switch (view.getViewFn().getMaterialization().getUrn()) {
+      case Materializations.MULTIMAP_MATERIALIZATION_URN:
+        return getMultimapSideInput(view, window);
+      case Materializations.ITERABLE_MATERIALIZATION_URN:
+        return getIterableSideInput(view, window);
+      default:
+        throw new IllegalArgumentException(
+            "Unknown materialization type: " + view.getViewFn().getMaterialization().getUrn());
+    }
+  }
+
+  private <T> T getMultimapSideInput(PCollectionView<T> view, BoundedWindow window) {
+    Map<BoundedWindow, List<WindowedValue<?>>> partitionedElements = getPartitionedElements(view);
+    Map<BoundedWindow, T> resultMap = new HashMap<>();
+
+    ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+    for (Map.Entry<BoundedWindow, List<WindowedValue<?>>> elements :
+        partitionedElements.entrySet()) {
+
+      Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+      resultMap.put(
+          elements.getKey(),
+          viewFn.apply(
+              InMemoryMultimapSideInputView.fromIterable(
+                  keyCoder,
+                  (Iterable)
+                      elements.getValue().stream()
+                          .map(WindowedValue::getValue)
+                          .collect(Collectors.toList()))));
+    }
+    T result = resultMap.get(window);
+    if (result == null) {
+      result = viewFn.apply(InMemoryMultimapSideInputView.empty());
+    }
+    return result;
+  }
+
+  private Map<BoundedWindow, List<WindowedValue<?>>> getPartitionedElements(
+      PCollectionView<?> view) {
+    Map<BoundedWindow, List<WindowedValue<?>>> partitionedElements = new HashMap<>();
     DataPartition<?> sideInput = runtimeContext.getInput(view.getTagInternal().getId());
     DataPartitionConsumer<?> dataPartitionConsumer = sideInput.getConsumer();
     while (dataPartitionConsumer.hasNext()) {
-      WindowedValue<KV<?, ?>> winValue = (WindowedValue<KV<?, ?>>) dataPartitionConsumer.next();
+      WindowedValue<?> winValue = (WindowedValue<?>) dataPartitionConsumer.next();
       for (BoundedWindow tbw : winValue.getWindows()) {
-        List<WindowedValue<KV<?, ?>>> windowedValues =
+        List<WindowedValue<?>> windowedValues =
             partitionedElements.computeIfAbsent(tbw, k -> new ArrayList<>());
         windowedValues.add(winValue);
       }
     }
+    return partitionedElements;
+  }
 
+  private <T> T getIterableSideInput(PCollectionView<T> view, BoundedWindow window) {
+    Map<BoundedWindow, List<WindowedValue<?>>> partitionedElements = getPartitionedElements(view);
+
+    ViewFn<Materializations.IterableView, T> viewFn =
+        (ViewFn<Materializations.IterableView, T>) view.getViewFn();
     Map<BoundedWindow, T> resultMap = new HashMap<>();
 
-    for (Map.Entry<BoundedWindow, List<WindowedValue<KV<?, ?>>>> elements :
+    for (Map.Entry<BoundedWindow, List<WindowedValue<?>>> elements :
         partitionedElements.entrySet()) {
-
-      ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
-      Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
       resultMap.put(
           elements.getKey(),
-          (T)
-              viewFn.apply(
-                  InMemoryMultimapSideInputView.fromIterable(
-                      keyCoder,
-                      (Iterable)
-                          elements.getValue().stream()
-                              .map(WindowedValue::getValue)
-                              .collect(Collectors.toList()))));
+          viewFn.apply(
+              () ->
+                  elements.getValue().stream()
+                      .map(WindowedValue::getValue)
+                      .collect(Collectors.toList())));
     }
     T result = resultMap.get(window);
     if (result == null) {
-      ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
-      result = viewFn.apply(InMemoryMultimapSideInputView.empty());
+      result = viewFn.apply(() -> Collections.<T>emptyList());
     }
     return result;
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 09464db..241fa1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -63,6 +63,7 @@
 import org.apache.beam.sdk.values.ValueWithRecordId.StripIdsDoFn;
 import org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.RemovalListener;
@@ -827,24 +828,29 @@
                 source, checkpoint, BoundedWindow.TIMESTAMP_MIN_VALUE));
       }
 
+      private void initializeCurrentReader() throws IOException {
+        Preconditions.checkState(currentReader == null);
+        Object cacheKey =
+            createCacheKey(initialRestriction.getSource(), initialRestriction.getCheckpoint());
+        currentReader = cachedReaders.getIfPresent(cacheKey);
+        if (currentReader == null) {
+          currentReader =
+              initialRestriction
+                  .getSource()
+                  .createReader(pipelineOptions, initialRestriction.getCheckpoint());
+        } else {
+          // If the reader is from cache, then we know that the reader has been started.
+          // We also remove this cache entry to avoid eviction.
+          readerHasBeenStarted = true;
+          cachedReaders.invalidate(cacheKey);
+        }
+      }
+
       @Override
       public boolean tryClaim(UnboundedSourceValue<OutputT>[] position) {
         try {
           if (currentReader == null) {
-            Object cacheKey =
-                createCacheKey(initialRestriction.getSource(), initialRestriction.getCheckpoint());
-            currentReader = cachedReaders.getIfPresent(cacheKey);
-            if (currentReader == null) {
-              currentReader =
-                  initialRestriction
-                      .getSource()
-                      .createReader(pipelineOptions, initialRestriction.getCheckpoint());
-            } else {
-              // If the reader is from cache, then we know that the reader has been started.
-              // We also remove this cache entry to avoid eviction.
-              readerHasBeenStarted = true;
-              cachedReaders.invalidate(cacheKey);
-            }
+            initializeCurrentReader();
           }
           if (currentReader instanceof EmptyUnboundedSource.EmptyUnboundedReader) {
             return false;
@@ -872,6 +878,8 @@
               currentReader.close();
             } catch (IOException closeException) {
               e.addSuppressed(closeException);
+            } finally {
+              currentReader = null;
             }
           }
           throw new RuntimeException(e);
@@ -957,10 +965,7 @@
 
         if (currentReader == null) {
           try {
-            currentReader =
-                initialRestriction
-                    .getSource()
-                    .createReader(pipelineOptions, initialRestriction.getCheckpoint());
+            initializeCurrentReader();
           } catch (IOException e) {
             throw new RuntimeException(e);
           }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index a3b5665..63b4191 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
 import java.io.IOException;
@@ -46,6 +47,7 @@
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
 import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.View.VoidKeyToMultimapMaterialization;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -1306,21 +1308,43 @@
 
     @Override
     public PCollectionView<OutputT> expand(PCollection<InputT> input) {
+      // TODO(BEAM-10097): Make this the default expansion for all portable runners.
+      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
+          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
+              || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
+        PCollection<OutputT> combined =
+            input.apply(
+                "CombineValues",
+                Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
+        Coder<OutputT> outputCoder = combined.getCoder();
+        PCollectionView<OutputT> view =
+            PCollectionViews.singletonView(
+                combined,
+                (TypeDescriptorSupplier<OutputT>)
+                    () -> outputCoder != null ? outputCoder.getEncodedTypeDescriptor() : null,
+                input.getWindowingStrategy(),
+                insertDefault,
+                insertDefault ? fn.defaultValue() : null,
+                combined.getCoder());
+        combined.apply("CreatePCollectionView", CreatePCollectionView.of(view));
+        return view;
+      }
+
       PCollection<OutputT> combined =
-          input.apply(
-              "CombineValues",
-              Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
+          input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
+      PCollection<KV<Void, OutputT>> materializationInput =
+          combined.apply(new VoidKeyToMultimapMaterialization<>());
       Coder<OutputT> outputCoder = combined.getCoder();
       PCollectionView<OutputT> view =
-          PCollectionViews.singletonView(
-              combined,
+          PCollectionViews.singletonViewUsingVoidKey(
+              materializationInput,
               (TypeDescriptorSupplier<OutputT>)
                   () -> outputCoder != null ? outputCoder.getEncodedTypeDescriptor() : null,
               input.getWindowingStrategy(),
               insertDefault,
               insertDefault ? fn.defaultValue() : null,
               combined.getCoder());
-      combined.apply("CreatePCollectionView", CreatePCollectionView.of(view));
+      materializationInput.apply(CreatePCollectionView.of(view));
       return view;
     }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index e81f0b8..904575c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -27,6 +29,7 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -257,16 +260,33 @@
        * Long#MIN_VALUE} key is used to store all known {@link OffsetRange ranges} allowing us to
        * compute such an ordering.
        */
+
+      // TODO(BEAM-10097): Make this the default expansion for all portable runners.
+      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
+          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
+              || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
+        Coder<T> inputCoder = input.getCoder();
+        PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> materializationInput =
+            input
+                .apply("IndexElements", ParDo.of(new ToListViewDoFn<>()))
+                .setCoder(
+                    KvCoder.of(
+                        BigEndianLongCoder.of(),
+                        ValueOrMetadataCoder.create(inputCoder, OffsetRange.Coder.of())));
+        PCollectionView<List<T>> view =
+            PCollectionViews.listView(
+                materializationInput,
+                (TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
+                input.getWindowingStrategy());
+        materializationInput.apply(CreatePCollectionView.of(view));
+        return view;
+      }
+
+      PCollection<KV<Void, T>> materializationInput =
+          input.apply(new VoidKeyToMultimapMaterialization<>());
       Coder<T> inputCoder = input.getCoder();
-      PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> materializationInput =
-          input
-              .apply("IndexElements", ParDo.of(new ToListViewDoFn<>()))
-              .setCoder(
-                  KvCoder.of(
-                      BigEndianLongCoder.of(),
-                      ValueOrMetadataCoder.create(inputCoder, OffsetRange.Coder.of())));
       PCollectionView<List<T>> view =
-          PCollectionViews.listView(
+          PCollectionViews.listViewUsingVoidKey(
               materializationInput,
               (TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
               materializationInput.getWindowingStrategy());
@@ -280,8 +300,8 @@
    * range for each window seen. We use random offset ranges to minimize the chance that two ranges
    * overlap increasing the odds that each "key" represents a single index.
    */
-  @Internal
-  public static class ToListViewDoFn<T> extends DoFn<T, KV<Long, ValueOrMetadata<T, OffsetRange>>> {
+  private static class ToListViewDoFn<T>
+      extends DoFn<T, KV<Long, ValueOrMetadata<T, OffsetRange>>> {
     private Map<BoundedWindow, OffsetRange> windowsToOffsets = new HashMap<>();
 
     private OffsetRange generateRange(BoundedWindow window) {
@@ -330,19 +350,29 @@
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
+      // TODO(BEAM-10097): Make this the default expansion for all portable runners.
+      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
+          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
+              || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
+        Coder<T> inputCoder = input.getCoder();
+        PCollectionView<Iterable<T>> view =
+            PCollectionViews.iterableView(
+                input,
+                (TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
+                input.getWindowingStrategy());
+        input.apply(CreatePCollectionView.of(view));
+        return view;
+      }
+
+      PCollection<KV<Void, T>> materializationInput =
+          input.apply(new VoidKeyToMultimapMaterialization<>());
       Coder<T> inputCoder = input.getCoder();
-      // HACK to work around https://issues.apache.org/jira/browse/BEAM-12228:
-      // There are bugs in "composite" vs "primitive" transform distinction
-      // in TransformHierachy. This noop transform works around them and should be zero
-      // cost.
-      PCollection<T> materializationInput =
-          input.apply(MapElements.via(new SimpleFunction<T, T>(x -> x) {}));
       PCollectionView<Iterable<T>> view =
-          PCollectionViews.iterableView(
+          PCollectionViews.iterableViewUsingVoidKey(
               materializationInput,
               (TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
               materializationInput.getWindowingStrategy());
-      input.apply(CreatePCollectionView.of(view));
+      materializationInput.apply(CreatePCollectionView.of(view));
       return view;
     }
   }
@@ -478,22 +508,35 @@
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
+      // TODO(BEAM-10097): Make this the default expansion for all portable runners.
+      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
+          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
+              || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
+        KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
+        Coder<K> keyCoder = kvCoder.getKeyCoder();
+        Coder<V> valueCoder = kvCoder.getValueCoder();
+        PCollectionView<Map<K, Iterable<V>>> view =
+            PCollectionViews.multimapView(
+                input,
+                (TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
+                (TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
+                input.getWindowingStrategy());
+        input.apply(CreatePCollectionView.of(view));
+        return view;
+      }
+
       KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
       Coder<K> keyCoder = kvCoder.getKeyCoder();
       Coder<V> valueCoder = kvCoder.getValueCoder();
-      // HACK to work around https://issues.apache.org/jira/browse/BEAM-12228:
-      // There are bugs in "composite" vs "primitive" transform distinction
-      // in TransformHierachy. This noop transform works around them and should be zero
-      // cost.
-      PCollection<KV<K, V>> materializationInput =
-          input.apply(MapElements.via(new SimpleFunction<KV<K, V>, KV<K, V>>(x -> x) {}));
+      PCollection<KV<Void, KV<K, V>>> materializationInput =
+          input.apply(new VoidKeyToMultimapMaterialization<>());
       PCollectionView<Map<K, Iterable<V>>> view =
-          PCollectionViews.multimapView(
+          PCollectionViews.multimapViewUsingVoidKey(
               materializationInput,
               (TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
               (TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
               materializationInput.getWindowingStrategy());
-      input.apply(CreatePCollectionView.of(view));
+      materializationInput.apply(CreatePCollectionView.of(view));
       return view;
     }
   }
@@ -524,19 +567,37 @@
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
+      // TODO(BEAM-10097): Make this the default expansion for all portable runners.
+      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
+          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
+              || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
+        KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
+        Coder<K> keyCoder = kvCoder.getKeyCoder();
+        Coder<V> valueCoder = kvCoder.getValueCoder();
+
+        PCollectionView<Map<K, V>> view =
+            PCollectionViews.mapView(
+                input,
+                (TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
+                (TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
+                input.getWindowingStrategy());
+        input.apply(CreatePCollectionView.of(view));
+        return view;
+      }
+
       KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
       Coder<K> keyCoder = kvCoder.getKeyCoder();
       Coder<V> valueCoder = kvCoder.getValueCoder();
 
-      PCollection<KV<K, V>> materializationInput =
-          input.apply(MapElements.via(new SimpleFunction<KV<K, V>, KV<K, V>>(x -> x) {}));
+      PCollection<KV<Void, KV<K, V>>> materializationInput =
+          input.apply(new VoidKeyToMultimapMaterialization<>());
       PCollectionView<Map<K, V>> view =
-          PCollectionViews.mapView(
+          PCollectionViews.mapViewUsingVoidKey(
               materializationInput,
               (TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
               (TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
               materializationInput.getWindowingStrategy());
-      input.apply(CreatePCollectionView.of(view));
+      materializationInput.apply(CreatePCollectionView.of(view));
       return view;
     }
   }
@@ -545,11 +606,34 @@
   // Internal details below
 
   /**
+   * A {@link PTransform} which converts all values into {@link KV}s with {@link Void} keys.
+   *
+   * <p>TODO(BEAM-10097): Replace this materialization with specializations that optimize the
+   * various SDK requested views.
+   */
+  @Internal
+  public static class VoidKeyToMultimapMaterialization<T>
+      extends PTransform<PCollection<T>, PCollection<KV<Void, T>>> {
+
+    private static class VoidKeyToMultimapMaterializationDoFn<T> extends DoFn<T, KV<Void, T>> {
+      @ProcessElement
+      public void processElement(@Element T element, OutputReceiver<KV<Void, T>> r) {
+        r.output(KV.of((Void) null, element));
+      }
+    }
+
+    @Override
+    public PCollection<KV<Void, T>> expand(PCollection<T> input) {
+      PCollection output = input.apply(ParDo.of(new VoidKeyToMultimapMaterializationDoFn<>()));
+      output.setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()));
+      return output;
+    }
+  }
+
+  /**
    * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
    *
-   * <p>Placeholder transform for runners to have a hook to materialize a {@link PCollection} as a
-   * side input. The metadata included in the {@link PCollectionView} is how the {@link PCollection}
-   * will be read as a side input.
+   * <p>Creates a primitive {@link PCollectionView}.
    *
    * @param <ElemT> The type of the elements of the input PCollection
    * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index 360c1af..df88e21 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -120,7 +120,6 @@
    */
   @Deprecated
   public static <T, W extends BoundedWindow> PCollectionView<T> singletonViewUsingVoidKey(
-      TupleTag<MultimapView<Void, T>> tag,
       PCollection<KV<Void, T>> pCollection,
       TypeDescriptorSupplier<T> typeDescriptorSupplier,
       WindowingStrategy<?, W> windowingStrategy,
@@ -129,7 +128,6 @@
       Coder<T> defaultValueCoder) {
     return new SimplePCollectionView<>(
         pCollection,
-        tag,
         new SingletonViewFn<>(hasDefault, defaultValue, defaultValueCoder, typeDescriptorSupplier),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
         windowingStrategy);
@@ -158,13 +156,11 @@
    */
   @Deprecated
   public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableViewUsingVoidKey(
-      TupleTag<MultimapView<Void, T>> tag,
       PCollection<KV<Void, T>> pCollection,
       TypeDescriptorSupplier<T> typeDescriptorSupplier,
       WindowingStrategy<?, W> windowingStrategy) {
     return new SimplePCollectionView<>(
         pCollection,
-        tag,
         new IterableViewFn<>(typeDescriptorSupplier),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
         windowingStrategy);
@@ -188,35 +184,16 @@
   /**
    * Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
    * provided {@link WindowingStrategy}.
-   */
-  public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
-      PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> pCollection,
-      TupleTag<Materializations.MultimapView<Long, ValueOrMetadata<T, OffsetRange>>> tag,
-      TypeDescriptorSupplier<T> typeDescriptorSupplier,
-      WindowingStrategy<?, W> windowingStrategy) {
-    return new SimplePCollectionView<>(
-        pCollection,
-        tag,
-        new ListViewFn2<>(typeDescriptorSupplier),
-        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
-        windowingStrategy);
-  }
-
-  /**
-   * Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
-   * provided {@link WindowingStrategy}.
    *
    * @deprecated See {@link #listView}.
    */
   @Deprecated
   public static <T, W extends BoundedWindow> PCollectionView<List<T>> listViewUsingVoidKey(
-      TupleTag<MultimapView<Void, T>> tag,
       PCollection<KV<Void, T>> pCollection,
       TypeDescriptorSupplier<T> typeDescriptorSupplier,
       WindowingStrategy<?, W> windowingStrategy) {
     return new SimplePCollectionView<>(
         pCollection,
-        tag,
         new ListViewFn<>(typeDescriptorSupplier),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
         windowingStrategy);
@@ -266,14 +243,12 @@
    */
   @Deprecated
   public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapViewUsingVoidKey(
-      TupleTag<MultimapView<Void, KV<K, V>>> tag,
       PCollection<KV<Void, KV<K, V>>> pCollection,
       TypeDescriptorSupplier<K> keyTypeDescriptorSupplier,
       TypeDescriptorSupplier<V> valueTypeDescriptorSupplier,
       WindowingStrategy<?, W> windowingStrategy) {
     return new SimplePCollectionView<>(
         pCollection,
-        tag,
         new MapViewFn<>(keyTypeDescriptorSupplier, valueTypeDescriptorSupplier),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
         windowingStrategy);
@@ -304,14 +279,12 @@
   @Deprecated
   public static <K, V, W extends BoundedWindow>
       PCollectionView<Map<K, Iterable<V>>> multimapViewUsingVoidKey(
-          TupleTag<MultimapView<Void, KV<K, V>>> tag,
           PCollection<KV<Void, KV<K, V>>> pCollection,
           TypeDescriptorSupplier<K> keyTypeDescriptorSupplier,
           TypeDescriptorSupplier<V> valueTypeDescriptorSupplier,
           WindowingStrategy<?, W> windowingStrategy) {
     return new SimplePCollectionView<>(
         pCollection,
-        tag,
         new MultimapViewFn<>(keyTypeDescriptorSupplier, valueTypeDescriptorSupplier),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
         windowingStrategy);
@@ -339,9 +312,7 @@
    * <p>{@link SingletonViewFn} is meant to be removed in the future and replaced with this class.
    */
   @Experimental(Kind.CORE_RUNNERS_ONLY)
-  @Internal
-  public static class SingletonViewFn2<T> extends ViewFn<IterableView<T>, T>
-      implements HasDefaultValue<T> {
+  private static class SingletonViewFn2<T> extends ViewFn<IterableView<T>, T> {
     private byte @Nullable [] encodedDefaultValue;
     private transient @Nullable T defaultValue;
     private @Nullable Coder<T> valueCoder;
@@ -379,7 +350,6 @@
      *
      * @throws NoSuchElementException if no default was specified.
      */
-    @Override
     public T getDefaultValue() {
       if (!hasDefault) {
         throw new NoSuchElementException("Empty PCollection accessed as a singleton view.");
@@ -423,11 +393,6 @@
     }
   }
 
-  @Internal
-  public interface HasDefaultValue<T> {
-    T getDefaultValue();
-  }
-
   /**
    * Implementation which is able to adapt a multimap materialization to a {@code T}.
    *
@@ -437,8 +402,7 @@
    */
   @Deprecated
   @Experimental(Kind.CORE_RUNNERS_ONLY)
-  public static class SingletonViewFn<T> extends ViewFn<MultimapView<Void, T>, T>
-      implements HasDefaultValue<T> {
+  public static class SingletonViewFn<T> extends ViewFn<MultimapView<Void, T>, T> {
     private byte @Nullable [] encodedDefaultValue;
     private transient @Nullable T defaultValue;
     private @Nullable Coder<T> valueCoder;
@@ -476,7 +440,6 @@
      *
      * @throws NoSuchElementException if no default was specified.
      */
-    @Override
     public T getDefaultValue() {
       if (!hasDefault) {
         throw new NoSuchElementException("Empty PCollection accessed as a singleton view.");
@@ -530,8 +493,7 @@
    * <p>{@link IterableViewFn} is meant to be removed in the future and replaced with this class.
    */
   @Experimental(Kind.CORE_RUNNERS_ONLY)
-  @Internal
-  public static class IterableViewFn2<T> extends ViewFn<IterableView<T>, Iterable<T>> {
+  private static class IterableViewFn2<T> extends ViewFn<IterableView<T>, Iterable<T>> {
     private TypeDescriptorSupplier<T> typeDescriptorSupplier;
 
     public IterableViewFn2(TypeDescriptorSupplier<T> typeDescriptorSupplier) {
@@ -597,7 +559,7 @@
    */
   @Experimental(Kind.CORE_RUNNERS_ONLY)
   @VisibleForTesting
-  public static class ListViewFn2<T>
+  static class ListViewFn2<T>
       extends ViewFn<MultimapView<Long, ValueOrMetadata<T, OffsetRange>>, List<T>> {
     private TypeDescriptorSupplier<T> typeDescriptorSupplier;
 
@@ -1041,8 +1003,7 @@
    * <p>{@link MultimapViewFn} is meant to be removed in the future and replaced with this class.
    */
   @Experimental(Kind.CORE_RUNNERS_ONLY)
-  @Internal
-  public static class MultimapViewFn2<K, V>
+  private static class MultimapViewFn2<K, V>
       extends ViewFn<MultimapView<K, V>, Map<K, Iterable<V>>> {
     private TypeDescriptorSupplier<K> keyTypeDescriptorSupplier;
     private TypeDescriptorSupplier<V> valueTypeDescriptorSupplier;
@@ -1130,8 +1091,7 @@
    *
    * <p>{@link MapViewFn} is meant to be removed in the future and replaced with this class.
    */
-  @Internal
-  public static class MapViewFn2<K, V> extends ViewFn<MultimapView<K, V>, Map<K, V>> {
+  private static class MapViewFn2<K, V> extends ViewFn<MultimapView<K, V>, Map<K, V>> {
     private TypeDescriptorSupplier<K> keyTypeDescriptorSupplier;
     private TypeDescriptorSupplier<V> valueTypeDescriptorSupplier;
 
@@ -1319,13 +1279,7 @@
 
     @Override
     public String toString() {
-      return MoreObjects.toStringHelper(this)
-          .add("tag", tag)
-          .add("viewFn", viewFn)
-          .add("coder", coder)
-          .add("windowMappingFn", windowMappingFn)
-          .add("pCollection", pCollection)
-          .toString();
+      return MoreObjects.toStringHelper(this).add("tag", tag).toString();
     }
 
     @Override
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
index 61b4bf8..b340d91 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.sdk.testing;
 
+import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -40,42 +42,82 @@
     // materializations will differ but test code should not worry about what these look like if
     // they are relying on the ViewFn to "undo" the conversion.
 
-    if (View.AsSingleton.class.equals(viewTransformClass.getClass())) {
-      for (Object value : values) {
-        rval.add(value);
-      }
-    } else if (View.AsIterable.class.equals(viewTransformClass.getClass())) {
-      for (Object value : values) {
-        rval.add(value);
-      }
-    } else if (View.AsList.class.equals(viewTransformClass.getClass())) {
-      if (values.length > 0) {
-        rval.add(
-            KV.of(
-                Long.MIN_VALUE, ValueOrMetadata.createMetadata(new OffsetRange(0, values.length))));
-        for (int i = 0; i < values.length; ++i) {
-          rval.add(KV.of((long) i, ValueOrMetadata.create(values[i])));
+    // TODO(BEAM-10097): Make this the default case once all portable runners can support
+    // the iterable access pattern.
+    if (hasExperiment(options, "beam_fn_api")
+        && (hasExperiment(options, "use_runner_v2")
+            || hasExperiment(options, "use_unified_worker"))) {
+      if (View.AsSingleton.class.equals(viewTransformClass.getClass())) {
+        for (Object value : values) {
+          rval.add(value);
         }
-      }
-    } else if (View.AsMap.class.equals(viewTransformClass.getClass())) {
-      for (Object value : values) {
-        rval.add(value);
-      }
-    } else if (View.AsMultimap.class.equals(viewTransformClass.getClass())) {
-      for (Object value : values) {
-        rval.add(value);
+      } else if (View.AsIterable.class.equals(viewTransformClass.getClass())) {
+        for (Object value : values) {
+          rval.add(value);
+        }
+      } else if (View.AsList.class.equals(viewTransformClass.getClass())) {
+        if (values.length > 0) {
+          rval.add(
+              KV.of(
+                  Long.MIN_VALUE,
+                  ValueOrMetadata.createMetadata(new OffsetRange(0, values.length))));
+          for (int i = 0; i < values.length; ++i) {
+            rval.add(KV.of((long) i, ValueOrMetadata.create(values[i])));
+          }
+        }
+      } else if (View.AsMap.class.equals(viewTransformClass.getClass())) {
+        for (Object value : values) {
+          rval.add(value);
+        }
+      } else if (View.AsMultimap.class.equals(viewTransformClass.getClass())) {
+        for (Object value : values) {
+          rval.add(value);
+        }
+      } else {
+        throw new IllegalArgumentException(
+            String.format(
+                "Unknown type of view %s. Supported views are %s.",
+                viewTransformClass.getClass(),
+                ImmutableSet.of(
+                    View.AsSingleton.class,
+                    View.AsIterable.class,
+                    View.AsList.class,
+                    View.AsMap.class,
+                    View.AsMultimap.class)));
       }
     } else {
-      throw new IllegalArgumentException(
-          String.format(
-              "Unknown type of view %s. Supported views are %s.",
-              viewTransformClass.getClass(),
-              ImmutableSet.of(
-                  View.AsSingleton.class,
-                  View.AsIterable.class,
-                  View.AsList.class,
-                  View.AsMap.class,
-                  View.AsMultimap.class)));
+      if (View.AsSingleton.class.equals(viewTransformClass.getClass())) {
+        for (Object value : values) {
+          rval.add(KV.of(null, value));
+        }
+      } else if (View.AsIterable.class.equals(viewTransformClass.getClass())) {
+        for (Object value : values) {
+          rval.add(KV.of(null, value));
+        }
+      } else if (View.AsList.class.equals(viewTransformClass.getClass())) {
+        for (Object value : values) {
+          rval.add(KV.of(null, value));
+        }
+      } else if (View.AsMap.class.equals(viewTransformClass.getClass())) {
+        for (Object value : values) {
+          rval.add(KV.of(null, value));
+        }
+      } else if (View.AsMultimap.class.equals(viewTransformClass.getClass())) {
+        for (Object value : values) {
+          rval.add(KV.of(null, value));
+        }
+      } else {
+        throw new IllegalArgumentException(
+            String.format(
+                "Unknown type of view %s. Supported views are %s.",
+                viewTransformClass.getClass(),
+                ImmutableSet.of(
+                    View.AsSingleton.class,
+                    View.AsIterable.class,
+                    View.AsList.class,
+                    View.AsMap.class,
+                    View.AsMultimap.class)));
+      }
     }
     return Collections.unmodifiableList(rval);
   }
diff --git a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
index ec328d8..0a48131 100644
--- a/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
+++ b/sdks/java/testing/nexmark/src/main/java/org/apache/beam/sdk/nexmark/sources/UnboundedEventSource.java
@@ -98,7 +98,7 @@
      * Current backlog, as estimated number of event bytes we are behind, or null if unknown.
      * Reported to callers.
      */
-    private @Nullable Long backlogBytes;
+    private long backlogBytes;
 
     /** Wallclock time (ms since epoch) we last reported the backlog, or -1 if never reported. */
     private long lastReportedBacklogWallclock;
@@ -127,6 +127,7 @@
       lastReportedBacklogWallclock = -1;
       pendingEventWallclockTime = -1;
       timestampAtLastReportedBacklogMs = -1;
+      updateBacklog(System.currentTimeMillis(), 0);
     }
 
     public EventReader(GeneratorConfig config) {
@@ -146,9 +147,7 @@
       while (pendingEvent == null) {
         if (!generator.hasNext() && heldBackEvents.isEmpty()) {
           // No more events, EVER.
-          if (isRateLimited) {
-            updateBacklog(System.currentTimeMillis(), 0);
-          }
+          updateBacklog(System.currentTimeMillis(), 0);
           if (watermark < BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
             watermark = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
             LOG.trace("stopped unbounded generator {}", generator);
@@ -177,9 +176,7 @@
           }
         } else {
           // Waiting for held-back event to fire.
-          if (isRateLimited) {
-            updateBacklog(now, 0);
-          }
+          updateBacklog(now, 0);
           return false;
         }
 
@@ -199,6 +196,8 @@
           return false;
         }
         updateBacklog(now, now - pendingEventWallclockTime);
+      } else {
+        updateBacklog(now, 0);
       }
 
       // This event is ready to fire.
@@ -210,20 +209,26 @@
     private void updateBacklog(long now, long newBacklogDurationMs) {
       backlogDurationMs = newBacklogDurationMs;
       long interEventDelayUs = generator.currentInterEventDelayUs();
-      if (interEventDelayUs != 0) {
+      if (isRateLimited && interEventDelayUs > 0) {
         long backlogEvents = (backlogDurationMs * 1000 + interEventDelayUs - 1) / interEventDelayUs;
         backlogBytes = generator.getCurrentConfig().estimatedBytesForEvents(backlogEvents);
+      } else {
+        double fractionRemaining = 1.0 - generator.getFractionConsumed();
+        backlogBytes =
+            Math.max(
+                0L,
+                (long) (generator.getCurrentConfig().getEstimatedSizeBytes() * fractionRemaining));
       }
       if (lastReportedBacklogWallclock < 0
           || now - lastReportedBacklogWallclock > BACKLOG_PERIOD.getMillis()) {
-        double timeDialation = Double.NaN;
+        double timeDilation = Double.NaN;
         if (pendingEvent != null
             && lastReportedBacklogWallclock >= 0
             && timestampAtLastReportedBacklogMs >= 0) {
           long wallclockProgressionMs = now - lastReportedBacklogWallclock;
           long eventTimeProgressionMs =
               pendingEvent.getTimestamp().getMillis() - timestampAtLastReportedBacklogMs;
-          timeDialation = (double) eventTimeProgressionMs / (double) wallclockProgressionMs;
+          timeDilation = (double) eventTimeProgressionMs / (double) wallclockProgressionMs;
         }
         LOG.debug(
             "unbounded generator backlog now {}ms ({} bytes) at {}us interEventDelay "
@@ -231,7 +236,7 @@
             backlogDurationMs,
             backlogBytes,
             interEventDelayUs,
-            timeDialation);
+            timeDilation);
         lastReportedBacklogWallclock = now;
         if (pendingEvent != null) {
           timestampAtLastReportedBacklogMs = pendingEvent.getTimestamp().getMillis();
@@ -277,7 +282,7 @@
 
     @Override
     public long getSplitBacklogBytes() {
-      return backlogBytes == null ? BACKLOG_UNKNOWN : backlogBytes;
+      return backlogBytes;
     }
 
     @Override
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index f615290..498f877 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -329,6 +329,7 @@
 DATACLASS_TYPE = 101
 NAMED_TUPLE_TYPE = 102
 ENUM_TYPE = 103
+NESTED_STATE_TYPE = 104
 
 # Types that can be encoded as iterables, but are not literally
 # lists, etc. due to being lazy.  The actual type is not preserved
@@ -442,24 +443,50 @@
             "for the input of '%s'" %
             (value, type(value), self.requires_deterministic_step_label))
       self.encode_type(type(value), stream)
-      self.iterable_coder_impl.encode_to_stream(
-          [getattr(value, field.name) for field in dataclasses.fields(value)],
-          stream,
-          True)
+      values = [
+          getattr(value, field.name) for field in dataclasses.fields(value)
+      ]
+      try:
+        self.iterable_coder_impl.encode_to_stream(values, stream, True)
+      except Exception as e:
+        raise TypeError(self._deterministic_encoding_error_msg(value)) from e
     elif isinstance(value, tuple) and hasattr(type(value), '_fields'):
       stream.write_byte(NAMED_TUPLE_TYPE)
       self.encode_type(type(value), stream)
-      self.iterable_coder_impl.encode_to_stream(value, stream, True)
+      try:
+        self.iterable_coder_impl.encode_to_stream(value, stream, True)
+      except Exception as e:
+        raise TypeError(self._deterministic_encoding_error_msg(value)) from e
     elif isinstance(value, enum.Enum):
       stream.write_byte(ENUM_TYPE)
       self.encode_type(type(value), stream)
       # Enum values can be of any type.
-      self.encode_to_stream(value.value, stream, True)
+      try:
+        self.encode_to_stream(value.value, stream, True)
+      except Exception as e:
+        raise TypeError(self._deterministic_encoding_error_msg(value)) from e
+    elif hasattr(value, "__getstate__"):
+      if not hasattr(value, "__setstate__"):
+        raise TypeError(
+            "Unable to deterministically encode '%s' of type '%s', "
+            "for the input of '%s'. The object defines __getstate__ but not "
+            "__setstate__." %
+            (value, type(value), self.requires_deterministic_step_label))
+      stream.write_byte(NESTED_STATE_TYPE)
+      self.encode_type(type(value), stream)
+      state_value = value.__getstate__()
+      try:
+        self.encode_to_stream(state_value, stream, True)
+      except Exception as e:
+        raise TypeError(self._deterministic_encoding_error_msg(value)) from e
     else:
-      raise TypeError(
-          "Unable to deterministically encode '%s' of type '%s', "
-          "please provide a type hint for the input of '%s'" %
-          (value, type(value), self.requires_deterministic_step_label))
+      raise TypeError(self._deterministic_encoding_error_msg(value))
+
+  def _deterministic_encoding_error_msg(self, value):
+    return (
+        "Unable to deterministically encode '%s' of type '%s', "
+        "please provide a type hint for the input of '%s'" %
+        (value, type(value), self.requires_deterministic_step_label))
 
   def encode_type(self, t, stream):
     stream.write(dill.dumps(t), True)
@@ -510,6 +537,12 @@
     elif t == ENUM_TYPE:
       cls = self.decode_type(stream)
       return cls(self.decode_from_stream(stream, True))
+    elif t == NESTED_STATE_TYPE:
+      cls = self.decode_type(stream)
+      state = self.decode_from_stream(stream, True)
+      value = cls.__new__(cls)
+      value.__setstate__(state)
+      return value
     elif t == UNKNOWN_TYPE:
       return self.fallback_coder_impl.decode_from_stream(stream, nested)
     else:
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 17825a9..0a18e27 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -69,6 +69,22 @@
 MyFlag = enum.Flag('MyFlag', 'F1 F2 F3')  # pylint: disable=too-many-function-args
 
 
+class DefinesGetState:
+  def __init__(self, value):
+    self.value = value
+
+  def __getstate__(self):
+    return self.value
+
+  def __eq__(self, other):
+    return type(other) is type(self) and other.value == self.value
+
+
+class DefinesGetAndSetState(DefinesGetState):
+  def __setstate__(self, value):
+    self.value = value
+
+
 # Defined out of line for picklability.
 class CustomCoder(coders.Coder):
   def encode(self, x):
@@ -236,6 +252,15 @@
     self.check_coder(deterministic_coder, list(MyIntFlag))
     self.check_coder(deterministic_coder, list(MyFlag))
 
+    self.check_coder(
+        deterministic_coder,
+        [DefinesGetAndSetState(1), DefinesGetAndSetState((1, 2, 3))])
+
+    with self.assertRaises(TypeError):
+      self.check_coder(deterministic_coder, DefinesGetState(1))
+    with self.assertRaises(TypeError):
+      self.check_coder(deterministic_coder, DefinesGetAndSetState(dict()))
+
   def test_dill_coder(self):
     cell_value = (lambda x: lambda: x)(0).__closure__[0]
     self.check_coder(coders.DillCoder(), 'a', 1, cell_value)
diff --git a/sdks/python/apache_beam/dataframe/expressions.py b/sdks/python/apache_beam/dataframe/expressions.py
index 80ee149..54a15c5 100644
--- a/sdks/python/apache_beam/dataframe/expressions.py
+++ b/sdks/python/apache_beam/dataframe/expressions.py
@@ -348,10 +348,16 @@
       preserves_partition_by: The level of partitioning preserved.
     """
     if (not _get_allow_non_parallel() and
-        requires_partition_by == partitionings.Singleton()):
+        isinstance(requires_partition_by, partitionings.Singleton)):
+      reason = requires_partition_by.reason or (
+          f"Encountered non-parallelizable form of {name!r}.")
+
       raise NonParallelOperation(
-          "Using non-parallel form of %s "
-          "outside of allow_non_parallel_operations block." % name)
+          f"{reason}\n"
+          "Consider using an allow_non_parallel_operations block if you're "
+          "sure you want to do this. See "
+          "https://s.apache.org/dataframe-non-parallel-operations for more "
+          "information.")
     args = tuple(args)
     if proxy is None:
       proxy = func(*(arg.proxy() for arg in args))
@@ -406,4 +412,6 @@
 
 
 class NonParallelOperation(Exception):
-  pass
+  def __init__(self, msg):
+    super(NonParallelOperation, self).__init__(self, msg)
+    self.msg = msg
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 1f892f4..25355fe 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -109,7 +109,11 @@
     if index is not None and errors == 'raise':
       # In order to raise an error about missing index values, we'll
       # need to collect the entire dataframe.
-      requires = partitionings.Singleton()
+      requires = partitionings.Singleton(
+          reason=(
+              "drop(errors='raise', axis='index') is not currently "
+              "parallelizable. This requires collecting all data on a single "
+              f"node in order to detect if one of {index!r} is missing."))
     else:
       requires = partitionings.Arbitrary()
 
@@ -142,24 +146,26 @@
   def fillna(self, value, method, axis, limit, **kwargs):
     # Default value is None, but is overriden with index.
     axis = axis or 'index'
-    if method is not None and axis in (0, 'index'):
-      raise frame_base.WontImplementError(
-          f"fillna(method={method!r}) is not supported because it is "
-          "order-sensitive. Only fillna(method=None) is supported.",
-          reason="order-sensitive")
+
+    if axis in (0, 'index'):
+      if method is not None:
+        raise frame_base.WontImplementError(
+            f"fillna(method={method!r}, axis={axis!r}) is not supported "
+            "because it is order-sensitive. Only fillna(method=None) is "
+            f"supported with axis={axis!r}.",
+            reason="order-sensitive")
+      if limit is not None:
+        raise frame_base.WontImplementError(
+            f"fillna(limit={method!r}, axis={axis!r}) is not supported because "
+            "it is order-sensitive. Only fillna(limit=None) is supported with "
+            f"axis={axis!r}.",
+            reason="order-sensitive")
+
     if isinstance(value, frame_base.DeferredBase):
       value_expr = value._expr
     else:
       value_expr = expressions.ConstantExpression(value)
 
-    if limit is not None and method is None:
-      # If method is not None (and axis is 'columns'), we can do limit in
-      # a distributed way. Otherwise the limit is global, so it requires
-      # Singleton partitioning.
-      requires = partitionings.Singleton()
-    else:
-      requires = partitionings.Arbitrary()
-
     return frame_base.DeferredFrame.wrap(
         # yapf: disable
         expressions.ComputedExpression(
@@ -169,7 +175,7 @@
                 value, method=method, axis=axis, limit=limit, **kwargs),
             [self._expr, value_expr],
             preserves_partition_by=partitionings.Arbitrary(),
-            requires_partition_by=requires))
+            requires_partition_by=partitionings.Arbitrary()))
 
   @frame_base.args_to_kwargs(pd.DataFrame)
   @frame_base.populate_defaults(pd.DataFrame)
@@ -523,7 +529,11 @@
     if errors == "ignore":
       # We need all data in order to ignore errors and propagate the original
       # data.
-      requires = partitionings.Singleton()
+      requires = partitionings.Singleton(
+          reason=(
+              f"where(errors={errors!r}) is currently not parallelizable, "
+              "because all data must be collected on one node to determine if "
+              "the original data should be propagated instead."))
 
     actual_args['errors'] = errors
 
@@ -668,10 +678,8 @@
           reason="order-sensitive")
 
     if verify_integrity:
-      # verifying output has a unique index requires global index.
-      # TODO(BEAM-11839): Attach an explanation to the Singleton partitioning
-      # requirement, and include it in raised errors.
-      requires = partitionings.Singleton()
+      # We can verify the index is non-unique within index partitioned data.
+      requires = partitionings.Index()
     else:
       requires = partitionings.Arbitrary()
 
@@ -750,7 +758,12 @@
       right = other._expr
       right_is_series = False
     else:
-      raise frame_base.WontImplementError('non-deferred result')
+      raise frame_base.WontImplementError(
+          "other must be a DeferredDataFrame or DeferredSeries instance. "
+          "Passing a concrete list or numpy array is not supported. Those "
+          "types have no index and must be joined based on the order of the "
+          "data.",
+          reason="order-sensitive")
 
     dots = expressions.ComputedExpression(
         'dot',
@@ -838,6 +851,10 @@
       return x._corr_aligned(y, min_periods)
 
     else:
+      reason = (
+          f"Encountered corr(method={method!r}) which cannot be "
+          "parallelized. Only corr(method='pearson') is currently "
+          "parallelizable.")
       # The rank-based correlations are not obviously parallelizable, though
       # perhaps an approximation could be done with a knowledge of quantiles
       # and custom partitioning.
@@ -847,9 +864,7 @@
               lambda df,
               other: df.corr(other, method=method, min_periods=min_periods),
               [self._expr, other._expr],
-              # TODO(BEAM-11839): Attach an explanation to the Singleton
-              # partitioning requirement, and include it in raised errors.
-              requires_partition_by=partitionings.Singleton()))
+              requires_partition_by=partitionings.Singleton(reason=reason)))
 
   def _corr_aligned(self, other, min_periods):
     std_x = self.std()
@@ -958,9 +973,16 @@
         return frame_base.DeferredFrame.wrap(
             expressions.ComputedExpression(
                 'aggregate',
-                lambda s: s.agg(func, *args, **kwargs), [intermediate],
+                lambda s: s.agg(func, *args, **kwargs),
+                [intermediate],
                 preserves_partition_by=partitionings.Arbitrary(),
-                requires_partition_by=partitionings.Singleton()))
+                # TODO(BEAM-11839): This reason should be more specific. It's
+                # actually incorrect for the args/kwargs case above.
+                requires_partition_by=partitionings.Singleton(
+                    reason=(
+                        f"Aggregation function {func!r} cannot currently be "
+                        "parallelized, it requires collecting all data for "
+                        "this Series on a single node."))))
 
   agg = aggregate
 
@@ -1119,7 +1141,10 @@
     if limit is None:
       requires_partition_by = partitionings.Arbitrary()
     else:
-      requires_partition_by = partitionings.Singleton()
+      requires_partition_by = partitionings.Singleton(
+          reason=(
+              f"replace(limit={limit!r}) cannot currently be parallelized, it "
+              "requires collecting all data on a single node."))
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'replace',
@@ -1154,7 +1179,8 @@
             'unique',
             lambda df: pd.Series(df.unique()), [self._expr],
             preserves_partition_by=partitionings.Singleton(),
-            requires_partition_by=partitionings.Singleton()))
+            requires_partition_by=partitionings.Singleton(
+                reason="unique() cannot currently be parallelized.")))
 
   def update(self, other):
     self._expr = expressions.ComputedExpression(
@@ -1242,7 +1268,8 @@
       elif _is_integer_slice(key):
         # This depends on the contents of the index.
         raise frame_base.WontImplementError(
-            'Use iloc or loc with integer slices.')
+            "Integer slices are not supported as they are ambiguous. Please "
+            "use iloc or loc with integer slices.")
       else:
         return self.loc[key]
 
@@ -1278,7 +1305,10 @@
   @frame_base.populate_defaults(pd.DataFrame)
   def align(self, other, join, axis, copy, level, method, **kwargs):
     if not copy:
-      raise frame_base.WontImplementError('align(copy=False)')
+      raise frame_base.WontImplementError(
+          "align(copy=False) is not supported because it might be an inplace "
+          "operation depending on the data. Please prefer the default "
+          "align(copy=True).")
     if method is not None:
       raise frame_base.WontImplementError(
           f"align(method={method!r}) is not supported because it is "
@@ -1289,7 +1319,9 @@
 
     if level is not None:
       # Could probably get by partitioning on the used levels.
-      requires_partition_by = partitionings.Singleton()
+      requires_partition_by = partitionings.Singleton(reason=(
+          f"align(level={level}) is not currently parallelizable. Only "
+          "align(level=None) can be parallelized."))
     elif axis in ('columns', 1):
       requires_partition_by = partitionings.Arbitrary()
     else:
@@ -1314,16 +1346,21 @@
           "append(ignore_index=True) is order sensitive because it requires "
           "generating a new index based on the order of the data.",
           reason="order-sensitive")
+
     if verify_integrity:
-      raise frame_base.WontImplementError(
-          "append(verify_integrity=True) produces an execution time error")
+      # We can verify the index is non-unique within index partitioned data.
+      requires = partitionings.Index()
+    else:
+      requires = partitionings.Arbitrary()
 
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'append',
-            lambda s, other: s.append(other, sort=sort, **kwargs),
+            lambda s, other: s.append(other, sort=sort,
+                                      verify_integrity=verify_integrity,
+                                      **kwargs),
             [self._expr, other._expr],
-            requires_partition_by=partitionings.Arbitrary(),
+            requires_partition_by=requires,
             preserves_partition_by=partitionings.Arbitrary()
         )
     )
@@ -1391,8 +1428,6 @@
             preserves_partition_by=preserves,
             requires_partition_by=partitionings.Arbitrary()))
 
-
-
   def aggregate(self, func, axis=0, *args, **kwargs):
     if axis is None:
       # Aggregate across all elements by first aggregating across columns,
@@ -1414,6 +1449,7 @@
             'aggregate',
             lambda df: df.agg(func, *args, **kwargs),
             [self._expr],
+            # TODO(BEAM-11839): Provide a reason for this Singleton
             requires_partition_by=partitionings.Singleton()))
     else:
       # In the general case, compute the aggregation of each column separately,
@@ -1499,12 +1535,15 @@
                 proxy=proxy))
 
     else:
+      reason = (f"Encountered corr(method={method!r}) which cannot be "
+                "parallelized. Only corr(method='pearson') is currently "
+                "parallelizable.")
       return frame_base.DeferredFrame.wrap(
           expressions.ComputedExpression(
               'corr',
               lambda df: df.corr(method=method, min_periods=min_periods),
               [self._expr],
-              requires_partition_by=partitionings.Singleton()))
+              requires_partition_by=partitionings.Singleton(reason=reason)))
 
   @frame_base.args_to_kwargs(pd.DataFrame)
   @frame_base.populate_defaults(pd.DataFrame)
@@ -1653,8 +1692,12 @@
             'mode',
             lambda df: df.mode(*args, **kwargs),
             [self._expr],
-            #TODO(robertwb): Approximate?
-            requires_partition_by=partitionings.Singleton(),
+            #TODO(BEAM-12181): Can we add an approximate implementation?
+            requires_partition_by=partitionings.Singleton(reason=(
+                "mode(axis='index') cannot currently be parallelized. See "
+                "BEAM-12181 tracking the possble addition of an approximate, "
+                "parallelizable implementation of mode."
+            )),
             preserves_partition_by=partitionings.Singleton()))
 
   @frame_base.args_to_kwargs(pd.DataFrame)
@@ -1662,8 +1705,12 @@
   @frame_base.maybe_inplace
   def dropna(self, axis, **kwargs):
     # TODO(robertwb): This is a common pattern. Generalize?
-    if axis == 1 or axis == 'columns':
-      requires_partition_by = partitionings.Singleton()
+    if axis in (1, 'columns'):
+      requires_partition_by = partitionings.Singleton(reason=(
+          "dropna(axis=1) cannot currently be parallelized. It requires "
+          "checking all values in each column for NaN values, to determine "
+          "if that column should be dropped."
+      ))
     else:
       requires_partition_by = partitionings.Arbitrary()
     return frame_base.DeferredFrame.wrap(
@@ -1913,8 +1960,11 @@
       requires_partition_by = partitionings.Arbitrary()
       preserves_partition_by = partitionings.Index()
     else:
-      # TODO: This could be implemented in a distributed fashion
-      requires_partition_by = partitionings.Singleton()
+      # TODO(BEAM-9547): This could be implemented in a distributed fashion,
+      # perhaps by deferring to a distributed drop_duplicates
+      requires_partition_by = partitionings.Singleton(reason=(
+         "nunique(axis='index') is not currently parallelizable."
+      ))
       preserves_partition_by = partitionings.Singleton()
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
@@ -1941,22 +1991,31 @@
   @frame_base.args_to_kwargs(pd.DataFrame)
   @frame_base.populate_defaults(pd.DataFrame)
   def quantile(self, q, axis, **kwargs):
-    if axis in (1, 'columns') and isinstance(q, list):
-      raise frame_base.WontImplementError(
-          "quantile(axis=columns) with multiple q values is not supported "
-          "because it transposes the input DataFrame. Note computing "
-          "an individual quantile across columns (e.g. "
-          f"df.quantile(q={q[0]!r}, axis={axis!r}) is supported.",
-          reason="non-deferred-columns")
+    if axis in (1, 'columns'):
+      if isinstance(q, list):
+        raise frame_base.WontImplementError(
+            "quantile(axis=columns) with multiple q values is not supported "
+            "because it transposes the input DataFrame. Note computing "
+            "an individual quantile across columns (e.g. "
+            f"df.quantile(q={q[0]!r}, axis={axis!r}) is supported.",
+            reason="non-deferred-columns")
+      else:
+        requires = partitionings.Arbitrary()
+    else: # axis='index'
+      # TODO(BEAM-12167): Provide an option for approximate distributed
+      # quantiles
+      requires = partitionings.Singleton(reason=(
+          "Computing quantiles across index cannot currently be parallelized. "
+          "See BEAM-12167 tracking the possible addition of an approximate, "
+          "parallelizable implementation of quantile."
+      ))
 
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'quantile',
             lambda df: df.quantile(q=q, axis=axis, **kwargs),
             [self._expr],
-            # TODO(BEAM-12167): Provide an option for approximate distributed
-            # quantiles
-            requires_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires,
             preserves_partition_by=partitionings.Singleton()))
 
   @frame_base.args_to_kwargs(pd.DataFrame)
@@ -1978,8 +2037,15 @@
       preserves_partition_by = partitionings.Index()
 
     if kwargs.get('errors', None) == 'raise' and rename_index:
-      # Renaming index with checking requires global index.
-      requires_partition_by = partitionings.Singleton()
+      # TODO: We could do this in parallel by creating a ConstantExpression
+      # with a series created from the mapper dict. Then Index() partitioning
+      # would co-locate the necessary index values and we could raise
+      # individually within each partition. Execution time errors are
+      # discouraged anyway so probably not worth the effort.
+      requires_partition_by = partitionings.Singleton(reason=(
+          "rename(errors='raise', axis='index') requires collecting all "
+          "data on a single node in order to detect missing index values."
+      ))
     else:
       requires_partition_by = partitionings.Arbitrary()
 
@@ -2014,7 +2080,9 @@
     if limit is None:
       requires_partition_by = partitionings.Arbitrary()
     else:
-      requires_partition_by = partitionings.Singleton()
+      requires_partition_by = partitionings.Singleton(reason=(
+         f"replace(limit={limit!r}) cannot currently be parallelized, it "
+         "requires collecting all data on a single node."))
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'replace',
@@ -2032,8 +2100,11 @@
     if level is not None and not isinstance(level, (tuple, list)):
       level = [level]
     if level is None or len(level) == self._expr.proxy().index.nlevels:
-      # TODO: Could do distributed re-index with offsets.
-      requires_partition_by = partitionings.Singleton()
+      # TODO(BEAM-12182): Could do distributed re-index with offsets.
+      requires_partition_by = partitionings.Singleton(reason=(
+          "reset_index(level={level!r}) drops the entire index and creates a "
+          "new one, so it cannot currently be parallelized (BEAM-12182)."
+      ))
     else:
       requires_partition_by = partitionings.Arbitrary()
     return frame_base.DeferredFrame.wrap(
@@ -2070,20 +2141,37 @@
 
   @frame_base.args_to_kwargs(pd.DataFrame)
   @frame_base.populate_defaults(pd.DataFrame)
-  def shift(self, axis, **kwargs):
-    if 'freq' in kwargs:
-      raise frame_base.WontImplementError('data-dependent')
-    if axis == 1 or axis == 'columns':
-      requires_partition_by = partitionings.Arbitrary()
+  def shift(self, axis, freq, **kwargs):
+    if axis in (1, 'columns'):
+      preserves = partitionings.Arbitrary()
+      proxy = None
     else:
-      requires_partition_by = partitionings.Singleton()
+      if freq is None or 'fill_value' in kwargs:
+        fill_value = kwargs.get('fill_value', 'NOT SET')
+        raise frame_base.WontImplementError(
+            f"shift(axis={axis!r}) is only supported with freq defined, and "
+            f"fill_value undefined (got freq={freq!r},"
+            f"fill_value={fill_value!r}). Other configurations are sensitive "
+            "to the order of the data because they require populating shifted "
+            "rows with `fill_value`.",
+            reason="order-sensitive")
+      # proxy generation fails in pandas <1.2
+      # Seems due to https://github.com/pandas-dev/pandas/issues/14811,
+      # bug with shift on empty indexes.
+      # Fortunately the proxy should be identical to the input.
+      proxy = self._expr.proxy().copy()
+
+      # index is modified, so no partitioning is preserved.
+      preserves = partitionings.Singleton()
+
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'shift',
-            lambda df: df.shift(axis=axis, **kwargs),
+            lambda df: df.shift(axis=axis, freq=freq, **kwargs),
             [self._expr],
-            preserves_partition_by=partitionings.Singleton(),
-            requires_partition_by=requires_partition_by))
+            proxy=proxy,
+            preserves_partition_by=preserves,
+            requires_partition_by=partitionings.Arbitrary()))
 
   shape = property(frame_base.wont_implement_method(
       pd.DataFrame, 'shape', reason="non-deferred-result"))
@@ -2388,7 +2476,10 @@
             df.groupby(level=list(range(df.index.nlevels)), **groupby_kwargs),
             **kwargs),
         [pre_agg],
-        requires_partition_by=(partitionings.Singleton()
+        requires_partition_by=(partitionings.Singleton(reason=(
+            "Aggregations grouped by a categorical column are not currently "
+            "parallelizable (BEAM-11190)."
+        ))
                                if is_categorical_grouping
                                else partitionings.Index()),
         preserves_partition_by=partitionings.Arbitrary())
@@ -2416,7 +2507,10 @@
                        **groupby_kwargs),
             ), **kwargs),
         [self._ungrouped],
-        requires_partition_by=(partitionings.Singleton()
+        requires_partition_by=(partitionings.Singleton(reason=(
+            "Aggregations grouped by a categorical column are not currently "
+            "parallelizable (BEAM-11190)."
+        ))
                                if is_categorical_grouping
                                else partitionings.Index()),
         preserves_partition_by=partitionings.Arbitrary())
@@ -2633,7 +2727,10 @@
   def cat(self, others, join, **kwargs):
     if others is None:
       # Concatenate series into a single String
-      requires = partitionings.Singleton()
+      requires = partitionings.Singleton(reason=(
+          "cat(others=None) concatenates all data in a Series into a single "
+          "string, so it requires collecting all data on a single node."
+      ))
       func = lambda df: df.str.cat(join=join, **kwargs)
       args = [self._expr]
 
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index b692f08..1cf1dfb 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -45,12 +45,15 @@
 
 
 class DeferredFrameTest(unittest.TestCase):
-  def _run_error_test(self, func, *args):
+  def _run_error_test(
+      self, func, *args, construction_time=True, distributed=True):
     """Verify that func(*args) raises the same exception in pandas and in Beam.
 
-    Note that for Beam this only checks for exceptions that are raised during
-    expression generation (i.e. construction time). Execution time exceptions
-    are not helpful."""
+    Note that by default this only checks for exceptions that the Beam DataFrame
+    API raises during expression generation (i.e. construction time).
+    Exceptions raised while the pipeline is executing are less helpful, but
+    are sometimes unavoidable (e.g. data validation exceptions), to check for
+    these exceptions use construction_time=False."""
     deferred_args = _get_deferred_args(*args)
 
     # Get expected error
@@ -64,14 +67,29 @@
           f"returned:\n{expected}")
 
     # Get actual error
-    try:
-      _ = func(*deferred_args)._expr
-    except Exception as e:
-      actual = e
-    else:
-      raise AssertionError(
-          "Expected an error:\n{expected_error}\nbut Beam successfully "
-          "generated an expression.")
+    if construction_time:
+      try:
+        _ = func(*deferred_args)._expr
+      except Exception as e:
+        actual = e
+      else:
+        raise AssertionError(
+            f"Expected an error:\n{expected_error}\nbut Beam successfully "
+            f"generated an expression.")
+    else:  # not construction_time
+      # Check for an error raised during pipeline execution
+      expr = func(*deferred_args)._expr
+      session_type = (
+          expressions.PartitioningSession
+          if distributed else expressions.Session)
+      try:
+        result = session_type({}).evaluate(expr)
+      except Exception as e:
+        actual = e
+      else:
+        raise AssertionError(
+            f"Expected an error:\n{expected_error}\nbut Beam successfully "
+            f"Computed the result:\n{result}.")
 
     # Verify
     if (not isinstance(actual, type(expected_error)) or
@@ -99,8 +117,15 @@
     deferred_args = _get_deferred_args(*args)
     if nonparallel:
       # First run outside a nonparallel block to confirm this raises as expected
-      with self.assertRaises(expressions.NonParallelOperation):
-        _ = func(*deferred_args)
+      with self.assertRaises(expressions.NonParallelOperation) as raised:
+        func(*deferred_args)
+
+      if raised.exception.msg.startswith(
+          "Encountered non-parallelizable form of"):
+        raise AssertionError(
+            "Default NonParallelOperation raised, please specify a reason in "
+            "the Singleton() partitioning requirement for this operation."
+        ) from raised.exception
 
       # Re-run in an allow non parallel block to get an expression to verify
       with beam.dataframe.allow_non_parallel_operations():
@@ -722,13 +747,14 @@
             lambda x: (x.foo + x.bar).median()),
         df)
 
-  def test_quantile_axis_columns(self):
+  def test_quantile(self):
     df = pd.DataFrame(
         np.array([[1, 1], [2, 10], [3, 100], [4, 100]]), columns=['a', 'b'])
 
-    with beam.dataframe.allow_non_parallel_operations():
-      self._run_test(lambda df: df.quantile(0.1, axis='columns'), df)
+    self._run_test(lambda df: df.quantile(0.1), df, nonparallel=True)
+    self._run_test(lambda df: df.quantile([0.1, 0.9]), df, nonparallel=True)
 
+    self._run_test(lambda df: df.quantile(0.1, axis='columns'), df)
     with self.assertRaisesRegex(frame_base.WontImplementError,
                                 r"df\.quantile\(q=0\.1, axis='columns'\)"):
       self._run_test(lambda df: df.quantile([0.1, 0.5], axis='columns'), df)
@@ -742,6 +768,7 @@
         lambda df: df.groupby('foo', dropna=False).bar.count(), GROUPBY_DF)
 
   def test_dataframe_melt(self):
+
     df = pd.DataFrame({
         'A': {
             0: 'a', 1: 'b', 2: 'c'
@@ -784,6 +811,40 @@
             id_vars=[('A', 'D')], value_vars=[('B', 'E')], ignore_index=False),
         df)
 
+  def test_fillna_columns(self):
+    df = pd.DataFrame(
+        [[np.nan, 2, np.nan, 0], [3, 4, np.nan, 1], [np.nan, np.nan, np.nan, 5],
+         [np.nan, 3, np.nan, 4], [3, np.nan, np.nan, 4]],
+        columns=list('ABCD'))
+
+    self._run_test(lambda df: df.fillna(method='ffill', axis='columns'), df)
+    self._run_test(
+        lambda df: df.fillna(method='ffill', axis='columns', limit=1), df)
+    self._run_test(
+        lambda df: df.fillna(method='bfill', axis='columns', limit=1), df)
+
+    # Intended behavior is unclear here. See
+    # https://github.com/pandas-dev/pandas/issues/40989
+    # self._run_test(lambda df: df.fillna(axis='columns', value=100,
+    #                                     limit=2), df)
+
+  def test_append_verify_integrity(self):
+    df1 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10))
+    df2 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(9, 19))
+
+    self._run_error_test(
+        lambda s1,
+        s2: s1.append(s2, verify_integrity=True),
+        df1['A'],
+        df2['A'],
+        construction_time=False)
+    self._run_error_test(
+        lambda df1,
+        df2: df1.append(df2, verify_integrity=True),
+        df1,
+        df2,
+        construction_time=False)
+
 
 class AllowNonParallelTest(unittest.TestCase):
   def _use_non_parallel_operation(self):
diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
index f720112..fcf18fa 100644
--- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
+++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
@@ -40,7 +40,10 @@
             'pandas.core.generic.NDFrame.first': ['*'],
             'pandas.core.generic.NDFrame.head': ['*'],
             'pandas.core.generic.NDFrame.last': ['*'],
-            'pandas.core.generic.NDFrame.shift': ['*'],
+            'pandas.core.generic.NDFrame.shift': [
+                'df.shift(periods=3)',
+                'df.shift(periods=3, fill_value=0)',
+            ],
             'pandas.core.generic.NDFrame.tail': ['*'],
             'pandas.core.generic.NDFrame.take': ['*'],
             'pandas.core.generic.NDFrame.values': ['*'],
@@ -189,8 +192,8 @@
             'pandas.core.frame.DataFrame.transpose': ['*'],
             'pandas.core.frame.DataFrame.shape': ['*'],
             'pandas.core.frame.DataFrame.shift': [
-                'df.shift(periods=3, freq="D")',
-                'df.shift(periods=3, freq="infer")'
+                'df.shift(periods=3)',
+                'df.shift(periods=3, fill_value=0)',
             ],
             'pandas.core.frame.DataFrame.unstack': ['*'],
             'pandas.core.frame.DataFrame.memory_usage': ['*'],
@@ -395,7 +398,10 @@
             ],
             'pandas.core.series.Series.pop': ['*'],
             'pandas.core.series.Series.searchsorted': ['*'],
-            'pandas.core.series.Series.shift': ['*'],
+            'pandas.core.series.Series.shift': [
+                'df.shift(periods=3)',
+                'df.shift(periods=3, fill_value=0)',
+            ],
             'pandas.core.series.Series.take': ['*'],
             'pandas.core.series.Series.to_dict': ['*'],
             'pandas.core.series.Series.unique': ['*'],
diff --git a/sdks/python/apache_beam/dataframe/partitionings.py b/sdks/python/apache_beam/dataframe/partitionings.py
index ef58023..afb71ba 100644
--- a/sdks/python/apache_beam/dataframe/partitionings.py
+++ b/sdks/python/apache_beam/dataframe/partitionings.py
@@ -151,6 +151,13 @@
 class Singleton(Partitioning):
   """A partitioning of all the data into a single partition.
   """
+  def __init__(self, reason=None):
+    self._reason = reason
+
+  @property
+  def reason(self):
+    return self._reason
+
   def __eq__(self, other):
     return type(self) == type(other)
 
diff --git a/sdks/python/apache_beam/ml/gcp/recommendations_ai.py b/sdks/python/apache_beam/ml/gcp/recommendations_ai.py
deleted file mode 100644
index 6e50cd2..0000000
--- a/sdks/python/apache_beam/ml/gcp/recommendations_ai.py
+++ /dev/null
@@ -1,585 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A connector for sending API requests to the GCP Recommendations AI
-API (https://cloud.google.com/recommendations).
-"""
-
-from __future__ import absolute_import
-
-from typing import Sequence
-from typing import Tuple
-
-from google.api_core.retry import Retry
-
-from apache_beam import pvalue
-from apache_beam.metrics import Metrics
-from apache_beam.options.pipeline_options import GoogleCloudOptions
-from apache_beam.transforms import DoFn
-from apache_beam.transforms import ParDo
-from apache_beam.transforms import PTransform
-from apache_beam.transforms.util import GroupIntoBatches
-from cachetools.func import ttl_cache
-
-# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
-try:
-  from google.cloud import recommendationengine
-except ImportError:
-  raise ImportError(
-      'Google Cloud Recommendation AI not supported for this execution '
-      'environment (could not import google.cloud.recommendationengine).')
-# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
-
-__all__ = [
-    'CreateCatalogItem',
-    'WriteUserEvent',
-    'ImportCatalogItems',
-    'ImportUserEvents',
-    'PredictUserEvent'
-]
-
-FAILED_CATALOG_ITEMS = "failed_catalog_items"
-
-
-@ttl_cache(maxsize=128, ttl=3600)
-def get_recommendation_prediction_client():
-  """Returns a Recommendation AI - Prediction Service client."""
-  _client = recommendationengine.PredictionServiceClient()
-  return _client
-
-
-@ttl_cache(maxsize=128, ttl=3600)
-def get_recommendation_catalog_client():
-  """Returns a Recommendation AI - Catalog Service client."""
-  _client = recommendationengine.CatalogServiceClient()
-  return _client
-
-
-@ttl_cache(maxsize=128, ttl=3600)
-def get_recommendation_user_event_client():
-  """Returns a Recommendation AI - UserEvent Service client."""
-  _client = recommendationengine.UserEventServiceClient()
-  return _client
-
-
-class CreateCatalogItem(PTransform):
-  """Creates catalogitem information.
-    The ``PTranform`` returns a PCollectionTuple with a PCollections of
-    successfully and failed created CatalogItems.
-
-    Example usage::
-
-      pipeline | CreateCatalogItem(
-        project='example-gcp-project',
-        catalog_name='my-catalog')
-    """
-  def __init__(
-      self,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog"):
-    """Initializes a :class:`CreateCatalogItem` transform.
-
-        Args:
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-        """
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.catalog_name = catalog_name
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          """GCP project name needs to be specified in "project" pipeline
-            option""")
-    return pcoll | ParDo(
-        _CreateCatalogItemFn(
-            self.project,
-            self.retry,
-            self.timeout,
-            self.metadata,
-            self.catalog_name))
-
-
-class _CreateCatalogItemFn(DoFn):
-  def __init__(
-      self,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.parent = f"projects/{project}/locations/global/catalogs/{catalog_name}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self._client = get_recommendation_catalog_client()
-
-  def process(self, element):
-    catalog_item = recommendationengine.CatalogItem(element)
-    request = recommendationengine.CreateCatalogItemRequest(
-        parent=self.parent, catalog_item=catalog_item)
-
-    try:
-      created_catalog_item = self._client.create_catalog_item(
-          request=request,
-          retry=self.retry,
-          timeout=self.timeout,
-          metadata=self.metadata)
-
-      self.counter.inc()
-      yield recommendationengine.CatalogItem.to_dict(created_catalog_item)
-    except Exception:
-      yield pvalue.TaggedOutput(
-          FAILED_CATALOG_ITEMS,
-          recommendationengine.CatalogItem.to_dict(catalog_item))
-
-
-class ImportCatalogItems(PTransform):
-  """Imports catalogitems in bulk.
-    The `PTransform` returns a PCollectionTuple with PCollections of
-    successfully and failed imported CatalogItems.
-
-    Example usage::
-
-      pipeline
-      | ImportCatalogItems(
-          project='example-gcp-project',
-          catalog_name='my-catalog')
-    """
-  def __init__(
-      self,
-      max_batch_size: int = 5000,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog"):
-    """Initializes a :class:`ImportCatalogItems` transform
-
-        Args:
-            batch_size (int): Required. Maximum number of catalogitems per
-              request.
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-        """
-    self.max_batch_size = max_batch_size
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.catalog_name = catalog_name
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          'GCP project name needs to be specified in "project" pipeline option')
-    return (
-        pcoll | GroupIntoBatches.WithShardedKey(self.max_batch_size) | ParDo(
-            _ImportCatalogItemsFn(
-                self.project,
-                self.retry,
-                self.timeout,
-                self.metadata,
-                self.catalog_name)))
-
-
-class _ImportCatalogItemsFn(DoFn):
-  def __init__(
-      self,
-      project=None,
-      retry=None,
-      timeout=120,
-      metadata=None,
-      catalog_name=None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.parent = f"projects/{project}/locations/global/catalogs/{catalog_name}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self.client = get_recommendation_catalog_client()
-
-  def process(self, element):
-    catalog_items = [recommendationengine.CatalogItem(e) for e in element[1]]
-    catalog_inline_source = recommendationengine.CatalogInlineSource(
-        {"catalog_items": catalog_items})
-    input_config = recommendationengine.InputConfig(
-        catalog_inline_source=catalog_inline_source)
-
-    request = recommendationengine.ImportCatalogItemsRequest(
-        parent=self.parent, input_config=input_config)
-
-    try:
-      operation = self._client.import_catalog_items(
-          request=request,
-          retry=self.retry,
-          timeout=self.timeout,
-          metadata=self.metadata)
-      self.counter.inc(len(catalog_items))
-      yield operation.result()
-    except Exception:
-      yield pvalue.TaggedOutput(FAILED_CATALOG_ITEMS, catalog_items)
-
-
-class WriteUserEvent(PTransform):
-  """Write user event information.
-    The `PTransform` returns a PCollectionTuple with PCollections of
-    successfully and failed written UserEvents.
-
-    Example usage::
-
-      pipeline
-      | WriteUserEvent(
-          project='example-gcp-project',
-          catalog_name='my-catalog',
-          event_store='my_event_store')
-    """
-  def __init__(
-      self,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog",
-      event_store: str = "default_event_store"):
-    """Initializes a :class:`WriteUserEvent` transform.
-
-        Args:
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-            event_store (str): Optional. Name of the event store.
-              Default: 'default_event_store'
-        """
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.catalog_name = catalog_name
-    self.event_store = event_store
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          'GCP project name needs to be specified in "project" pipeline option')
-    return pcoll | ParDo(
-        _WriteUserEventFn(
-            self.project,
-            self.retry,
-            self.timeout,
-            self.metadata,
-            self.catalog_name,
-            self.event_store))
-
-
-class _WriteUserEventFn(DoFn):
-  FAILED_USER_EVENTS = "failed_user_events"
-
-  def __init__(
-      self,
-      project=None,
-      retry=None,
-      timeout=120,
-      metadata=None,
-      catalog_name=None,
-      event_store=None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.parent = f"projects/{project}/locations/global/catalogs/"\
-                  f"{catalog_name}/eventStores/{event_store}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self._client = get_recommendation_user_event_client()
-
-  def process(self, element):
-    user_event = recommendationengine.UserEvent(element)
-    request = recommendationengine.WriteUserEventRequest(
-        parent=self.parent, user_event=user_event)
-
-    try:
-      created_user_event = self._client.write_user_event(request)
-      self.counter.inc()
-      yield recommendationengine.UserEvent.to_dict(created_user_event)
-    except Exception:
-      yield pvalue.TaggedOutput(
-          self.FAILED_USER_EVENTS,
-          recommendationengine.UserEvent.to_dict(user_event))
-
-
-class ImportUserEvents(PTransform):
-  """Imports userevents in bulk.
-    The `PTransform` returns a PCollectionTuple with PCollections of
-    successfully and failed imported UserEvents.
-
-    Example usage::
-
-      pipeline
-      | ImportUserEvents(
-          project='example-gcp-project',
-          catalog_name='my-catalog',
-          event_store='my_event_store')
-    """
-  def __init__(
-      self,
-      max_batch_size: int = 5000,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog",
-      event_store: str = "default_event_store"):
-    """Initializes a :class:`WriteUserEvent` transform.
-
-        Args:
-            batch_size (int): Required. Maximum number of catalogitems
-              per request.
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-            event_store (str): Optional. Name of the event store.
-              Default: 'default_event_store'
-        """
-    self.max_batch_size = max_batch_size
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.catalog_name = catalog_name
-    self.event_store = event_store
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          'GCP project name needs to be specified in "project" pipeline option')
-    return (
-        pcoll | GroupIntoBatches.WithShardedKey(self.max_batch_size) | ParDo(
-            _ImportUserEventsFn(
-                self.project,
-                self.retry,
-                self.timeout,
-                self.metadata,
-                self.catalog_name,
-                self.event_store)))
-
-
-class _ImportUserEventsFn(DoFn):
-  FAILED_USER_EVENTS = "failed_user_events"
-
-  def __init__(
-      self,
-      project=None,
-      retry=None,
-      timeout=120,
-      metadata=None,
-      catalog_name=None,
-      event_store=None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.parent = f"projects/{project}/locations/global/catalogs/"\
-                  f"{catalog_name}/eventStores/{event_store}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self.client = get_recommendation_user_event_client()
-
-  def process(self, element):
-
-    user_events = [recommendationengine.UserEvent(e) for e in element[1]]
-    user_event_inline_source = recommendationengine.UserEventInlineSource(
-        {"user_events": user_events})
-    input_config = recommendationengine.InputConfig(
-        user_event_inline_source=user_event_inline_source)
-
-    request = recommendationengine.ImportUserEventsRequest(
-        parent=self.parent, input_config=input_config)
-
-    try:
-      operation = self._client.write_user_event(request)
-      self.counter.inc(len(user_events))
-      yield recommendationengine.PredictResponse.to_dict(operation.result())
-    except Exception:
-      yield pvalue.TaggedOutput(self.FAILED_USER_EVENTS, user_events)
-
-
-class PredictUserEvent(PTransform):
-  """Make a recommendation prediction.
-    The `PTransform` returns a PCollection
-
-    Example usage::
-
-      pipeline
-      | PredictUserEvent(
-          project='example-gcp-project',
-          catalog_name='my-catalog',
-          event_store='my_event_store',
-          placement_id='recently_viewed_default')
-    """
-  def __init__(
-      self,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog",
-      event_store: str = "default_event_store",
-      placement_id: str = None):
-    """Initializes a :class:`PredictUserEvent` transform.
-
-        Args:
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-            event_store (str): Optional. Name of the event store.
-              Default: 'default_event_store'
-            placement_id (str): Required. ID of the recommendation engine
-              placement. This id is used to identify the set of models that
-              will be used to make the prediction.
-        """
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.placement_id = placement_id
-    self.catalog_name = catalog_name
-    self.event_store = event_store
-    if placement_id is None:
-      raise ValueError('placement_id must be specified')
-    else:
-      self.placement_id = placement_id
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          'GCP project name needs to be specified in "project" pipeline option')
-    return pcoll | ParDo(
-        _PredictUserEventFn(
-            self.project,
-            self.retry,
-            self.timeout,
-            self.metadata,
-            self.catalog_name,
-            self.event_store,
-            self.placement_id))
-
-
-class _PredictUserEventFn(DoFn):
-  FAILED_PREDICTIONS = "failed_predictions"
-
-  def __init__(
-      self,
-      project=None,
-      retry=None,
-      timeout=120,
-      metadata=None,
-      catalog_name=None,
-      event_store=None,
-      placement_id=None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.name = f"projects/{project}/locations/global/catalogs/"\
-                f"{catalog_name}/eventStores/{event_store}/placements/"\
-                f"{placement_id}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self._client = get_recommendation_prediction_client()
-
-  def process(self, element):
-    user_event = recommendationengine.UserEvent(element)
-    request = recommendationengine.PredictRequest(
-        name=self.name, user_event=user_event)
-
-    try:
-      prediction = self._client.predict(request)
-      self.counter.inc()
-      yield [
-          recommendationengine.PredictResponse.to_dict(p)
-          for p in prediction.pages
-      ]
-    except Exception:
-      yield pvalue.TaggedOutput(self.FAILED_PREDICTIONS, user_event)
diff --git a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test.py b/sdks/python/apache_beam/ml/gcp/recommendations_ai_test.py
deleted file mode 100644
index 2f688d9..0000000
--- a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test.py
+++ /dev/null
@@ -1,207 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for Recommendations AI transforms."""
-
-from __future__ import absolute_import
-
-import unittest
-
-import mock
-
-import apache_beam as beam
-from apache_beam.metrics import MetricsFilter
-
-# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
-try:
-  from google.cloud import recommendationengine
-  from apache_beam.ml.gcp import recommendations_ai
-except ImportError:
-  recommendationengine = None
-# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
-
-
-@unittest.skipIf(
-    recommendationengine is None,
-    "Recommendations AI dependencies not installed.")
-class RecommendationsAICatalogItemTest(unittest.TestCase):
-  def setUp(self):
-    self._mock_client = mock.Mock()
-    self._mock_client.create_catalog_item.return_value = (
-        recommendationengine.CatalogItem())
-    self.m2 = mock.Mock()
-    self.m2.result.return_value = None
-    self._mock_client.import_catalog_items.return_value = self.m2
-
-    self._catalog_item = {
-        "id": "12345",
-        "title": "Sample laptop",
-        "description": "Indisputably the most fantastic laptop ever created.",
-        "language_code": "en",
-        "category_hierarchies": [{
-            "categories": ["Electronic", "Computers"]
-        }]
-    }
-
-  def test_CreateCatalogItem(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_catalog_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([self._catalog_item])
-          | "Create CatalogItem" >>
-          recommendations_ai.CreateCatalogItem(project="test"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertTrue(read_counter.result == expected_counter)
-
-  def test_ImportCatalogItems(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_catalog_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([
-              (self._catalog_item["id"], self._catalog_item),
-              (self._catalog_item["id"], self._catalog_item)
-          ]) | "Create CatalogItems" >>
-          recommendations_ai.ImportCatalogItems(project="test"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertTrue(read_counter.result == expected_counter)
-
-
-@unittest.skipIf(
-    recommendationengine is None,
-    "Recommendations AI dependencies not installed.")
-class RecommendationsAIUserEventTest(unittest.TestCase):
-  def setUp(self):
-    self._mock_client = mock.Mock()
-    self._mock_client.write_user_event.return_value = (
-        recommendationengine.UserEvent())
-    self.m2 = mock.Mock()
-    self.m2.result.return_value = None
-    self._mock_client.import_user_events.return_value = self.m2
-
-    self._user_event = {
-        "event_type": "page-visit", "user_info": {
-            "visitor_id": "1"
-        }
-    }
-
-  def test_CreateUserEvent(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_user_event_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([self._user_event])
-          | "Create UserEvent" >>
-          recommendations_ai.WriteUserEvent(project="test"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertTrue(read_counter.result == expected_counter)
-
-  def test_ImportUserEvents(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_user_event_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([
-              (self._user_event["user_info"]["visitor_id"], self._user_event),
-              (self._user_event["user_info"]["visitor_id"], self._user_event)
-          ]) | "Create UserEvents" >>
-          recommendations_ai.ImportUserEvents(project="test"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertTrue(read_counter.result == expected_counter)
-
-
-@unittest.skipIf(
-    recommendationengine is None,
-    "Recommendations AI dependencies not installed.")
-class RecommendationsAIPredictTest(unittest.TestCase):
-  def setUp(self):
-    self._mock_client = mock.Mock()
-    self._mock_client.predict.return_value = [
-        recommendationengine.PredictResponse()
-    ]
-
-    self._user_event = {
-        "event_type": "page-visit", "user_info": {
-            "visitor_id": "1"
-        }
-    }
-
-  def test_Predict(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_prediction_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([self._user_event])
-          | "Prediction UserEvents" >> recommendations_ai.PredictUserEvent(
-              project="test", placement_id="recently_viewed_default"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertTrue(read_counter.result == expected_counter)
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py b/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py
deleted file mode 100644
index 19e6b9e..0000000
--- a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py
+++ /dev/null
@@ -1,107 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Integration tests for Recommendations AI transforms."""
-
-from __future__ import absolute_import
-
-import random
-import unittest
-
-from nose.plugins.attrib import attr
-
-import apache_beam as beam
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-from apache_beam.testing.util import is_not_empty
-
-# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
-try:
-  from google.cloud import recommendationengine
-  from apache_beam.ml.gcp import recommendations_ai
-except ImportError:
-  recommendationengine = None
-# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
-
-
-def extract_id(response):
-  yield response["id"]
-
-
-def extract_event_type(response):
-  yield response["event_type"]
-
-
-def extract_prediction(response):
-  yield response[0]["results"]
-
-
-@attr('IT')
-@unittest.skipIf(
-    recommendationengine is None,
-    "Recommendations AI dependencies not installed.")
-class RecommendationAIIT(unittest.TestCase):
-  def test_create_catalog_item(self):
-
-    CATALOG_ITEM = {
-        "id": str(int(random.randrange(100000))),
-        "title": "Sample laptop",
-        "description": "Indisputably the most fantastic laptop ever created.",
-        "language_code": "en",
-        "category_hierarchies": [{
-            "categories": ["Electronic", "Computers"]
-        }]
-    }
-
-    with TestPipeline(is_integration_test=True) as p:
-      output = (
-          p | 'Create data' >> beam.Create([CATALOG_ITEM])
-          | 'Create CatalogItem' >>
-          recommendations_ai.CreateCatalogItem(project=p.get_option('project'))
-          | beam.ParDo(extract_id) | beam.combiners.ToList())
-
-      assert_that(output, equal_to([[CATALOG_ITEM["id"]]]))
-
-  def test_create_user_event(self):
-    USER_EVENT = {"event_type": "page-visit", "user_info": {"visitor_id": "1"}}
-
-    with TestPipeline(is_integration_test=True) as p:
-      output = (
-          p | 'Create data' >> beam.Create([USER_EVENT]) | 'Create UserEvent' >>
-          recommendations_ai.WriteUserEvent(project=p.get_option('project'))
-          | beam.ParDo(extract_event_type) | beam.combiners.ToList())
-
-      assert_that(output, equal_to([[USER_EVENT["event_type"]]]))
-
-  def test_create_predict(self):
-    USER_EVENT = {"event_type": "page-visit", "user_info": {"visitor_id": "1"}}
-
-    with TestPipeline(is_integration_test=True) as p:
-      output = (
-          p | 'Create data' >> beam.Create([USER_EVENT])
-          | 'Predict UserEvent' >> recommendations_ai.PredictUserEvent(
-              project=p.get_option('project'),
-              placement_id="recently_viewed_default")
-          | beam.ParDo(extract_prediction))
-
-      assert_that(output, is_not_empty())
-
-
-if __name__ == '__main__':
-  print(recommendationengine.CatalogItem.__module__)
-  unittest.main()
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 7117a55..775f321 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -200,7 +200,6 @@
     'google-cloud-language>=1.3.0,<2',
     'google-cloud-videointelligence>=1.8.0,<2',
     'google-cloud-vision>=0.38.0,<2',
-    'google-cloud-recommendations-ai>=0.1.0,<=0.2.0'
 ]
 
 INTERACTIVE_BEAM = [
diff --git a/website/www/site/assets/scss/_global.sass b/website/www/site/assets/scss/_global.sass
index 773b404..f7f7b5a 100644
--- a/website/www/site/assets/scss/_global.sass
+++ b/website/www/site/assets/scss/_global.sass
@@ -115,7 +115,7 @@
   line-height: 1.63
   letter-spacing: 0.43px
   padding: 24px
-  max-width: 848px
+  max-width: 100%
   position: relative
 
   pre
@@ -133,7 +133,7 @@
         cursor: pointer
 
 .snippet
-  max-width: 848px
+  max-width: 100%
   margin-bottom: 40px
   .git-link
     float: right
diff --git a/website/www/site/content/en/documentation/_index.md b/website/www/site/content/en/documentation/_index.md
index 36812d5..80d111b 100644
--- a/website/www/site/content/en/documentation/_index.md
+++ b/website/www/site/content/en/documentation/_index.md
@@ -30,6 +30,7 @@
 * Read the [Programming Guide](/documentation/programming-guide/), which introduces all the key Beam concepts.
 * Learn about Beam's [execution model](/documentation/runtime/model) to better understand how pipelines execute.
 * Visit [Learning Resources](/documentation/resources/learning-resources) for some of our favorite articles and talks about Beam.
+* Visit the [glossary](/documentation/glossary) to learn the terminology of the Beam programming model.
 
 ## Pipeline Fundamentals
 
diff --git a/website/www/site/content/en/documentation/glossary.md b/website/www/site/content/en/documentation/glossary.md
new file mode 100644
index 0000000..3993f53
--- /dev/null
+++ b/website/www/site/content/en/documentation/glossary.md
@@ -0,0 +1,464 @@
+---
+title: "Beam glossary"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Apache Beam glossary
+
+## Aggregation
+
+A transform pattern for computing a value from multiple input elements. Aggregation is similar to the reduce operation in the [MapReduce](https://en.wikipedia.org/wiki/MapReduce) model. Aggregation transforms include Count (computes the count of all elements in the aggregation), Max (computes the maximum element in the aggregation), and Sum (computes the sum of all elements in the aggregation).
+
+For a complete list of aggregation transforms, see:
+
+* [Java Transform catalog](/documentation/transforms/java/overview/#aggregation)
+* [Python Transform catalog](/documentation/transforms/python/overview/#aggregation)
+
+## Apply
+
+A method for invoking a transform on a PCollection. Each transform in the Beam SDKs has a generic `apply` method (or pipe operator `|`). Invoking multiple Beam transforms is similar to method chaining, but with a difference: You apply the transform to the input PCollection, passing the transform itself as an argument, and the operation returns the output PCollection. Because of Beam’s deferred execution model, applying a transform does not immediately execute that transform.
+
+To learn more, see:
+
+* [Applying transforms](/documentation/programming-guide/#applying-transforms)
+
+## Batch processing
+
+A data processing paradigm for working with finite, or bounded, datasets. A bounded PCollection represents a dataset of a known, fixed size. Reading from a batch data source, such as a file or a database, creates a bounded PCollection. A batch processing job eventually ends, in contrast to a streaming job, which runs until cancelled.
+
+To learn more, see:
+
+* [Size and boundedness](/documentation/programming-guide/#size-and-boundedness)
+
+## Bounded data
+
+A dataset of a known, fixed size. A PCollection can be bounded or unbounded, depending on the source of the data that it represents. Reading from a batch data source, such as a file or a database, creates a bounded PCollection. Beam also supports reading a bounded amount of data from an unbounded source.
+
+To learn more, see:
+
+* [Size and boundedness](/documentation/programming-guide/#size-and-boundedness)
+
+## Bundle
+
+The processing unit for elements in a PCollection. Instead of processing all elements in a PCollection simultaneously, Beam processes the elements in bundles. The runner handles the division of the collection into bundles, and in doing so it may optimize the bundle size for the use case. For example, a streaming runner might process smaller bundles than a batch runner.
+
+To learn more, see:
+
+* [Bundling and persistence](/documentation/runtime/model/#bundling-and-persistence)
+
+## Coder
+
+A component that describes how the elements of a PCollection can be encoded and decoded. To support distributed processing and cross-language portability, Beam needs to be able to encode each element of a PCollection as bytes. The Beam SDKs provide built-in coders for common types and language-specific mechanisms for specifying the encoding of a PCollection.
+
+To learn more, see:
+
+* [Data encoding and type safety](/documentation/programming-guide/#data-encoding-and-type-safety)
+
+## CoGroupByKey
+
+A PTransform that takes two or more PCollections and aggregates the elements by key. In effect, CoGroupByKey performs a relational join of two or more key/value PCollections that have the same key type. While GroupByKey performs this operation over a single input collection, CoGroupByKey operates over multiple input collections.
+
+To learn more, see:
+
+* [CoGroupByKey](/documentation/programming-guide/#cogroupbykey)
+* [CoGroupByKey (Java)](/documentation/transforms/java/aggregation/cogroupbykey/)
+* [CoGroupByKey (Python)](/documentation/transforms/python/aggregation/cogroupbykey/)
+
+## Collection
+
+See [PCollection](/documentation/glossary/#pcollection).
+
+## Combine
+
+A PTransform for combining all elements of a PCollection or all values associated with a key. When you apply a Combine transform, you have to provide a user-defined function (UDF) that contains the logic for combining the elements or values. The combining function should be [commutative](https://en.wikipedia.org/wiki/Commutative_property) and [associative](https://en.wikipedia.org/wiki/Associative_property), because the function is not necessarily invoked exactly once on all values with a given key.
+
+To learn more, see:
+
+* [Combine](/documentation/programming-guide/#combine)
+* [Combine (Java)](/documentation/transforms/java/aggregation/combine/)
+* [CombineGlobally (Python)](/documentation/transforms/python/aggregation/combineglobally/)
+* [CombinePerKey (Python)](/documentation/transforms/python/aggregation/combineperkey/)
+* [CombineValues (Python)](/documentation/transforms/python/aggregation/combinevalues/)
+
+## Composite transform
+
+A PTransform that expands into many PTransforms. Composite transforms have a nested structure, in which a complex transform applies one or more simpler transforms. These simpler transforms could be existing Beam operations like ParDo, Combine, or GroupByKey, or they could be other composite transforms. Nesting multiple transforms inside a single composite transform can make your pipeline more modular and easier to understand.
+
+To learn more, see:
+
+* [Composite transforms](/documentation/programming-guide/#composite-transforms)
+
+## Counter (metric)
+
+A metric that reports a single long value and can be incremented. In the Beam model, metrics provide insight into the state of a pipeline, potentially while the pipeline is running.
+
+To learn more, see:
+
+* [Types of metrics](/documentation/programming-guide/#types-of-metrics)
+
+## Cross-language transforms
+
+Transforms that can be shared across Beam SDKs. With cross-language transforms, you can use transforms written in any supported SDK language (currently, Java and Python) in a pipeline written in a different SDK language. For example, you could use the Apache Kafka connector from the Java SDK in a Python streaming pipeline. Cross-language transforms make it possible to provide new functionality simultaneously in different SDKs.
+
+To learn more, see:
+
+* [Multi-language pipelines](/documentation/programming-guide/#mulit-language-pipelines)
+
+## Deferred execution
+
+A feature of the Beam execution model. Beam operations are deferred, meaning that the result of a given operation may not be available for control flow. Deferred execution allows the Beam API to support parallel processing of data.
+
+## Distribution (metric)
+
+A metric that reports information about the distribution of reported values. In the Beam model, metrics provide insight into the state of a pipeline, potentially while the pipeline is running.
+
+To learn more, see:
+
+* [Types of metrics](/documentation/programming-guide/#types-of-metrics)
+
+## DoFn
+
+A function object used by ParDo (or some other transform) to process the elements of a PCollection. A DoFn is a user-defined function, meaning that it contains custom code that defines a data processing task in your pipeline. The Beam system invokes a DoFn one or more times to process some arbitrary bundle of elements, but Beam doesn’t guarantee an exact number of invocations.
+
+To learn more, see:
+
+* [ParDo](/documentation/programming-guide/#pardo)
+
+## Driver
+
+A program that defines your pipeline, including all of the inputs, transforms, and outputs. To use Beam, you need to create a driver program using classes from one of the Beam SDKs. The driver program creates a pipeline and specifies the execution options that tell the pipeline where and how to run. These options include the runner, which determines what backend your pipeline will run on.
+
+To learn more, see:
+
+* [Overview](/documentation/programming-guide/#overview)
+
+## Element
+
+The unit of data in a PCollection. Elements in a PCollection can be of any type, but they must all have the same type. This allows parallel computations to operate uniformly across the entire collection. Some element types have a structure that can be introspected (for example, JSON, Protocol Buffer, Avro, and database records).
+
+To learn more, see:
+
+* [PCollection characteristics](/documentation/programming-guide/#pcollection-characteristics)
+
+## Element-wise
+
+A type of transform that independently processes each element in an input PCollection. Element-wise is similar to the map operation in the [MapReduce](https://en.wikipedia.org/wiki/MapReduce) model. An element-wise transform might output 0, 1, or multiple values for each input element. This is in contrast to aggregation transforms, which compute a single value from multiple input elements. Element-wise operations include Filter, FlatMap, and ParDo.
+
+For a complete list of element-wise transforms, see:
+
+* [Java Transform catalog](/documentation/transforms/java/overview/#element-wise)
+* [Python Transform catalog](/documentation/transforms/python/overview/#element-wise)
+
+## Engine
+
+A data-processing system, such as Dataflow, Spark, or Flink. A Beam runner for an engine executes a Beam pipeline on that engine.
+
+## Event time
+
+The time a data event occurs, determined by a timestamp on an element. This is in contrast to processing time, which is when an element is processed in a pipeline. An event could be, for example, a user interaction or a write to an error log. There’s no guarantee that events will appear in a pipeline in order of event time.
+
+To learn more, see:
+
+* [Watermarks and late data](/documentation/programming-guide/#watermarks-and-late-data)
+* [Triggers](/documentation/programming-guide/#triggers)
+
+## Expansion Service
+
+A service that enables a pipeline to apply (expand) cross-language transforms defined in other SDKs. For example, by connecting to a Java expansion service, the Python SDK can apply transforms implemented in Java. Currently SDKs define expansion services as local processes, but in the future Beam may support long-running expansion services. The development of expansion services is part of the ongoing effort to support multi-language pipelines.
+
+## Flatten
+One of the core PTransforms. Flatten merges multiple PCollections into a single logical PCollection.
+
+To learn more, see:
+
+* [Flatten](/documentation/programming-guide/#flatten)
+* [Flatten (Java)](/documentation/transforms/java/other/flatten/)
+* [Flatten (Python)](/documentation/transforms/python/other/flatten/)
+
+## Fusion
+
+An optimization that Beam runners can apply before running a pipeline. When one transform outputs a PCollection that’s consumed by another transform, or when two or more transforms take the same PCollection as input, a runner may be able to fuse the transforms together into a single processing unit (a *stage* in Dataflow). Fusion can make pipeline execution more efficient by preventing I/O operations.
+
+## Gauge (metric)
+
+A metric that reports the latest value out of reported values. In the Beam model, metrics provide insight into the state of a pipeline, potentially while the pipeline is running. Because metrics are collected from many workers, the gauge value may not be the absolute last value, but it will be one of the latest values produced by one of the workers.
+
+To learn more, see:
+
+* [Types of metrics](/documentation/programming-guide/#types-of-metrics)
+
+## GroupByKey
+
+A PTransform for processing collections of key/value pairs. GroupByKey is a parallel reduction operation, similar to the shuffle of a map/shuffle/reduce algorithm. The input to GroupByKey is a collection of key/value pairs in which multiple pairs have the same key but different values (i.e. a multimap). You can use GroupByKey to collect all of the values associated with each unique key.
+
+To learn more, see:
+
+* [GroupByKey](/documentation/programming-guide/#groupbykey)
+* [GroupByKey (Java)](/documentation/transforms/java/aggregation/groupbykey/)
+* [GroupByKey (Python)](/documentation/transforms/python/aggregation/groupbykey/)
+
+## I/O connector
+
+A set of PTransforms for working with external data storage systems. When you create a pipeline, you often need to read from or write to external data systems such as files or databases. Beam provides read and write transforms for a number of common data storage types.
+
+To learn more, see:
+
+* [Pipeline I/O](/documentation/programming-guide/#pipeline-io)
+* [Built-in I/O Transforms](/documentation/io/built-in/)
+
+## Map
+
+An element-wise PTransform that applies a user-defined function (UDF) to each element in a PCollection. Using Map, you can transform each individual element, but you can't change the number of elements.
+
+To learn more, see:
+
+* [Map (Python)](/documentation/transforms/python/elementwise/map/)
+* [MapElements (Java)](/documentation/transforms/java/elementwise/mapelements/)
+
+## Metrics
+
+Data on the state of a pipeline, potentially while the pipeline is running. You can use the built-in Beam metrics to gain insight into the functioning of your pipeline. For example, you might use Beam metrics to track errors, calls to a backend service, or the number of elements processed. Beam currently supports three types of metric: Counter, Distribution, and Gauge.
+
+To learn more, see:
+
+* [Metrics](/documentation/programming-guide/#metrics)
+
+## Multi-language pipeline
+
+A pipeline that uses cross-language transforms. You can combine transforms written in any supported SDK language (currently, Java and Python) and use them in one multi-language pipeline.
+
+To learn more, see:
+
+* [Multi-language pipelines](/documentation/programming-guide/#mulit-language-pipelines)
+
+## ParDo
+
+The lowest-level element-wise PTransform. For each element in an input PCollection, ParDo applies a function and emits zero, one, or multiple elements to an output PCollection. “ParDo” is short for “Parallel Do.” It’s similar to the map operation in a [MapReduce](https://en.wikipedia.org/wiki/MapReduce) algorithm, the `apply` method from a DataFrame, or the `UPDATE` keyword from SQL.
+
+To learn more, see:
+
+* [ParDo](/documentation/programming-guide/#pardo)
+* [ParDo (Java)](/documentation/transforms/java/elementwise/pardo/)
+* [ParDo (Python)](/documentation/transforms/python/elementwise/pardo/)
+
+## Partition
+
+An element-wise PTransform that splits a single PCollection into a fixed number of smaller PCollections. Partition requires a user-defined function (UDF) to determine how to split up the elements of the input collection into the resulting output collections. The number of partitions must be determined at graph construction time, meaning that you can’t determine the number of partitions using data calculated by the running pipeline.
+
+To learn more, see:
+
+* [Partition](/documentation/programming-guide/#partition)
+* [Partition (Java)](/documentation/transforms/java/elementwise/partition/)
+* [Partition (Python)](/documentation/transforms/python/elementwise/partition/)
+
+## PCollection
+
+A potentially distributed, homogeneous dataset or data stream. PCollections represent data in a Beam pipeline, and Beam transforms (PTransforms) use PCollection objects as inputs and outputs. PCollections are intended to be immutable, meaning that once a PCollection is created, you can’t add, remove, or change individual elements. The “P” stands for “parallel.”
+
+To learn more, see:
+
+* [PCollections](/documentation/programming-guide/#pcollections)
+
+## Pipe operator (`|`)
+
+Delimits a step in a Python pipeline. For example: `[Final Output PCollection] = ([Initial Input PCollection] | [First Transform] | [Second Transform] | [Third Transform])`. The output of each transform is passed from left to right as input to the next transform. The pipe operator in Python is equivalent to the `apply` method in Java (in other words, the pipe applies a transform to a PCollection).
+
+To learn more, see:
+
+* [Applying transforms](/documentation/programming-guide/#applying-transforms)
+
+## Pipeline
+
+An encapsulation of your entire data processing task, including reading input data from a source, transforming that data, and writing output data to a sink. You can think of a pipeline as a Beam program that uses PTransforms to process PCollections. The transforms in a pipeline can be represented as a directed acyclic graph (DAG). All Beam driver programs must create a pipeline.
+
+To learn more, see:
+
+* [Overview](/documentation/programming-guide/#overview)
+* [Creating a pipeline](/documentation/programming-guide/#creating-a-pipeline)
+* [Design your pipeline](/documentation/pipelines/design-your-pipeline/)
+* [Create your pipeline](/documentation/pipelines/create-your-pipeline/)
+
+## Processing time
+
+The time at which an element is processed at some stage in a pipeline. Processing time is not the same as event time, which is the time at which a data event occurs. Processing time is determined by the clock on the system processing the element. There’s no guarantee that elements will be processed in order of event time.
+
+To learn more, see:
+
+* [Watermarks and late data](/documentation/programming-guide/#watermarks-and-late-data)
+* [Triggers](/documentation/programming-guide/#triggers)
+
+## PTransform
+
+A data processing operation, or a step, in your pipeline. A PTransform takes zero or more PCollections as input, applies a processing function to the elements of that PCollection, and produces zero or more output PCollections. Some PTransforms accept user-defined functions that apply custom logic. The “P” stands for “parallel.”
+
+To learn more, see:
+
+* [Overview](/documentation/programming-guide/#overview)
+* [Transforms](/documentation/programming-guide/#transforms)
+
+## Runner
+
+A runner runs a pipeline on a specific platform. Most runners are translators or adapters to massively parallel big data processing systems. Other runners exist for local testing and debugging. Among the supported runners are Google Cloud Dataflow, Apache Spark, Apache Samza, Apache Flink, the Interactive Runner, and the Direct Runner.
+
+To learn more, see:
+
+* [Choosing a Runner](/documentation/#choosing-a-runner)
+* [Beam Capability Matrix](/documentation/runners/capability-matrix/)
+
+## Schema
+
+A language-independent type definition for a PCollection. The schema for a PCollection defines elements of that PCollection as an ordered list of named fields. Each field has a name, a type, and possibly a set of user options. Schemas provide a way to reason about types across different programming-language APIs.
+
+To learn more, see:
+
+* [Schemas](/documentation/programming-guide/#schemas)
+* [Schema Patterns](/documentation/patterns/schema/)
+
+## Session
+
+A time interval for grouping data events. A session is defined by some minimum gap duration between events. For example, a data stream representing user mouse activity may have periods with high concentrations of clicks followed by periods of inactivity. A session can represent such a pattern of activity followed by inactivity.
+
+To learn more, see:
+
+* [Session windows](/documentation/programming-guide/#session-windows)
+* [Analyzing Usage Patterns](/get-started/mobile-gaming-example/#analyzing-usage-patterns)
+
+## Side input
+
+Additional input to a PTransform. Side input is input that you provide in addition to the main input PCollection. A DoFn can access side input each time it processes an element in the PCollection. Side inputs are useful if your transform needs to inject additional data at runtime.
+
+To learn more, see:
+
+* [Side inputs](/documentation/programming-guide/#side-inputs)
+* [Side input patterns](/documentation/patterns/side-inputs/)
+
+## Sink
+
+A transform that writes to an external data storage system, like a file or database.
+
+To learn more, see:
+
+* [Developing new I/O connectors](/documentation/io/developing-io-overview/)
+* [Pipeline I/O](/documentation/programming-guide/#pipeline-io)
+* [Built-in I/O transforms](/documentation/io/built-in/)
+
+## Source
+A transform that reads from an external storage system. A pipeline typically reads input data from a source. The source has a type, which may be different from the sink type, so you can change the format of data as it moves through your pipeline.
+
+To learn more, see:
+
+* [Developing new I/O connectors](/documentation/io/developing-io-overview/)
+* [Pipeline I/O](/documentation/programming-guide/#pipeline-io)
+* [Built-in I/O transforms](/documentation/io/built-in/)
+
+## Splittable DoFn
+
+A generalization of DoFn that makes it easier to create complex, modular I/O connectors. A Splittable DoFn (SDF) can process elements in a non-monolithic way, meaning that the processing can be decomposed into smaller tasks. With SDF, you can check-point the processing of an element, and you can split the remaining work to yield additional parallelism. SDF is recommended for building new I/O connectors.
+
+To learn more, see:
+
+* [Splittable DoFns](/documentation/programming-guide/#splittable-dofns)
+* [Splittable DoFn in Apache Beam is Ready to Use](/blog/splittable-do-fn-is-available/)
+
+## State
+
+Persistent values that a PTransform can access. The state API lets you augment element-wise operations (for example, ParDo or Map) with mutable state. Using the state API, you can read from, and write to, state as you process each element of a PCollection. You can use the state API together with the timer API to create processing tasks that give you fine-grained control over the workflow.
+
+To learn more, see:
+
+* [State and Timers](/documentation/programming-guide/#state-and-timers)
+* [Stateful processing with Apache Beam](/blog/stateful-processing/)
+
+## Streaming
+
+A data processing paradigm for working with infinite, or unbounded, datasets. Reading from a streaming data source, such as Pub/Sub or Kafka, creates an unbounded PCollection. An unbounded PCollection must be processed using a job that runs continuously, because the entire collection can never be available for processing at any one time.
+
+To learn more, see:
+
+* [Size and boundedness](/documentation/programming-guide/#size-and-boundedness)
+* [Python Streaming Pipelines](/documentation/sdks/python-streaming/)
+
+# Timer
+
+A Beam feature that enables delayed processing of data stored using the state API. The timer API lets you set timers to call back at either an event-time or a processing-time timestamp. You can use the timer API together with the state API to create processing tasks that give you fine-grained control over the workflow.
+
+To learn more, see:
+
+* [State and Timers](/documentation/programming-guide/#state-and-timers)
+* [Stateful processing with Apache Beam](/blog/stateful-processing/)
+* [Timely (and Stateful) Processing with Apache Beam](/blog/timely-processing/)
+
+## Timestamp
+
+A point in time associated with an element in a PCollection and used to assign a window to the element. The source that creates the PCollection assigns each element an initial timestamp, often corresponding to when the element was read or added. But you can also manually assign timestamps. This can be useful if elements have an inherent timestamp, but the timestamp is somewhere in the structure of the element itself (for example, a time field in a server log entry).
+
+To learn more, see:
+
+* [Element timestamps](/documentation/programming-guide/#element-timestamps)
+* [Adding timestamps to a PCollection’s elements](/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements)
+
+## Transform
+
+See PTransform.
+
+## Trigger
+
+Determines when to emit aggregated result data from a window. You can use triggers to refine the windowing strategy for your pipeline. If you use the default windowing configuration and default trigger, Beam outputs an aggregated result when it estimates that all data for a window has arrived, and it discards all subsequent data for that window. But you can also use triggers to emit early results, before all the data in a given window has arrived, or to process late data by triggering after the event time watermark passes the end of the window.
+
+To learn more, see:
+
+* [Triggers](/documentation/programming-guide/#triggers)
+
+## Unbounded data
+
+A dataset of unlimited size. A PCollection can be bounded or unbounded, depending on the source of the data that it represents. Reading from a streaming or continuously-updating data source, such as Pub/Sub or Kafka, typically creates an unbounded PCollection.
+
+To learn more, see:
+
+* [Size and boundedness](/documentation/programming-guide/#size-and-boundedness)
+
+## User-defined function
+
+Custom logic that a PTransform applies to your data. Some PTransforms accept a user-defined function (UDF) as a way to configure the transform. For example, ParDo expects user code in the form of a DoFn object. Each language SDK has its own idiomatic way of expressing user-defined functions, but there are some common requirements, like serializability and thread compatibility.
+
+To learn more, see:
+
+* [User-Defined Functions (UDFs)](/documentation/basics/#user-defined-functions-udfs)
+* [ParDo](/documentation/programming-guide/#pardo)
+* [Requirements for writing user code for Beam transforms](/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms)
+
+## Watermark
+
+The point in event time when all data in a window can be expected to have arrived in the pipeline. Watermarks provide a way to estimate the completeness of input data. Every PCollection has an associated watermark. Once the watermark progresses past the end of a window, any element that arrives with a timestamp in that window is considered late data.
+
+To learn more, see:
+
+* [Watermarks and late data](/documentation/programming-guide/#watermarks-and-late-data)
+
+## Windowing
+
+Partitioning a PCollection into bounded subsets grouped by the timestamps of individual elements. In the Beam model, any PCollection – including unbounded PCollections – can be subdivided into logical windows. Each element in a PCollection is assigned to one or more windows according to the PCollection's windowing function, and each individual window contains a finite number of elements. Transforms that aggregate multiple elements, such as GroupByKey and Combine, work implicitly on a per-window basis.
+
+To learn more, see:
+
+* [Windowing](/documentation/programming-guide/#windowing)
+
+## Worker
+
+A container, process, or virtual machine (VM) that handles some part of the parallel processing of a pipeline. The Beam model doesn’t support synchronizing shared state across worker machines. Instead, each worker node has its own independent copy of state. A Beam runner might serialize elements between machines for communication purposes and for other reasons such as persistence.
+
+To learn more, see:
+
+* [Execution model](/documentation/runtime/model/)
diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html
index 0718fc5..94aa1d1 100644
--- a/website/www/site/layouts/partials/section-menu/en/documentation.html
+++ b/website/www/site/layouts/partials/section-menu/en/documentation.html
@@ -342,6 +342,8 @@
   </ul>
 </li>
 
+<li><a href="/documentation/glossary/">Glossary</a></li>
+
 <li class="section-nav-item--collapsible">
   <span class="section-nav-list-title">Runtime systems</span>