Merge pull request #14740: [BEAM-7320] Only run one pipeline in TextIOWriteTest.testWriteViaSink

diff --git a/.test-infra/jenkins/CommonJobProperties.groovy b/.test-infra/jenkins/CommonJobProperties.groovy
index 40f143a..851fc0b 100644
--- a/.test-infra/jenkins/CommonJobProperties.groovy
+++ b/.test-infra/jenkins/CommonJobProperties.groovy
@@ -49,7 +49,7 @@
 
     // Discard old builds. Build records are only kept up to this number of days.
     context.logRotator {
-      daysToKeep(14)
+      daysToKeep(30)
     }
 
     // Source code management.
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/MorePipelineTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/MorePipelineTest.java
index 369aad9..267b4d5 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/MorePipelineTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/MorePipelineTest.java
@@ -25,14 +25,18 @@
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -99,13 +103,22 @@
 
     @Override
     public PCollectionView<List<T>> expand(PCollection<T> input) {
-      PCollection<KV<Void, T>> materializationInput =
-          input.apply(new View.VoidKeyToMultimapMaterialization<>());
       Coder<T> inputCoder = input.getCoder();
+      PCollection<KV<Long, PCollectionViews.ValueOrMetadata<T, OffsetRange>>> materializationInput =
+          input
+              .apply("IndexElements", ParDo.of(new View.ToListViewDoFn<>()))
+              .setCoder(
+                  KvCoder.of(
+                      BigEndianLongCoder.of(),
+                      PCollectionViews.ValueOrMetadataCoder.create(
+                          inputCoder, OffsetRange.Coder.of())));
       PCollectionView<List<T>> view =
-          PCollectionViews.listViewUsingVoidKey(
+          PCollectionViews.listView(
               materializationInput,
-              (TupleTag<Materializations.MultimapView<Void, T>>) originalView.getTagInternal(),
+              (TupleTag<
+                      Materializations.MultimapView<
+                          Long, PCollectionViews.ValueOrMetadata<T, OffsetRange>>>)
+                  originalView.getTagInternal(),
               (PCollectionViews.TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
               materializationInput.getWindowingStrategy());
       materializationInput.apply(View.CreatePCollectionView.of(view));
diff --git a/runners/flink/1.12/build.gradle b/runners/flink/1.12/build.gradle
index 6db1ccb..93ba0c3 100644
--- a/runners/flink/1.12/build.gradle
+++ b/runners/flink/1.12/build.gradle
@@ -20,7 +20,7 @@
 /* All properties required for loading the Flink build script */
 project.ext {
   // Set the version of all Flink-related dependencies here.
-  flink_version = '1.12.2'
+  flink_version = '1.12.3'
   // Version specific code overrides.
   main_source_overrides = ["${basePath}/1.10/src/main/java", "${basePath}/1.11/src/main/java", './src/main/java']
   test_source_overrides = ["${basePath}/1.10/src/test/java", "${basePath}/1.11/src/test/java", './src/test/java']
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index b814c70..eed8f19 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -45,7 +45,7 @@
   filter org.apache.tools.ant.filters.ReplaceTokens, tokens: [
     'dataflow.legacy_environment_major_version' : '8',
     'dataflow.fnapi_environment_major_version' : '8',
-    'dataflow.container_version' : 'beam-master-20210429',
+    'dataflow.container_version' : 'beam-master-20210503',
     'dataflow.container_base_repository' : 'gcr.io/cloud-dataflow/v1beta3',
   ]
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
index 37fa6fc..35d865f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IsmSideInputReader.java
@@ -69,11 +69,17 @@
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews.HasDefaultValue;
 import org.apache.beam.sdk.values.PCollectionViews.IterableViewFn;
+import org.apache.beam.sdk.values.PCollectionViews.IterableViewFn2;
 import org.apache.beam.sdk.values.PCollectionViews.ListViewFn;
+import org.apache.beam.sdk.values.PCollectionViews.ListViewFn2;
 import org.apache.beam.sdk.values.PCollectionViews.MapViewFn;
+import org.apache.beam.sdk.values.PCollectionViews.MapViewFn2;
 import org.apache.beam.sdk.values.PCollectionViews.MultimapViewFn;
+import org.apache.beam.sdk.values.PCollectionViews.MultimapViewFn2;
 import org.apache.beam.sdk.values.PCollectionViews.SingletonViewFn;
+import org.apache.beam.sdk.values.PCollectionViews.SingletonViewFn2;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
@@ -104,7 +110,13 @@
   private static final Object NULL_PLACE_HOLDER = new Object();
 
   private static final ImmutableList<Class<? extends ViewFn>> KNOWN_SINGLETON_VIEW_TYPES =
-      ImmutableList.of(SingletonViewFn.class, MapViewFn.class, MultimapViewFn.class);
+      ImmutableList.of(
+          SingletonViewFn.class,
+          SingletonViewFn2.class,
+          MapViewFn.class,
+          MapViewFn2.class,
+          MultimapViewFn.class,
+          MultimapViewFn2.class);
 
   /**
    * Limit the number of concurrent initializations.
@@ -302,7 +314,7 @@
       // We handle the singleton case separately since a null value may be returned.
       // We use a null place holder to represent this, and when we detect it, we translate
       // back to null for the user.
-      if (viewFn instanceof SingletonViewFn) {
+      if (viewFn instanceof SingletonViewFn || viewFn instanceof SingletonViewFn2) {
         ViewT rval =
             executionContext
                 .<PCollectionViewWindow<ViewT>, ViewT>getLogicalReferenceCache()
@@ -311,7 +323,7 @@
                     () -> {
                       @SuppressWarnings("unchecked")
                       ViewT viewT =
-                          getSingletonForWindow(tag, (SingletonViewFn<ViewT>) viewFn, window);
+                          getSingletonForWindow(tag, (HasDefaultValue<ViewT>) viewFn, window);
                       @SuppressWarnings("unchecked")
                       ViewT nullPlaceHolder = (ViewT) NULL_PLACE_HOLDER;
                       return viewT == null ? nullPlaceHolder : viewT;
@@ -319,7 +331,10 @@
         return rval == NULL_PLACE_HOLDER ? null : rval;
       } else if (singletonMaterializedTags.contains(tag)) {
         checkArgument(
-            viewFn instanceof MapViewFn || viewFn instanceof MultimapViewFn,
+            viewFn instanceof MapViewFn
+                || viewFn instanceof MapViewFn2
+                || viewFn instanceof MultimapViewFn
+                || viewFn instanceof MultimapViewFn2,
             "Unknown view type stored as singleton. Expected one of %s, got %s",
             KNOWN_SINGLETON_VIEW_TYPES,
             viewFn.getClass().getName());
@@ -336,15 +351,19 @@
             .get(
                 PCollectionViewWindow.of(view, window),
                 () -> {
-                  if (viewFn instanceof IterableViewFn || viewFn instanceof ListViewFn) {
+                  if (viewFn instanceof IterableViewFn
+                      || viewFn instanceof IterableViewFn2
+                      || viewFn instanceof ListViewFn
+                      || viewFn instanceof ListViewFn2) {
                     @SuppressWarnings("unchecked")
                     ViewT viewT = (ViewT) getListForWindow(tag, window);
                     return viewT;
-                  } else if (viewFn instanceof MapViewFn) {
+                  } else if (viewFn instanceof MapViewFn || viewFn instanceof MapViewFn2) {
                     @SuppressWarnings("unchecked")
                     ViewT viewT = (ViewT) getMapForWindow(tag, window);
                     return viewT;
-                  } else if (viewFn instanceof MultimapViewFn) {
+                  } else if (viewFn instanceof MultimapViewFn
+                      || viewFn instanceof MultimapViewFn2) {
                     @SuppressWarnings("unchecked")
                     ViewT viewT = (ViewT) getMultimapForWindow(tag, window);
                     return viewT;
@@ -375,7 +394,7 @@
    * </ul>
    */
   private <T, W extends BoundedWindow> T getSingletonForWindow(
-      TupleTag<?> viewTag, SingletonViewFn<T> viewFn, W window) throws IOException {
+      TupleTag<?> viewTag, HasDefaultValue<T> viewFn, W window) throws IOException {
     @SuppressWarnings({
       "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
       "unchecked"
diff --git a/runners/portability/java/build.gradle b/runners/portability/java/build.gradle
index bce5404..4ec158d 100644
--- a/runners/portability/java/build.gradle
+++ b/runners/portability/java/build.gradle
@@ -192,6 +192,13 @@
       // https://issues.apache.org/jira/browse/BEAM-10452
       excludeTestsMatching 'org.apache.beam.sdk.transforms.CombineTest$BasicTests.testHotKeyCombiningWithAccumulationMode'
 
+      // https://issues.apache.org/jira/browse/BEAM-12275
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$MultipleInputsAndOutputTests.testSideInputAnnotationWithMultipleSideInputs'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testMapAsEntrySetSideInput'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedMultimapAsEntrySetSideInput'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testWindowedMapAsEntrySetSideInput'
+      excludeTestsMatching 'org.apache.beam.sdk.transforms.ViewTest.testMultimapAsEntrySetSideInput'
+
       // https://issues.apache.org/jira/browse/BEAM-10995
       excludeTestsMatching 'org.apache.beam.sdk.transforms.windowing.WindowingTest.testWindowPreservation'
     }
diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
index a8c1771..9bc32fc 100644
--- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/batch/PCollectionViewTranslatorBatch.java
@@ -22,11 +22,14 @@
 import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
 import org.apache.beam.runners.twister2.Twister2BatchTranslationContext;
 import org.apache.beam.runners.twister2.translators.BatchTransformTranslator;
+import org.apache.beam.runners.twister2.translators.functions.ByteToElemFunction;
 import org.apache.beam.runners.twister2.translators.functions.ByteToWindowFunctionPrimitive;
+import org.apache.beam.runners.twister2.translators.functions.ElemToBytesFunction;
 import org.apache.beam.runners.twister2.translators.functions.MapToTupleFunction;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -58,23 +61,50 @@
                 context.getCurrentTransform();
     org.apache.beam.sdk.values.PCollectionView<ViewT> input;
     PCollection<ElemT> inputPCol = context.getInput(transform);
-    final KvCoder coder = (KvCoder) inputPCol.getCoder();
-    Coder inputKeyCoder = coder.getKeyCoder();
+    final Coder coder = inputPCol.getCoder();
     WindowingStrategy windowingStrategy = inputPCol.getWindowingStrategy();
     WindowFn windowFn = windowingStrategy.getWindowFn();
-    final WindowedValue.WindowedValueCoder wvCoder =
-        WindowedValue.FullWindowedValueCoder.of(coder.getValueCoder(), windowFn.windowCoder());
-    BatchTSet<WindowedValue<ElemT>> inputGathered =
-        inputDataSet
-            .direct()
-            .map(new MapToTupleFunction<>(inputKeyCoder, wvCoder))
-            .allGather()
-            .map(new ByteToWindowFunctionPrimitive(inputKeyCoder, wvCoder));
     try {
       input = CreatePCollectionViewTranslation.getView(application);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
-    context.setSideInputDataSet(input.getTagInternal().getId(), inputGathered);
+
+    switch (input.getViewFn().getMaterialization().getUrn()) {
+      case Materializations.MULTIMAP_MATERIALIZATION_URN:
+        KvCoder kvCoder = (KvCoder<?, ?>) coder;
+        final Coder keyCoder = kvCoder.getKeyCoder();
+        final WindowedValue.WindowedValueCoder kvwvCoder =
+            WindowedValue.FullWindowedValueCoder.of(
+                kvCoder.getValueCoder(), windowFn.windowCoder());
+        BatchTSet<WindowedValue<ElemT>> multimapMaterialization =
+            inputDataSet
+                .direct()
+                .map(new MapToTupleFunction<>(keyCoder, kvwvCoder))
+                .allGather()
+                .map(new ByteToWindowFunctionPrimitive(keyCoder, kvwvCoder));
+        context.setSideInputDataSet(input.getTagInternal().getId(), multimapMaterialization);
+        break;
+      case Materializations.ITERABLE_MATERIALIZATION_URN:
+        final WindowedValue.WindowedValueCoder wvCoder =
+            WindowedValue.FullWindowedValueCoder.of(coder, windowFn.windowCoder());
+        BatchTSet<WindowedValue<ElemT>> iterableMaterialization =
+            inputDataSet
+                .direct()
+                .map(new ElemToBytesFunction<>(wvCoder))
+                .allGather()
+                .map(new ByteToElemFunction(wvCoder));
+        try {
+          input = CreatePCollectionViewTranslation.getView(application);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+        context.setSideInputDataSet(input.getTagInternal().getId(), iterableMaterialization);
+        break;
+      default:
+        throw new UnsupportedOperationException(
+            "Unknown side input materialization "
+                + input.getViewFn().getMaterialization().getUrn());
+    }
   }
 }
diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToElemFunction.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToElemFunction.java
new file mode 100644
index 0000000..578225f
--- /dev/null
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ByteToElemFunction.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2.translators.functions;
+
+import edu.iu.dsc.tws.api.tset.TSetContext;
+import edu.iu.dsc.tws.api.tset.fn.MapFunc;
+import java.io.ObjectStreamException;
+import java.util.logging.Logger;
+import org.apache.beam.runners.twister2.utils.TranslationUtils;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
+
+/** Map function that decodes a byte array into a {@link WindowedValue} using the given coder. */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class ByteToElemFunction<V> implements MapFunc<WindowedValue<V>, byte[]> {
+  private transient WindowedValueCoder<V> wvCoder;
+  private static final Logger LOG = Logger.getLogger(ByteToElemFunction.class.getName());
+
+  private transient boolean isInitialized = false;
+  private byte[] wvCoderBytes;
+
+  public ByteToElemFunction() {
+    // No-arg constructor needed for Kryo.
+    isInitialized = false;
+  }
+
+  public ByteToElemFunction(final WindowedValueCoder<V> wvCoder) {
+    this.wvCoder = wvCoder;
+
+    wvCoderBytes = SerializableUtils.serializeToByteArray(wvCoder);
+  }
+
+  @Override
+  public WindowedValue<V> map(byte[] input) {
+    return TranslationUtils.fromByteArray(input, wvCoder);
+  }
+
+  @Override
+  public void prepare(TSetContext context) {
+    initTransient();
+  }
+
+  /**
+   * Restores the transient coder by deserializing it from {@code wvCoderBytes}. Does nothing if
+   * this method has already run on this instance.
+   */
+  private void initTransient() {
+    if (isInitialized) {
+      return;
+    }
+
+    wvCoder =
+        (WindowedValueCoder<V>)
+            SerializableUtils.deserializeFromByteArray(wvCoderBytes, "Custom Coder Bytes");
+    this.isInitialized = true;
+  }
+
+  protected Object readResolve() throws ObjectStreamException {
+    return this;
+  }
+}
diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ElemToBytesFunction.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ElemToBytesFunction.java
new file mode 100644
index 0000000..c83acdd
--- /dev/null
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/translators/functions/ElemToBytesFunction.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.twister2.translators.functions;
+
+import edu.iu.dsc.tws.api.tset.TSetContext;
+import edu.iu.dsc.tws.api.tset.fn.MapFunc;
+import java.io.ObjectStreamException;
+import java.util.logging.Logger;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Map function that encodes a {@link WindowedValue} into a byte array using the given coder. */
+@SuppressWarnings({
+  "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+public class ElemToBytesFunction<V> implements MapFunc<byte[], WindowedValue<V>> {
+
+  private transient WindowedValue.WindowedValueCoder<V> wvCoder;
+  private static final Logger LOG = Logger.getLogger(ElemToBytesFunction.class.getName());
+
+  private transient boolean isInitialized = false;
+  private byte[] wvCoderBytes;
+
+  public ElemToBytesFunction() {
+    // No-arg constructor needed for Kryo.
+    this.isInitialized = false;
+  }
+
+  public ElemToBytesFunction(WindowedValue.WindowedValueCoder<V> wvCoder) {
+    this.wvCoder = wvCoder;
+    wvCoderBytes = SerializableUtils.serializeToByteArray(wvCoder);
+  }
+
+  @Override
+  public @Nullable byte[] map(WindowedValue<V> input) {
+    try {
+      return CoderUtils.encodeToByteArray(wvCoder, input);
+    } catch (CoderException e) {
+      LOG.info(e.getMessage());
+    }
+    return null;
+  }
+
+  @Override
+  public void prepare(TSetContext context) {
+    initTransient();
+  }
+
+  /**
+   * Restores the transient coder by deserializing it from {@code wvCoderBytes}. Does nothing if
+   * this method has already run on this instance.
+   */
+  private void initTransient() {
+    if (isInitialized) {
+      return;
+    }
+    wvCoder =
+        (WindowedValue.WindowedValueCoder<V>)
+            SerializableUtils.deserializeFromByteArray(wvCoderBytes, "Coder");
+    this.isInitialized = true;
+  }
+
+  protected Object readResolve() throws ObjectStreamException {
+    return this;
+  }
+}
diff --git a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java
index bbcd392..6ed77c7 100644
--- a/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java
+++ b/runners/twister2/src/main/java/org/apache/beam/runners/twister2/utils/Twister2SideInputReader.java
@@ -23,6 +23,7 @@
 import edu.iu.dsc.tws.api.dataset.DataPartitionConsumer;
 import edu.iu.dsc.tws.api.tset.TSetContext;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,11 +32,11 @@
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.Materializations;
 import org.apache.beam.sdk.transforms.Materializations.MultimapView;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -75,40 +76,79 @@
   }
 
   private <T> T getSideInput(PCollectionView<T> view, BoundedWindow window) {
-    Map<BoundedWindow, List<WindowedValue<KV<?, ?>>>> partitionedElements = new HashMap<>();
+    switch (view.getViewFn().getMaterialization().getUrn()) {
+      case Materializations.MULTIMAP_MATERIALIZATION_URN:
+        return getMultimapSideInput(view, window);
+      case Materializations.ITERABLE_MATERIALIZATION_URN:
+        return getIterableSideInput(view, window);
+      default:
+        throw new IllegalArgumentException(
+            "Unknown materialization type: " + view.getViewFn().getMaterialization().getUrn());
+    }
+  }
+
+  private <T> T getMultimapSideInput(PCollectionView<T> view, BoundedWindow window) {
+    Map<BoundedWindow, List<WindowedValue<?>>> partitionedElements = getPartitionedElements(view);
+    Map<BoundedWindow, T> resultMap = new HashMap<>();
+
+    ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
+    for (Map.Entry<BoundedWindow, List<WindowedValue<?>>> elements :
+        partitionedElements.entrySet()) {
+
+      Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
+      resultMap.put(
+          elements.getKey(),
+          viewFn.apply(
+              InMemoryMultimapSideInputView.fromIterable(
+                  keyCoder,
+                  (Iterable)
+                      elements.getValue().stream()
+                          .map(WindowedValue::getValue)
+                          .collect(Collectors.toList()))));
+    }
+    T result = resultMap.get(window);
+    if (result == null) {
+      result = viewFn.apply(InMemoryMultimapSideInputView.empty());
+    }
+    return result;
+  }
+
+  private Map<BoundedWindow, List<WindowedValue<?>>> getPartitionedElements(
+      PCollectionView<?> view) {
+    Map<BoundedWindow, List<WindowedValue<?>>> partitionedElements = new HashMap<>();
     DataPartition<?> sideInput = runtimeContext.getInput(view.getTagInternal().getId());
     DataPartitionConsumer<?> dataPartitionConsumer = sideInput.getConsumer();
     while (dataPartitionConsumer.hasNext()) {
-      WindowedValue<KV<?, ?>> winValue = (WindowedValue<KV<?, ?>>) dataPartitionConsumer.next();
+      WindowedValue<?> winValue = (WindowedValue<?>) dataPartitionConsumer.next();
       for (BoundedWindow tbw : winValue.getWindows()) {
-        List<WindowedValue<KV<?, ?>>> windowedValues =
+        List<WindowedValue<?>> windowedValues =
             partitionedElements.computeIfAbsent(tbw, k -> new ArrayList<>());
         windowedValues.add(winValue);
       }
     }
+    return partitionedElements;
+  }
 
+  private <T> T getIterableSideInput(PCollectionView<T> view, BoundedWindow window) {
+    Map<BoundedWindow, List<WindowedValue<?>>> partitionedElements = getPartitionedElements(view);
+
+    ViewFn<Materializations.IterableView, T> viewFn =
+        (ViewFn<Materializations.IterableView, T>) view.getViewFn();
     Map<BoundedWindow, T> resultMap = new HashMap<>();
 
-    for (Map.Entry<BoundedWindow, List<WindowedValue<KV<?, ?>>>> elements :
+    for (Map.Entry<BoundedWindow, List<WindowedValue<?>>> elements :
         partitionedElements.entrySet()) {
-
-      ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
-      Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder();
       resultMap.put(
           elements.getKey(),
-          (T)
-              viewFn.apply(
-                  InMemoryMultimapSideInputView.fromIterable(
-                      keyCoder,
-                      (Iterable)
-                          elements.getValue().stream()
-                              .map(WindowedValue::getValue)
-                              .collect(Collectors.toList()))));
+          viewFn.apply(
+              () ->
+                  elements.getValue().stream()
+                      .map(WindowedValue::getValue)
+                      .collect(Collectors.toList())));
     }
     T result = resultMap.get(window);
     if (result == null) {
-      ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn();
-      result = viewFn.apply(InMemoryMultimapSideInputView.empty());
+      result = viewFn.apply(() -> Collections.<T>emptyList());
     }
     return result;
   }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 09464db..241fa1f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -63,6 +63,7 @@
 import org.apache.beam.sdk.values.ValueWithRecordId.StripIdsDoFn;
 import org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.RemovalListener;
@@ -827,24 +828,29 @@
                 source, checkpoint, BoundedWindow.TIMESTAMP_MIN_VALUE));
       }
 
+      private void initializeCurrentReader() throws IOException {
+        Preconditions.checkState(currentReader == null);
+        Object cacheKey =
+            createCacheKey(initialRestriction.getSource(), initialRestriction.getCheckpoint());
+        currentReader = cachedReaders.getIfPresent(cacheKey);
+        if (currentReader == null) {
+          currentReader =
+              initialRestriction
+                  .getSource()
+                  .createReader(pipelineOptions, initialRestriction.getCheckpoint());
+        } else {
+          // If the reader is from cache, then we know that the reader has been started.
+          // We also remove this cache entry to avoid eviction.
+          readerHasBeenStarted = true;
+          cachedReaders.invalidate(cacheKey);
+        }
+      }
+
       @Override
       public boolean tryClaim(UnboundedSourceValue<OutputT>[] position) {
         try {
           if (currentReader == null) {
-            Object cacheKey =
-                createCacheKey(initialRestriction.getSource(), initialRestriction.getCheckpoint());
-            currentReader = cachedReaders.getIfPresent(cacheKey);
-            if (currentReader == null) {
-              currentReader =
-                  initialRestriction
-                      .getSource()
-                      .createReader(pipelineOptions, initialRestriction.getCheckpoint());
-            } else {
-              // If the reader is from cache, then we know that the reader has been started.
-              // We also remove this cache entry to avoid eviction.
-              readerHasBeenStarted = true;
-              cachedReaders.invalidate(cacheKey);
-            }
+            initializeCurrentReader();
           }
           if (currentReader instanceof EmptyUnboundedSource.EmptyUnboundedReader) {
             return false;
@@ -872,6 +878,8 @@
               currentReader.close();
             } catch (IOException closeException) {
               e.addSuppressed(closeException);
+            } finally {
+              currentReader = null;
             }
           }
           throw new RuntimeException(e);
@@ -957,10 +965,7 @@
 
         if (currentReader == null) {
           try {
-            currentReader =
-                initialRestriction
-                    .getSource()
-                    .createReader(pipelineOptions, initialRestriction.getCheckpoint());
+            initializeCurrentReader();
           } catch (IOException e) {
             throw new RuntimeException(e);
           }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 63b4191..a3b5665 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
 
 import java.io.IOException;
@@ -47,7 +46,6 @@
 import org.apache.beam.sdk.transforms.CombineWithContext.Context;
 import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.transforms.View.VoidKeyToMultimapMaterialization;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -1308,43 +1306,21 @@
 
     @Override
     public PCollectionView<OutputT> expand(PCollection<InputT> input) {
-      // TODO(BEAM-10097): Make this the default expansion for all portable runners.
-      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
-          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
-              || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
-        PCollection<OutputT> combined =
-            input.apply(
-                "CombineValues",
-                Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
-        Coder<OutputT> outputCoder = combined.getCoder();
-        PCollectionView<OutputT> view =
-            PCollectionViews.singletonView(
-                combined,
-                (TypeDescriptorSupplier<OutputT>)
-                    () -> outputCoder != null ? outputCoder.getEncodedTypeDescriptor() : null,
-                input.getWindowingStrategy(),
-                insertDefault,
-                insertDefault ? fn.defaultValue() : null,
-                combined.getCoder());
-        combined.apply("CreatePCollectionView", CreatePCollectionView.of(view));
-        return view;
-      }
-
       PCollection<OutputT> combined =
-          input.apply(Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
-      PCollection<KV<Void, OutputT>> materializationInput =
-          combined.apply(new VoidKeyToMultimapMaterialization<>());
+          input.apply(
+              "CombineValues",
+              Combine.<InputT, OutputT>globally(fn).withoutDefaults().withFanout(fanout));
       Coder<OutputT> outputCoder = combined.getCoder();
       PCollectionView<OutputT> view =
-          PCollectionViews.singletonViewUsingVoidKey(
-              materializationInput,
+          PCollectionViews.singletonView(
+              combined,
               (TypeDescriptorSupplier<OutputT>)
                   () -> outputCoder != null ? outputCoder.getEncodedTypeDescriptor() : null,
               input.getWindowingStrategy(),
               insertDefault,
               insertDefault ? fn.defaultValue() : null,
               combined.getCoder());
-      materializationInput.apply(CreatePCollectionView.of(view));
+      combined.apply("CreatePCollectionView", CreatePCollectionView.of(view));
       return view;
     }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 904575c..e81f0b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.transforms;
 
-import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
-
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,7 +27,6 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -260,33 +257,16 @@
        * Long#MIN_VALUE} key is used to store all known {@link OffsetRange ranges} allowing us to
        * compute such an ordering.
        */
-
-      // TODO(BEAM-10097): Make this the default expansion for all portable runners.
-      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
-          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
-              || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
-        Coder<T> inputCoder = input.getCoder();
-        PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> materializationInput =
-            input
-                .apply("IndexElements", ParDo.of(new ToListViewDoFn<>()))
-                .setCoder(
-                    KvCoder.of(
-                        BigEndianLongCoder.of(),
-                        ValueOrMetadataCoder.create(inputCoder, OffsetRange.Coder.of())));
-        PCollectionView<List<T>> view =
-            PCollectionViews.listView(
-                materializationInput,
-                (TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
-                input.getWindowingStrategy());
-        materializationInput.apply(CreatePCollectionView.of(view));
-        return view;
-      }
-
-      PCollection<KV<Void, T>> materializationInput =
-          input.apply(new VoidKeyToMultimapMaterialization<>());
       Coder<T> inputCoder = input.getCoder();
+      PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> materializationInput =
+          input
+              .apply("IndexElements", ParDo.of(new ToListViewDoFn<>()))
+              .setCoder(
+                  KvCoder.of(
+                      BigEndianLongCoder.of(),
+                      ValueOrMetadataCoder.create(inputCoder, OffsetRange.Coder.of())));
       PCollectionView<List<T>> view =
-          PCollectionViews.listViewUsingVoidKey(
+          PCollectionViews.listView(
               materializationInput,
               (TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
               materializationInput.getWindowingStrategy());
@@ -300,8 +280,8 @@
    * range for each window seen. We use random offset ranges to minimize the chance that two ranges
    * overlap increasing the odds that each "key" represents a single index.
    */
-  private static class ToListViewDoFn<T>
-      extends DoFn<T, KV<Long, ValueOrMetadata<T, OffsetRange>>> {
+  @Internal
+  public static class ToListViewDoFn<T> extends DoFn<T, KV<Long, ValueOrMetadata<T, OffsetRange>>> {
     private Map<BoundedWindow, OffsetRange> windowsToOffsets = new HashMap<>();
 
     private OffsetRange generateRange(BoundedWindow window) {
@@ -350,29 +330,19 @@
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      // TODO(BEAM-10097): Make this the default expansion for all portable runners.
-      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
-          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
-              || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
-        Coder<T> inputCoder = input.getCoder();
-        PCollectionView<Iterable<T>> view =
-            PCollectionViews.iterableView(
-                input,
-                (TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
-                input.getWindowingStrategy());
-        input.apply(CreatePCollectionView.of(view));
-        return view;
-      }
-
-      PCollection<KV<Void, T>> materializationInput =
-          input.apply(new VoidKeyToMultimapMaterialization<>());
       Coder<T> inputCoder = input.getCoder();
+      // HACK to work around https://issues.apache.org/jira/browse/BEAM-12228:
+      // There are bugs in "composite" vs "primitive" transform distinction
+      // in TransformHierarchy. This noop transform works around them and should be zero
+      // cost.
+      PCollection<T> materializationInput =
+          input.apply(MapElements.via(new SimpleFunction<T, T>(x -> x) {}));
       PCollectionView<Iterable<T>> view =
-          PCollectionViews.iterableViewUsingVoidKey(
+          PCollectionViews.iterableView(
               materializationInput,
               (TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
               materializationInput.getWindowingStrategy());
-      materializationInput.apply(CreatePCollectionView.of(view));
+      input.apply(CreatePCollectionView.of(view));
       return view;
     }
   }
@@ -508,35 +478,22 @@
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      // TODO(BEAM-10097): Make this the default expansion for all portable runners.
-      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
-          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
-              || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
-        KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
-        Coder<K> keyCoder = kvCoder.getKeyCoder();
-        Coder<V> valueCoder = kvCoder.getValueCoder();
-        PCollectionView<Map<K, Iterable<V>>> view =
-            PCollectionViews.multimapView(
-                input,
-                (TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
-                (TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
-                input.getWindowingStrategy());
-        input.apply(CreatePCollectionView.of(view));
-        return view;
-      }
-
       KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
       Coder<K> keyCoder = kvCoder.getKeyCoder();
       Coder<V> valueCoder = kvCoder.getValueCoder();
-      PCollection<KV<Void, KV<K, V>>> materializationInput =
-          input.apply(new VoidKeyToMultimapMaterialization<>());
+      // HACK to work around https://issues.apache.org/jira/browse/BEAM-12228:
+      // There are bugs in "composite" vs "primitive" transform distinction
+      // in TransformHierarchy. This noop transform works around them and should be zero
+      // cost.
+      PCollection<KV<K, V>> materializationInput =
+          input.apply(MapElements.via(new SimpleFunction<KV<K, V>, KV<K, V>>(x -> x) {}));
       PCollectionView<Map<K, Iterable<V>>> view =
-          PCollectionViews.multimapViewUsingVoidKey(
+          PCollectionViews.multimapView(
               materializationInput,
               (TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
               (TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
               materializationInput.getWindowingStrategy());
-      materializationInput.apply(CreatePCollectionView.of(view));
+      input.apply(CreatePCollectionView.of(view));
       return view;
     }
   }
@@ -567,37 +524,19 @@
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      // TODO(BEAM-10097): Make this the default expansion for all portable runners.
-      if (hasExperiment(input.getPipeline().getOptions(), "beam_fn_api")
-          && (hasExperiment(input.getPipeline().getOptions(), "use_runner_v2")
-              || hasExperiment(input.getPipeline().getOptions(), "use_unified_worker"))) {
-        KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
-        Coder<K> keyCoder = kvCoder.getKeyCoder();
-        Coder<V> valueCoder = kvCoder.getValueCoder();
-
-        PCollectionView<Map<K, V>> view =
-            PCollectionViews.mapView(
-                input,
-                (TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
-                (TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
-                input.getWindowingStrategy());
-        input.apply(CreatePCollectionView.of(view));
-        return view;
-      }
-
       KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();
       Coder<K> keyCoder = kvCoder.getKeyCoder();
       Coder<V> valueCoder = kvCoder.getValueCoder();
 
-      PCollection<KV<Void, KV<K, V>>> materializationInput =
-          input.apply(new VoidKeyToMultimapMaterialization<>());
+      PCollection<KV<K, V>> materializationInput =
+          input.apply(MapElements.via(new SimpleFunction<KV<K, V>, KV<K, V>>(x -> x) {}));
       PCollectionView<Map<K, V>> view =
-          PCollectionViews.mapViewUsingVoidKey(
+          PCollectionViews.mapView(
               materializationInput,
               (TypeDescriptorSupplier<K>) keyCoder::getEncodedTypeDescriptor,
               (TypeDescriptorSupplier<V>) valueCoder::getEncodedTypeDescriptor,
               materializationInput.getWindowingStrategy());
-      materializationInput.apply(CreatePCollectionView.of(view));
+      input.apply(CreatePCollectionView.of(view));
       return view;
     }
   }
@@ -606,34 +545,11 @@
   // Internal details below
 
   /**
-   * A {@link PTransform} which converts all values into {@link KV}s with {@link Void} keys.
-   *
-   * <p>TODO(BEAM-10097): Replace this materialization with specializations that optimize the
-   * various SDK requested views.
-   */
-  @Internal
-  public static class VoidKeyToMultimapMaterialization<T>
-      extends PTransform<PCollection<T>, PCollection<KV<Void, T>>> {
-
-    private static class VoidKeyToMultimapMaterializationDoFn<T> extends DoFn<T, KV<Void, T>> {
-      @ProcessElement
-      public void processElement(@Element T element, OutputReceiver<KV<Void, T>> r) {
-        r.output(KV.of((Void) null, element));
-      }
-    }
-
-    @Override
-    public PCollection<KV<Void, T>> expand(PCollection<T> input) {
-      PCollection output = input.apply(ParDo.of(new VoidKeyToMultimapMaterializationDoFn<>()));
-      output.setCoder(KvCoder.of(VoidCoder.of(), input.getCoder()));
-      return output;
-    }
-  }
-
-  /**
    * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
    *
-   * <p>Creates a primitive {@link PCollectionView}.
+   * <p>Placeholder transform for runners to have a hook to materialize a {@link PCollection} as a
+   * side input. The metadata included in the {@link PCollectionView} is how the {@link PCollection}
+   * will be read as a side input.
    *
    * @param <ElemT> The type of the elements of the input PCollection
    * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index df88e21..360c1af 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -120,6 +120,7 @@
    */
   @Deprecated
   public static <T, W extends BoundedWindow> PCollectionView<T> singletonViewUsingVoidKey(
+      TupleTag<MultimapView<Void, T>> tag,
       PCollection<KV<Void, T>> pCollection,
       TypeDescriptorSupplier<T> typeDescriptorSupplier,
       WindowingStrategy<?, W> windowingStrategy,
@@ -128,6 +129,7 @@
       Coder<T> defaultValueCoder) {
     return new SimplePCollectionView<>(
         pCollection,
+        tag,
         new SingletonViewFn<>(hasDefault, defaultValue, defaultValueCoder, typeDescriptorSupplier),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
         windowingStrategy);
@@ -156,11 +158,13 @@
    */
   @Deprecated
   public static <T, W extends BoundedWindow> PCollectionView<Iterable<T>> iterableViewUsingVoidKey(
+      TupleTag<MultimapView<Void, T>> tag,
       PCollection<KV<Void, T>> pCollection,
       TypeDescriptorSupplier<T> typeDescriptorSupplier,
       WindowingStrategy<?, W> windowingStrategy) {
     return new SimplePCollectionView<>(
         pCollection,
+        tag,
         new IterableViewFn<>(typeDescriptorSupplier),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
         windowingStrategy);
@@ -184,16 +188,35 @@
   /**
    * Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
    * provided {@link WindowingStrategy}.
+   */
+  public static <T, W extends BoundedWindow> PCollectionView<List<T>> listView(
+      PCollection<KV<Long, ValueOrMetadata<T, OffsetRange>>> pCollection,
+      TupleTag<Materializations.MultimapView<Long, ValueOrMetadata<T, OffsetRange>>> tag,
+      TypeDescriptorSupplier<T> typeDescriptorSupplier,
+      WindowingStrategy<?, W> windowingStrategy) {
+    return new SimplePCollectionView<>(
+        pCollection,
+        tag,
+        new ListViewFn2<>(typeDescriptorSupplier),
+        windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
+        windowingStrategy);
+  }
+
+  /**
+   * Returns a {@code PCollectionView<List<T>>} capable of processing elements windowed using the
+   * provided {@link WindowingStrategy}.
    *
    * @deprecated See {@link #listView}.
    */
   @Deprecated
   public static <T, W extends BoundedWindow> PCollectionView<List<T>> listViewUsingVoidKey(
+      TupleTag<MultimapView<Void, T>> tag,
       PCollection<KV<Void, T>> pCollection,
       TypeDescriptorSupplier<T> typeDescriptorSupplier,
       WindowingStrategy<?, W> windowingStrategy) {
     return new SimplePCollectionView<>(
         pCollection,
+        tag,
         new ListViewFn<>(typeDescriptorSupplier),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
         windowingStrategy);
@@ -243,12 +266,14 @@
    */
   @Deprecated
   public static <K, V, W extends BoundedWindow> PCollectionView<Map<K, V>> mapViewUsingVoidKey(
+      TupleTag<MultimapView<Void, KV<K, V>>> tag,
       PCollection<KV<Void, KV<K, V>>> pCollection,
       TypeDescriptorSupplier<K> keyTypeDescriptorSupplier,
       TypeDescriptorSupplier<V> valueTypeDescriptorSupplier,
       WindowingStrategy<?, W> windowingStrategy) {
     return new SimplePCollectionView<>(
         pCollection,
+        tag,
         new MapViewFn<>(keyTypeDescriptorSupplier, valueTypeDescriptorSupplier),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
         windowingStrategy);
@@ -279,12 +304,14 @@
   @Deprecated
   public static <K, V, W extends BoundedWindow>
       PCollectionView<Map<K, Iterable<V>>> multimapViewUsingVoidKey(
+          TupleTag<MultimapView<Void, KV<K, V>>> tag,
           PCollection<KV<Void, KV<K, V>>> pCollection,
           TypeDescriptorSupplier<K> keyTypeDescriptorSupplier,
           TypeDescriptorSupplier<V> valueTypeDescriptorSupplier,
           WindowingStrategy<?, W> windowingStrategy) {
     return new SimplePCollectionView<>(
         pCollection,
+        tag,
         new MultimapViewFn<>(keyTypeDescriptorSupplier, valueTypeDescriptorSupplier),
         windowingStrategy.getWindowFn().getDefaultWindowMappingFn(),
         windowingStrategy);
@@ -312,7 +339,9 @@
    * <p>{@link SingletonViewFn} is meant to be removed in the future and replaced with this class.
    */
   @Experimental(Kind.CORE_RUNNERS_ONLY)
-  private static class SingletonViewFn2<T> extends ViewFn<IterableView<T>, T> {
+  @Internal
+  public static class SingletonViewFn2<T> extends ViewFn<IterableView<T>, T>
+      implements HasDefaultValue<T> {
     private byte @Nullable [] encodedDefaultValue;
     private transient @Nullable T defaultValue;
     private @Nullable Coder<T> valueCoder;
@@ -350,6 +379,7 @@
      *
      * @throws NoSuchElementException if no default was specified.
      */
+    @Override
     public T getDefaultValue() {
       if (!hasDefault) {
         throw new NoSuchElementException("Empty PCollection accessed as a singleton view.");
@@ -393,6 +423,11 @@
     }
   }
 
+  @Internal
+  public interface HasDefaultValue<T> {
+    T getDefaultValue();
+  }
+
   /**
    * Implementation which is able to adapt a multimap materialization to a {@code T}.
    *
@@ -402,7 +437,8 @@
    */
   @Deprecated
   @Experimental(Kind.CORE_RUNNERS_ONLY)
-  public static class SingletonViewFn<T> extends ViewFn<MultimapView<Void, T>, T> {
+  public static class SingletonViewFn<T> extends ViewFn<MultimapView<Void, T>, T>
+      implements HasDefaultValue<T> {
     private byte @Nullable [] encodedDefaultValue;
     private transient @Nullable T defaultValue;
     private @Nullable Coder<T> valueCoder;
@@ -440,6 +476,7 @@
      *
      * @throws NoSuchElementException if no default was specified.
      */
+    @Override
     public T getDefaultValue() {
       if (!hasDefault) {
         throw new NoSuchElementException("Empty PCollection accessed as a singleton view.");
@@ -493,7 +530,8 @@
    * <p>{@link IterableViewFn} is meant to be removed in the future and replaced with this class.
    */
   @Experimental(Kind.CORE_RUNNERS_ONLY)
-  private static class IterableViewFn2<T> extends ViewFn<IterableView<T>, Iterable<T>> {
+  @Internal
+  public static class IterableViewFn2<T> extends ViewFn<IterableView<T>, Iterable<T>> {
     private TypeDescriptorSupplier<T> typeDescriptorSupplier;
 
     public IterableViewFn2(TypeDescriptorSupplier<T> typeDescriptorSupplier) {
@@ -559,7 +597,7 @@
    */
   @Experimental(Kind.CORE_RUNNERS_ONLY)
   @VisibleForTesting
-  static class ListViewFn2<T>
+  public static class ListViewFn2<T>
       extends ViewFn<MultimapView<Long, ValueOrMetadata<T, OffsetRange>>, List<T>> {
     private TypeDescriptorSupplier<T> typeDescriptorSupplier;
 
@@ -1003,7 +1041,8 @@
    * <p>{@link MultimapViewFn} is meant to be removed in the future and replaced with this class.
    */
   @Experimental(Kind.CORE_RUNNERS_ONLY)
-  private static class MultimapViewFn2<K, V>
+  @Internal
+  public static class MultimapViewFn2<K, V>
       extends ViewFn<MultimapView<K, V>, Map<K, Iterable<V>>> {
     private TypeDescriptorSupplier<K> keyTypeDescriptorSupplier;
     private TypeDescriptorSupplier<V> valueTypeDescriptorSupplier;
@@ -1091,7 +1130,8 @@
    *
    * <p>{@link MapViewFn} is meant to be removed in the future and replaced with this class.
    */
-  private static class MapViewFn2<K, V> extends ViewFn<MultimapView<K, V>, Map<K, V>> {
+  @Internal
+  public static class MapViewFn2<K, V> extends ViewFn<MultimapView<K, V>, Map<K, V>> {
     private TypeDescriptorSupplier<K> keyTypeDescriptorSupplier;
     private TypeDescriptorSupplier<V> valueTypeDescriptorSupplier;
 
@@ -1279,7 +1319,13 @@
 
     @Override
     public String toString() {
-      return MoreObjects.toStringHelper(this).add("tag", tag).toString();
+      return MoreObjects.toStringHelper(this)
+          .add("tag", tag)
+          .add("viewFn", viewFn)
+          .add("coder", coder)
+          .add("windowMappingFn", windowMappingFn)
+          .add("pCollection", pCollection)
+          .toString();
     }
 
     @Override
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
index b340d91..61b4bf8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.testing;
 
-import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -42,82 +40,42 @@
     // materializations will differ but test code should not worry about what these look like if
     // they are relying on the ViewFn to "undo" the conversion.
 
-    // TODO(BEAM-10097): Make this the default case once all portable runners can support
-    // the iterable access pattern.
-    if (hasExperiment(options, "beam_fn_api")
-        && (hasExperiment(options, "use_runner_v2")
-            || hasExperiment(options, "use_unified_worker"))) {
-      if (View.AsSingleton.class.equals(viewTransformClass.getClass())) {
-        for (Object value : values) {
-          rval.add(value);
+    if (View.AsSingleton.class.equals(viewTransformClass.getClass())) {
+      for (Object value : values) {
+        rval.add(value);
+      }
+    } else if (View.AsIterable.class.equals(viewTransformClass.getClass())) {
+      for (Object value : values) {
+        rval.add(value);
+      }
+    } else if (View.AsList.class.equals(viewTransformClass.getClass())) {
+      if (values.length > 0) {
+        rval.add(
+            KV.of(
+                Long.MIN_VALUE, ValueOrMetadata.createMetadata(new OffsetRange(0, values.length))));
+        for (int i = 0; i < values.length; ++i) {
+          rval.add(KV.of((long) i, ValueOrMetadata.create(values[i])));
         }
-      } else if (View.AsIterable.class.equals(viewTransformClass.getClass())) {
-        for (Object value : values) {
-          rval.add(value);
-        }
-      } else if (View.AsList.class.equals(viewTransformClass.getClass())) {
-        if (values.length > 0) {
-          rval.add(
-              KV.of(
-                  Long.MIN_VALUE,
-                  ValueOrMetadata.createMetadata(new OffsetRange(0, values.length))));
-          for (int i = 0; i < values.length; ++i) {
-            rval.add(KV.of((long) i, ValueOrMetadata.create(values[i])));
-          }
-        }
-      } else if (View.AsMap.class.equals(viewTransformClass.getClass())) {
-        for (Object value : values) {
-          rval.add(value);
-        }
-      } else if (View.AsMultimap.class.equals(viewTransformClass.getClass())) {
-        for (Object value : values) {
-          rval.add(value);
-        }
-      } else {
-        throw new IllegalArgumentException(
-            String.format(
-                "Unknown type of view %s. Supported views are %s.",
-                viewTransformClass.getClass(),
-                ImmutableSet.of(
-                    View.AsSingleton.class,
-                    View.AsIterable.class,
-                    View.AsList.class,
-                    View.AsMap.class,
-                    View.AsMultimap.class)));
+      }
+    } else if (View.AsMap.class.equals(viewTransformClass.getClass())) {
+      for (Object value : values) {
+        rval.add(value);
+      }
+    } else if (View.AsMultimap.class.equals(viewTransformClass.getClass())) {
+      for (Object value : values) {
+        rval.add(value);
       }
     } else {
-      if (View.AsSingleton.class.equals(viewTransformClass.getClass())) {
-        for (Object value : values) {
-          rval.add(KV.of(null, value));
-        }
-      } else if (View.AsIterable.class.equals(viewTransformClass.getClass())) {
-        for (Object value : values) {
-          rval.add(KV.of(null, value));
-        }
-      } else if (View.AsList.class.equals(viewTransformClass.getClass())) {
-        for (Object value : values) {
-          rval.add(KV.of(null, value));
-        }
-      } else if (View.AsMap.class.equals(viewTransformClass.getClass())) {
-        for (Object value : values) {
-          rval.add(KV.of(null, value));
-        }
-      } else if (View.AsMultimap.class.equals(viewTransformClass.getClass())) {
-        for (Object value : values) {
-          rval.add(KV.of(null, value));
-        }
-      } else {
-        throw new IllegalArgumentException(
-            String.format(
-                "Unknown type of view %s. Supported views are %s.",
-                viewTransformClass.getClass(),
-                ImmutableSet.of(
-                    View.AsSingleton.class,
-                    View.AsIterable.class,
-                    View.AsList.class,
-                    View.AsMap.class,
-                    View.AsMultimap.class)));
-      }
+      throw new IllegalArgumentException(
+          String.format(
+              "Unknown type of view %s. Supported views are %s.",
+              viewTransformClass.getClass(),
+              ImmutableSet.of(
+                  View.AsSingleton.class,
+                  View.AsIterable.class,
+                  View.AsList.class,
+                  View.AsMap.class,
+                  View.AsMultimap.class)));
     }
     return Collections.unmodifiableList(rval);
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
index f0fbab6..daa4877 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
@@ -414,7 +414,7 @@
     DateTime startTime = new DateTime();
     int sizeOfSubscriptionList = 0;
     while (sizeOfSubscriptionList == 0
-        && Seconds.secondsBetween(new DateTime(), startTime).getSeconds()
+        && Seconds.secondsBetween(startTime, new DateTime()).getSeconds()
             < timeoutDuration.toStandardSeconds().getSeconds()) {
       // Sleep 1 sec
       Thread.sleep(1000);
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 29d8bac..499a1f8 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -2197,10 +2197,8 @@
      * transform ties checkpointing semantics in compatible Beam runners and transactions in Kafka
      * (version 0.11+) to ensure a record is written only once. As the implementation relies on
      * runners checkpoint semantics, not all the runners are compatible. The sink throws an
-     * exception during initialization if the runner is not explicitly allowed. Flink runner is one
-     * of the runners whose checkpoint semantics are not compatible with current implementation
-     * (hope to provide a solution in near future). Dataflow runner and Spark runners are
-     * compatible.
+     * exception during initialization if the runner is not explicitly allowed. The Dataflow, Flink,
+     * and Spark runners are compatible.
      *
      * <p>Note on performance: Exactly-once sink involves two shuffles of the records. In addition
      * to cost of shuffling the records among workers, the records go through 2
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index f615290..498f877 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -329,6 +329,7 @@
 DATACLASS_TYPE = 101
 NAMED_TUPLE_TYPE = 102
 ENUM_TYPE = 103
+NESTED_STATE_TYPE = 104
 
 # Types that can be encoded as iterables, but are not literally
 # lists, etc. due to being lazy.  The actual type is not preserved
@@ -442,24 +443,50 @@
             "for the input of '%s'" %
             (value, type(value), self.requires_deterministic_step_label))
       self.encode_type(type(value), stream)
-      self.iterable_coder_impl.encode_to_stream(
-          [getattr(value, field.name) for field in dataclasses.fields(value)],
-          stream,
-          True)
+      values = [
+          getattr(value, field.name) for field in dataclasses.fields(value)
+      ]
+      try:
+        self.iterable_coder_impl.encode_to_stream(values, stream, True)
+      except Exception as e:
+        raise TypeError(self._deterministic_encoding_error_msg(value)) from e
     elif isinstance(value, tuple) and hasattr(type(value), '_fields'):
       stream.write_byte(NAMED_TUPLE_TYPE)
       self.encode_type(type(value), stream)
-      self.iterable_coder_impl.encode_to_stream(value, stream, True)
+      try:
+        self.iterable_coder_impl.encode_to_stream(value, stream, True)
+      except Exception as e:
+        raise TypeError(self._deterministic_encoding_error_msg(value)) from e
     elif isinstance(value, enum.Enum):
       stream.write_byte(ENUM_TYPE)
       self.encode_type(type(value), stream)
       # Enum values can be of any type.
-      self.encode_to_stream(value.value, stream, True)
+      try:
+        self.encode_to_stream(value.value, stream, True)
+      except Exception as e:
+        raise TypeError(self._deterministic_encoding_error_msg(value)) from e
+    elif hasattr(value, "__getstate__"):
+      if not hasattr(value, "__setstate__"):
+        raise TypeError(
+            "Unable to deterministically encode '%s' of type '%s', "
+            "for the input of '%s'. The object defines __getstate__ but not "
+            "__setstate__." %
+            (value, type(value), self.requires_deterministic_step_label))
+      stream.write_byte(NESTED_STATE_TYPE)
+      self.encode_type(type(value), stream)
+      state_value = value.__getstate__()
+      try:
+        self.encode_to_stream(state_value, stream, True)
+      except Exception as e:
+        raise TypeError(self._deterministic_encoding_error_msg(value)) from e
     else:
-      raise TypeError(
-          "Unable to deterministically encode '%s' of type '%s', "
-          "please provide a type hint for the input of '%s'" %
-          (value, type(value), self.requires_deterministic_step_label))
+      raise TypeError(self._deterministic_encoding_error_msg(value))
+
+  def _deterministic_encoding_error_msg(self, value):
+    return (
+        "Unable to deterministically encode '%s' of type '%s', "
+        "please provide a type hint for the input of '%s'" %
+        (value, type(value), self.requires_deterministic_step_label))
 
   def encode_type(self, t, stream):
     stream.write(dill.dumps(t), True)
@@ -510,6 +537,12 @@
     elif t == ENUM_TYPE:
       cls = self.decode_type(stream)
       return cls(self.decode_from_stream(stream, True))
+    elif t == NESTED_STATE_TYPE:
+      cls = self.decode_type(stream)
+      state = self.decode_from_stream(stream, True)
+      value = cls.__new__(cls)
+      value.__setstate__(state)
+      return value
     elif t == UNKNOWN_TYPE:
       return self.fallback_coder_impl.decode_from_stream(stream, nested)
     else:
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 17825a9..0a18e27 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -69,6 +69,22 @@
 MyFlag = enum.Flag('MyFlag', 'F1 F2 F3')  # pylint: disable=too-many-function-args
 
 
+class DefinesGetState:
+  def __init__(self, value):
+    self.value = value
+
+  def __getstate__(self):
+    return self.value
+
+  def __eq__(self, other):
+    return type(other) is type(self) and other.value == self.value
+
+
+class DefinesGetAndSetState(DefinesGetState):
+  def __setstate__(self, value):
+    self.value = value
+
+
 # Defined out of line for picklability.
 class CustomCoder(coders.Coder):
   def encode(self, x):
@@ -236,6 +252,15 @@
     self.check_coder(deterministic_coder, list(MyIntFlag))
     self.check_coder(deterministic_coder, list(MyFlag))
 
+    self.check_coder(
+        deterministic_coder,
+        [DefinesGetAndSetState(1), DefinesGetAndSetState((1, 2, 3))])
+
+    with self.assertRaises(TypeError):
+      self.check_coder(deterministic_coder, DefinesGetState(1))
+    with self.assertRaises(TypeError):
+      self.check_coder(deterministic_coder, DefinesGetAndSetState(dict()))
+
   def test_dill_coder(self):
     cell_value = (lambda x: lambda: x)(0).__closure__[0]
     self.check_coder(coders.DillCoder(), 'a', 1, cell_value)
diff --git a/sdks/python/apache_beam/ml/gcp/recommendations_ai.py b/sdks/python/apache_beam/ml/gcp/recommendations_ai.py
deleted file mode 100644
index 6e50cd2..0000000
--- a/sdks/python/apache_beam/ml/gcp/recommendations_ai.py
+++ /dev/null
@@ -1,585 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A connector for sending API requests to the GCP Recommendations AI
-API (https://cloud.google.com/recommendations).
-"""
-
-from __future__ import absolute_import
-
-from typing import Sequence
-from typing import Tuple
-
-from google.api_core.retry import Retry
-
-from apache_beam import pvalue
-from apache_beam.metrics import Metrics
-from apache_beam.options.pipeline_options import GoogleCloudOptions
-from apache_beam.transforms import DoFn
-from apache_beam.transforms import ParDo
-from apache_beam.transforms import PTransform
-from apache_beam.transforms.util import GroupIntoBatches
-from cachetools.func import ttl_cache
-
-# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
-try:
-  from google.cloud import recommendationengine
-except ImportError:
-  raise ImportError(
-      'Google Cloud Recommendation AI not supported for this execution '
-      'environment (could not import google.cloud.recommendationengine).')
-# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
-
-__all__ = [
-    'CreateCatalogItem',
-    'WriteUserEvent',
-    'ImportCatalogItems',
-    'ImportUserEvents',
-    'PredictUserEvent'
-]
-
-FAILED_CATALOG_ITEMS = "failed_catalog_items"
-
-
-@ttl_cache(maxsize=128, ttl=3600)
-def get_recommendation_prediction_client():
-  """Returns a Recommendation AI - Prediction Service client."""
-  _client = recommendationengine.PredictionServiceClient()
-  return _client
-
-
-@ttl_cache(maxsize=128, ttl=3600)
-def get_recommendation_catalog_client():
-  """Returns a Recommendation AI - Catalog Service client."""
-  _client = recommendationengine.CatalogServiceClient()
-  return _client
-
-
-@ttl_cache(maxsize=128, ttl=3600)
-def get_recommendation_user_event_client():
-  """Returns a Recommendation AI - UserEvent Service client."""
-  _client = recommendationengine.UserEventServiceClient()
-  return _client
-
-
-class CreateCatalogItem(PTransform):
-  """Creates catalogitem information.
-    The ``PTranform`` returns a PCollectionTuple with a PCollections of
-    successfully and failed created CatalogItems.
-
-    Example usage::
-
-      pipeline | CreateCatalogItem(
-        project='example-gcp-project',
-        catalog_name='my-catalog')
-    """
-  def __init__(
-      self,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog"):
-    """Initializes a :class:`CreateCatalogItem` transform.
-
-        Args:
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-        """
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.catalog_name = catalog_name
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          """GCP project name needs to be specified in "project" pipeline
-            option""")
-    return pcoll | ParDo(
-        _CreateCatalogItemFn(
-            self.project,
-            self.retry,
-            self.timeout,
-            self.metadata,
-            self.catalog_name))
-
-
-class _CreateCatalogItemFn(DoFn):
-  def __init__(
-      self,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.parent = f"projects/{project}/locations/global/catalogs/{catalog_name}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self._client = get_recommendation_catalog_client()
-
-  def process(self, element):
-    catalog_item = recommendationengine.CatalogItem(element)
-    request = recommendationengine.CreateCatalogItemRequest(
-        parent=self.parent, catalog_item=catalog_item)
-
-    try:
-      created_catalog_item = self._client.create_catalog_item(
-          request=request,
-          retry=self.retry,
-          timeout=self.timeout,
-          metadata=self.metadata)
-
-      self.counter.inc()
-      yield recommendationengine.CatalogItem.to_dict(created_catalog_item)
-    except Exception:
-      yield pvalue.TaggedOutput(
-          FAILED_CATALOG_ITEMS,
-          recommendationengine.CatalogItem.to_dict(catalog_item))
-
-
-class ImportCatalogItems(PTransform):
-  """Imports catalogitems in bulk.
-    The `PTransform` returns a PCollectionTuple with PCollections of
-    successfully and failed imported CatalogItems.
-
-    Example usage::
-
-      pipeline
-      | ImportCatalogItems(
-          project='example-gcp-project',
-          catalog_name='my-catalog')
-    """
-  def __init__(
-      self,
-      max_batch_size: int = 5000,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog"):
-    """Initializes a :class:`ImportCatalogItems` transform
-
-        Args:
-            batch_size (int): Required. Maximum number of catalogitems per
-              request.
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-        """
-    self.max_batch_size = max_batch_size
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.catalog_name = catalog_name
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          'GCP project name needs to be specified in "project" pipeline option')
-    return (
-        pcoll | GroupIntoBatches.WithShardedKey(self.max_batch_size) | ParDo(
-            _ImportCatalogItemsFn(
-                self.project,
-                self.retry,
-                self.timeout,
-                self.metadata,
-                self.catalog_name)))
-
-
-class _ImportCatalogItemsFn(DoFn):
-  def __init__(
-      self,
-      project=None,
-      retry=None,
-      timeout=120,
-      metadata=None,
-      catalog_name=None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.parent = f"projects/{project}/locations/global/catalogs/{catalog_name}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self.client = get_recommendation_catalog_client()
-
-  def process(self, element):
-    catalog_items = [recommendationengine.CatalogItem(e) for e in element[1]]
-    catalog_inline_source = recommendationengine.CatalogInlineSource(
-        {"catalog_items": catalog_items})
-    input_config = recommendationengine.InputConfig(
-        catalog_inline_source=catalog_inline_source)
-
-    request = recommendationengine.ImportCatalogItemsRequest(
-        parent=self.parent, input_config=input_config)
-
-    try:
-      operation = self._client.import_catalog_items(
-          request=request,
-          retry=self.retry,
-          timeout=self.timeout,
-          metadata=self.metadata)
-      self.counter.inc(len(catalog_items))
-      yield operation.result()
-    except Exception:
-      yield pvalue.TaggedOutput(FAILED_CATALOG_ITEMS, catalog_items)
-
-
-class WriteUserEvent(PTransform):
-  """Write user event information.
-    The `PTransform` returns a PCollectionTuple with PCollections of
-    successfully and failed written UserEvents.
-
-    Example usage::
-
-      pipeline
-      | WriteUserEvent(
-          project='example-gcp-project',
-          catalog_name='my-catalog',
-          event_store='my_event_store')
-    """
-  def __init__(
-      self,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog",
-      event_store: str = "default_event_store"):
-    """Initializes a :class:`WriteUserEvent` transform.
-
-        Args:
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-            event_store (str): Optional. Name of the event store.
-              Default: 'default_event_store'
-        """
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.catalog_name = catalog_name
-    self.event_store = event_store
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          'GCP project name needs to be specified in "project" pipeline option')
-    return pcoll | ParDo(
-        _WriteUserEventFn(
-            self.project,
-            self.retry,
-            self.timeout,
-            self.metadata,
-            self.catalog_name,
-            self.event_store))
-
-
-class _WriteUserEventFn(DoFn):
-  FAILED_USER_EVENTS = "failed_user_events"
-
-  def __init__(
-      self,
-      project=None,
-      retry=None,
-      timeout=120,
-      metadata=None,
-      catalog_name=None,
-      event_store=None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.parent = f"projects/{project}/locations/global/catalogs/"\
-                  f"{catalog_name}/eventStores/{event_store}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self._client = get_recommendation_user_event_client()
-
-  def process(self, element):
-    user_event = recommendationengine.UserEvent(element)
-    request = recommendationengine.WriteUserEventRequest(
-        parent=self.parent, user_event=user_event)
-
-    try:
-      created_user_event = self._client.write_user_event(request)
-      self.counter.inc()
-      yield recommendationengine.UserEvent.to_dict(created_user_event)
-    except Exception:
-      yield pvalue.TaggedOutput(
-          self.FAILED_USER_EVENTS,
-          recommendationengine.UserEvent.to_dict(user_event))
-
-
-class ImportUserEvents(PTransform):
-  """Imports userevents in bulk.
-    The `PTransform` returns a PCollectionTuple with PCollections of
-    successfully and failed imported UserEvents.
-
-    Example usage::
-
-      pipeline
-      | ImportUserEvents(
-          project='example-gcp-project',
-          catalog_name='my-catalog',
-          event_store='my_event_store')
-    """
-  def __init__(
-      self,
-      max_batch_size: int = 5000,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog",
-      event_store: str = "default_event_store"):
-    """Initializes a :class:`WriteUserEvent` transform.
-
-        Args:
-            batch_size (int): Required. Maximum number of catalogitems
-              per request.
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-            event_store (str): Optional. Name of the event store.
-              Default: 'default_event_store'
-        """
-    self.max_batch_size = max_batch_size
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.catalog_name = catalog_name
-    self.event_store = event_store
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          'GCP project name needs to be specified in "project" pipeline option')
-    return (
-        pcoll | GroupIntoBatches.WithShardedKey(self.max_batch_size) | ParDo(
-            _ImportUserEventsFn(
-                self.project,
-                self.retry,
-                self.timeout,
-                self.metadata,
-                self.catalog_name,
-                self.event_store)))
-
-
-class _ImportUserEventsFn(DoFn):
-  FAILED_USER_EVENTS = "failed_user_events"
-
-  def __init__(
-      self,
-      project=None,
-      retry=None,
-      timeout=120,
-      metadata=None,
-      catalog_name=None,
-      event_store=None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.parent = f"projects/{project}/locations/global/catalogs/"\
-                  f"{catalog_name}/eventStores/{event_store}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self.client = get_recommendation_user_event_client()
-
-  def process(self, element):
-
-    user_events = [recommendationengine.UserEvent(e) for e in element[1]]
-    user_event_inline_source = recommendationengine.UserEventInlineSource(
-        {"user_events": user_events})
-    input_config = recommendationengine.InputConfig(
-        user_event_inline_source=user_event_inline_source)
-
-    request = recommendationengine.ImportUserEventsRequest(
-        parent=self.parent, input_config=input_config)
-
-    try:
-      operation = self._client.write_user_event(request)
-      self.counter.inc(len(user_events))
-      yield recommendationengine.PredictResponse.to_dict(operation.result())
-    except Exception:
-      yield pvalue.TaggedOutput(self.FAILED_USER_EVENTS, user_events)
-
-
-class PredictUserEvent(PTransform):
-  """Make a recommendation prediction.
-    The `PTransform` returns a PCollection
-
-    Example usage::
-
-      pipeline
-      | PredictUserEvent(
-          project='example-gcp-project',
-          catalog_name='my-catalog',
-          event_store='my_event_store',
-          placement_id='recently_viewed_default')
-    """
-  def __init__(
-      self,
-      project: str = None,
-      retry: Retry = None,
-      timeout: float = 120,
-      metadata: Sequence[Tuple[str, str]] = None,
-      catalog_name: str = "default_catalog",
-      event_store: str = "default_event_store",
-      placement_id: str = None):
-    """Initializes a :class:`PredictUserEvent` transform.
-
-        Args:
-            project (str): Optional. GCP project name in which the catalog
-              data will be imported.
-            retry: Optional. Designation of what
-              errors, if any, should be retried.
-            timeout (float): Optional. The amount of time, in seconds, to wait
-              for the request to complete.
-            metadata: Optional. Strings which
-              should be sent along with the request as metadata.
-            catalog_name (str): Optional. Name of the catalog.
-              Default: 'default_catalog'
-            event_store (str): Optional. Name of the event store.
-              Default: 'default_event_store'
-            placement_id (str): Required. ID of the recommendation engine
-              placement. This id is used to identify the set of models that
-              will be used to make the prediction.
-        """
-    self.project = project
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.placement_id = placement_id
-    self.catalog_name = catalog_name
-    self.event_store = event_store
-    if placement_id is None:
-      raise ValueError('placement_id must be specified')
-    else:
-      self.placement_id = placement_id
-
-  def expand(self, pcoll):
-    if self.project is None:
-      self.project = pcoll.pipeline.options.view_as(GoogleCloudOptions).project
-    if self.project is None:
-      raise ValueError(
-          'GCP project name needs to be specified in "project" pipeline option')
-    return pcoll | ParDo(
-        _PredictUserEventFn(
-            self.project,
-            self.retry,
-            self.timeout,
-            self.metadata,
-            self.catalog_name,
-            self.event_store,
-            self.placement_id))
-
-
-class _PredictUserEventFn(DoFn):
-  FAILED_PREDICTIONS = "failed_predictions"
-
-  def __init__(
-      self,
-      project=None,
-      retry=None,
-      timeout=120,
-      metadata=None,
-      catalog_name=None,
-      event_store=None,
-      placement_id=None):
-    self._client = None
-    self.retry = retry
-    self.timeout = timeout
-    self.metadata = metadata
-    self.name = f"projects/{project}/locations/global/catalogs/"\
-                f"{catalog_name}/eventStores/{event_store}/placements/"\
-                f"{placement_id}"
-    self.counter = Metrics.counter(self.__class__, "api_calls")
-
-  def setup(self):
-    if self._client is None:
-      self._client = get_recommendation_prediction_client()
-
-  def process(self, element):
-    user_event = recommendationengine.UserEvent(element)
-    request = recommendationengine.PredictRequest(
-        name=self.name, user_event=user_event)
-
-    try:
-      prediction = self._client.predict(request)
-      self.counter.inc()
-      yield [
-          recommendationengine.PredictResponse.to_dict(p)
-          for p in prediction.pages
-      ]
-    except Exception:
-      yield pvalue.TaggedOutput(self.FAILED_PREDICTIONS, user_event)
diff --git a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test.py b/sdks/python/apache_beam/ml/gcp/recommendations_ai_test.py
deleted file mode 100644
index 2f688d9..0000000
--- a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test.py
+++ /dev/null
@@ -1,207 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for Recommendations AI transforms."""
-
-from __future__ import absolute_import
-
-import unittest
-
-import mock
-
-import apache_beam as beam
-from apache_beam.metrics import MetricsFilter
-
-# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
-try:
-  from google.cloud import recommendationengine
-  from apache_beam.ml.gcp import recommendations_ai
-except ImportError:
-  recommendationengine = None
-# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
-
-
-@unittest.skipIf(
-    recommendationengine is None,
-    "Recommendations AI dependencies not installed.")
-class RecommendationsAICatalogItemTest(unittest.TestCase):
-  def setUp(self):
-    self._mock_client = mock.Mock()
-    self._mock_client.create_catalog_item.return_value = (
-        recommendationengine.CatalogItem())
-    self.m2 = mock.Mock()
-    self.m2.result.return_value = None
-    self._mock_client.import_catalog_items.return_value = self.m2
-
-    self._catalog_item = {
-        "id": "12345",
-        "title": "Sample laptop",
-        "description": "Indisputably the most fantastic laptop ever created.",
-        "language_code": "en",
-        "category_hierarchies": [{
-            "categories": ["Electronic", "Computers"]
-        }]
-    }
-
-  def test_CreateCatalogItem(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_catalog_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([self._catalog_item])
-          | "Create CatalogItem" >>
-          recommendations_ai.CreateCatalogItem(project="test"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertTrue(read_counter.result == expected_counter)
-
-  def test_ImportCatalogItems(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_catalog_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([
-              (self._catalog_item["id"], self._catalog_item),
-              (self._catalog_item["id"], self._catalog_item)
-          ]) | "Create CatalogItems" >>
-          recommendations_ai.ImportCatalogItems(project="test"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertTrue(read_counter.result == expected_counter)
-
-
-@unittest.skipIf(
-    recommendationengine is None,
-    "Recommendations AI dependencies not installed.")
-class RecommendationsAIUserEventTest(unittest.TestCase):
-  def setUp(self):
-    self._mock_client = mock.Mock()
-    self._mock_client.write_user_event.return_value = (
-        recommendationengine.UserEvent())
-    self.m2 = mock.Mock()
-    self.m2.result.return_value = None
-    self._mock_client.import_user_events.return_value = self.m2
-
-    self._user_event = {
-        "event_type": "page-visit", "user_info": {
-            "visitor_id": "1"
-        }
-    }
-
-  def test_CreateUserEvent(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_user_event_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([self._user_event])
-          | "Create UserEvent" >>
-          recommendations_ai.WriteUserEvent(project="test"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertTrue(read_counter.result == expected_counter)
-
-  def test_ImportUserEvents(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_user_event_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([
-              (self._user_event["user_info"]["visitor_id"], self._user_event),
-              (self._user_event["user_info"]["visitor_id"], self._user_event)
-          ]) | "Create UserEvents" >>
-          recommendations_ai.ImportUserEvents(project="test"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertTrue(read_counter.result == expected_counter)
-
-
-@unittest.skipIf(
-    recommendationengine is None,
-    "Recommendations AI dependencies not installed.")
-class RecommendationsAIPredictTest(unittest.TestCase):
-  def setUp(self):
-    self._mock_client = mock.Mock()
-    self._mock_client.predict.return_value = [
-        recommendationengine.PredictResponse()
-    ]
-
-    self._user_event = {
-        "event_type": "page-visit", "user_info": {
-            "visitor_id": "1"
-        }
-    }
-
-  def test_Predict(self):
-    expected_counter = 1
-    with mock.patch.object(recommendations_ai,
-                           'get_recommendation_prediction_client',
-                           return_value=self._mock_client):
-      p = beam.Pipeline()
-
-      _ = (
-          p | "Create data" >> beam.Create([self._user_event])
-          | "Prediction UserEvents" >> recommendations_ai.PredictUserEvent(
-              project="test", placement_id="recently_viewed_default"))
-
-      result = p.run()
-      result.wait_until_finish()
-
-      read_filter = MetricsFilter().with_name('api_calls')
-      query_result = result.metrics().query(read_filter)
-      if query_result['counters']:
-        read_counter = query_result['counters'][0]
-        self.assertTrue(read_counter.result == expected_counter)
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py b/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py
deleted file mode 100644
index 19e6b9e..0000000
--- a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py
+++ /dev/null
@@ -1,107 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Integration tests for Recommendations AI transforms."""
-
-from __future__ import absolute_import
-
-import random
-import unittest
-
-from nose.plugins.attrib import attr
-
-import apache_beam as beam
-from apache_beam.testing.test_pipeline import TestPipeline
-from apache_beam.testing.util import assert_that
-from apache_beam.testing.util import equal_to
-from apache_beam.testing.util import is_not_empty
-
-# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
-try:
-  from google.cloud import recommendationengine
-  from apache_beam.ml.gcp import recommendations_ai
-except ImportError:
-  recommendationengine = None
-# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
-
-
-def extract_id(response):
-  yield response["id"]
-
-
-def extract_event_type(response):
-  yield response["event_type"]
-
-
-def extract_prediction(response):
-  yield response[0]["results"]
-
-
-@attr('IT')
-@unittest.skipIf(
-    recommendationengine is None,
-    "Recommendations AI dependencies not installed.")
-class RecommendationAIIT(unittest.TestCase):
-  def test_create_catalog_item(self):
-
-    CATALOG_ITEM = {
-        "id": str(int(random.randrange(100000))),
-        "title": "Sample laptop",
-        "description": "Indisputably the most fantastic laptop ever created.",
-        "language_code": "en",
-        "category_hierarchies": [{
-            "categories": ["Electronic", "Computers"]
-        }]
-    }
-
-    with TestPipeline(is_integration_test=True) as p:
-      output = (
-          p | 'Create data' >> beam.Create([CATALOG_ITEM])
-          | 'Create CatalogItem' >>
-          recommendations_ai.CreateCatalogItem(project=p.get_option('project'))
-          | beam.ParDo(extract_id) | beam.combiners.ToList())
-
-      assert_that(output, equal_to([[CATALOG_ITEM["id"]]]))
-
-  def test_create_user_event(self):
-    USER_EVENT = {"event_type": "page-visit", "user_info": {"visitor_id": "1"}}
-
-    with TestPipeline(is_integration_test=True) as p:
-      output = (
-          p | 'Create data' >> beam.Create([USER_EVENT]) | 'Create UserEvent' >>
-          recommendations_ai.WriteUserEvent(project=p.get_option('project'))
-          | beam.ParDo(extract_event_type) | beam.combiners.ToList())
-
-      assert_that(output, equal_to([[USER_EVENT["event_type"]]]))
-
-  def test_create_predict(self):
-    USER_EVENT = {"event_type": "page-visit", "user_info": {"visitor_id": "1"}}
-
-    with TestPipeline(is_integration_test=True) as p:
-      output = (
-          p | 'Create data' >> beam.Create([USER_EVENT])
-          | 'Predict UserEvent' >> recommendations_ai.PredictUserEvent(
-              project=p.get_option('project'),
-              placement_id="recently_viewed_default")
-          | beam.ParDo(extract_prediction))
-
-      assert_that(output, is_not_empty())
-
-
-if __name__ == '__main__':
-  print(recommendationengine.CatalogItem.__module__)
-  unittest.main()
diff --git a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
index f6d3340..68139cf 100644
--- a/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
+++ b/sdks/python/apache_beam/testing/load_tests/load_test_metrics_utils.py
@@ -48,7 +48,7 @@
 from apache_beam.utils.timestamp import Timestamp
 
 try:
-  from google.cloud import bigquery  # type: ignore
+  from google.cloud import bigquery
   from google.cloud.bigquery.schema import SchemaField
   from google.cloud.exceptions import NotFound
 except ImportError:
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 7117a55..775f321 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -200,7 +200,6 @@
     'google-cloud-language>=1.3.0,<2',
     'google-cloud-videointelligence>=1.8.0,<2',
     'google-cloud-vision>=0.38.0,<2',
-    'google-cloud-recommendations-ai>=0.1.0,<=0.2.0'
 ]
 
 INTERACTIVE_BEAM = [
diff --git a/website/www/site/assets/scss/_global.sass b/website/www/site/assets/scss/_global.sass
index 773b404..f7f7b5a 100644
--- a/website/www/site/assets/scss/_global.sass
+++ b/website/www/site/assets/scss/_global.sass
@@ -115,7 +115,7 @@
   line-height: 1.63
   letter-spacing: 0.43px
   padding: 24px
-  max-width: 848px
+  max-width: 100%
   position: relative
 
   pre
@@ -133,7 +133,7 @@
         cursor: pointer
 
 .snippet
-  max-width: 848px
+  max-width: 100%
   margin-bottom: 40px
   .git-link
     float: right
diff --git a/website/www/site/content/en/documentation/_index.md b/website/www/site/content/en/documentation/_index.md
index 36812d5..80d111b 100644
--- a/website/www/site/content/en/documentation/_index.md
+++ b/website/www/site/content/en/documentation/_index.md
@@ -30,6 +30,7 @@
 * Read the [Programming Guide](/documentation/programming-guide/), which introduces all the key Beam concepts.
 * Learn about Beam's [execution model](/documentation/runtime/model) to better understand how pipelines execute.
 * Visit [Learning Resources](/documentation/resources/learning-resources) for some of our favorite articles and talks about Beam.
+* Visit the [glossary](/documentation/glossary) to learn the terminology of the Beam programming model.
 
 ## Pipeline Fundamentals
 
diff --git a/website/www/site/content/en/documentation/glossary.md b/website/www/site/content/en/documentation/glossary.md
new file mode 100644
index 0000000..3993f53
--- /dev/null
+++ b/website/www/site/content/en/documentation/glossary.md
@@ -0,0 +1,464 @@
+---
+title: "Beam glossary"
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+# Apache Beam glossary
+
+## Aggregation
+
+A transform pattern for computing a value from multiple input elements. Aggregation is similar to the reduce operation in the [MapReduce](https://en.wikipedia.org/wiki/MapReduce) model. Aggregation transforms include Count (computes the count of all elements in the aggregation), Max (computes the maximum element in the aggregation), and Sum (computes the sum of all elements in the aggregation).
+
+For a complete list of aggregation transforms, see:
+
+* [Java Transform catalog](/documentation/transforms/java/overview/#aggregation)
+* [Python Transform catalog](/documentation/transforms/python/overview/#aggregation)
+
+## Apply
+
+A method for invoking a transform on a PCollection. Each transform in the Beam SDKs has a generic `apply` method (or pipe operator `|`). Invoking multiple Beam transforms is similar to method chaining, but with a difference: You apply the transform to the input PCollection, passing the transform itself as an argument, and the operation returns the output PCollection. Because of Beam’s deferred execution model, applying a transform does not immediately execute that transform.
+
+To learn more, see:
+
+* [Applying transforms](/documentation/programming-guide/#applying-transforms)
+
+## Batch processing
+
+A data processing paradigm for working with finite, or bounded, datasets. A bounded PCollection represents a dataset of a known, fixed size. Reading from a batch data source, such as a file or a database, creates a bounded PCollection. A batch processing job eventually ends, in contrast to a streaming job, which runs until cancelled.
+
+To learn more, see:
+
+* [Size and boundedness](/documentation/programming-guide/#size-and-boundedness)
+
+## Bounded data
+
+A dataset of a known, fixed size. A PCollection can be bounded or unbounded, depending on the source of the data that it represents. Reading from a batch data source, such as a file or a database, creates a bounded PCollection. Beam also supports reading a bounded amount of data from an unbounded source.
+
+To learn more, see:
+
+* [Size and boundedness](/documentation/programming-guide/#size-and-boundedness)
+
+## Bundle
+
+The processing unit for elements in a PCollection. Instead of processing all elements in a PCollection simultaneously, Beam processes the elements in bundles. The runner handles the division of the collection into bundles, and in doing so it may optimize the bundle size for the use case. For example, a streaming runner might process smaller bundles than a batch runner.
+
+To learn more, see:
+
+* [Bundling and persistence](/documentation/runtime/model/#bundling-and-persistence)
+
+## Coder
+
+A component that describes how the elements of a PCollection can be encoded and decoded. To support distributed processing and cross-language portability, Beam needs to be able to encode each element of a PCollection as bytes. The Beam SDKs provide built-in coders for common types and language-specific mechanisms for specifying the encoding of a PCollection.
+
+To learn more, see:
+
+* [Data encoding and type safety](/documentation/programming-guide/#data-encoding-and-type-safety)
+
+## CoGroupByKey
+
+A PTransform that takes two or more PCollections and aggregates the elements by key. In effect, CoGroupByKey performs a relational join of two or more key/value PCollections that have the same key type. While GroupByKey performs this operation over a single input collection, CoGroupByKey operates over multiple input collections.
+
+To learn more, see:
+
+* [CoGroupByKey](/documentation/programming-guide/#cogroupbykey)
+* [CoGroupByKey (Java)](/documentation/transforms/java/aggregation/cogroupbykey/)
+* [CoGroupByKey (Python)](/documentation/transforms/python/aggregation/cogroupbykey/)
+
+## Collection
+
+See [PCollection](/documentation/glossary/#pcollection).
+
+## Combine
+
+A PTransform for combining all elements of a PCollection or all values associated with a key. When you apply a Combine transform, you have to provide a user-defined function (UDF) that contains the logic for combining the elements or values. The combining function should be [commutative](https://en.wikipedia.org/wiki/Commutative_property) and [associative](https://en.wikipedia.org/wiki/Associative_property), because the function is not necessarily invoked exactly once on all values with a given key.
+
+To learn more, see:
+
+* [Combine](/documentation/programming-guide/#combine)
+* [Combine (Java)](/documentation/transforms/java/aggregation/combine/)
+* [CombineGlobally (Python)](/documentation/transforms/python/aggregation/combineglobally/)
+* [CombinePerKey (Python)](/documentation/transforms/python/aggregation/combineperkey/)
+* [CombineValues (Python)](/documentation/transforms/python/aggregation/combinevalues/)
+
+## Composite transform
+
+A PTransform that expands into many PTransforms. Composite transforms have a nested structure, in which a complex transform applies one or more simpler transforms. These simpler transforms could be existing Beam operations like ParDo, Combine, or GroupByKey, or they could be other composite transforms. Nesting multiple transforms inside a single composite transform can make your pipeline more modular and easier to understand.
+
+To learn more, see:
+
+* [Composite transforms](/documentation/programming-guide/#composite-transforms)
+
+## Counter (metric)
+
+A metric that reports a single long value and can be incremented. In the Beam model, metrics provide insight into the state of a pipeline, potentially while the pipeline is running.
+
+To learn more, see:
+
+* [Types of metrics](/documentation/programming-guide/#types-of-metrics)
+
+## Cross-language transforms
+
+Transforms that can be shared across Beam SDKs. With cross-language transforms, you can use transforms written in any supported SDK language (currently, Java and Python) in a pipeline written in a different SDK language. For example, you could use the Apache Kafka connector from the Java SDK in a Python streaming pipeline. Cross-language transforms make it possible to provide new functionality simultaneously in different SDKs.
+
+To learn more, see:
+
+* [Multi-language pipelines](/documentation/programming-guide/#mulit-language-pipelines)
+
+## Deferred execution
+
+A feature of the Beam execution model. Beam operations are deferred, meaning that the result of a given operation may not be available for control flow. Deferred execution allows the Beam API to support parallel processing of data.
+
+## Distribution (metric)
+
+A metric that reports information about the distribution of reported values. In the Beam model, metrics provide insight into the state of a pipeline, potentially while the pipeline is running.
+
+To learn more, see:
+
+* [Types of metrics](/documentation/programming-guide/#types-of-metrics)
+
+## DoFn
+
+A function object used by ParDo (or some other transform) to process the elements of a PCollection. A DoFn is a user-defined function, meaning that it contains custom code that defines a data processing task in your pipeline. The Beam system invokes a DoFn one or more times to process some arbitrary bundle of elements, but Beam doesn’t guarantee an exact number of invocations.
+
+To learn more, see:
+
+* [ParDo](/documentation/programming-guide/#pardo)
+
+## Driver
+
+A program that defines your pipeline, including all of the inputs, transforms, and outputs. To use Beam, you need to create a driver program using classes from one of the Beam SDKs. The driver program creates a pipeline and specifies the execution options that tell the pipeline where and how to run. These options include the runner, which determines what backend your pipeline will run on.
+
+To learn more, see:
+
+* [Overview](/documentation/programming-guide/#overview)
+
+## Element
+
+The unit of data in a PCollection. Elements in a PCollection can be of any type, but they must all have the same type. This allows parallel computations to operate uniformly across the entire collection. Some element types have a structure that can be introspected (for example, JSON, Protocol Buffer, Avro, and database records).
+
+To learn more, see:
+
+* [PCollection characteristics](/documentation/programming-guide/#pcollection-characteristics)
+
+## Element-wise
+
+A type of transform that independently processes each element in an input PCollection. Element-wise is similar to the map operation in the [MapReduce](https://en.wikipedia.org/wiki/MapReduce) model. An element-wise transform might output 0, 1, or multiple values for each input element. This is in contrast to aggregation transforms, which compute a single value from multiple input elements. Element-wise operations include Filter, FlatMap, and ParDo.
+
+For a complete list of element-wise transforms, see:
+
+* [Java Transform catalog](/documentation/transforms/java/overview/#element-wise)
+* [Python Transform catalog](/documentation/transforms/python/overview/#element-wise)
+
+## Engine
+
+A data-processing system, such as Dataflow, Spark, or Flink. A Beam runner for an engine executes a Beam pipeline on that engine.
+
+## Event time
+
+The time a data event occurs, determined by a timestamp on an element. This is in contrast to processing time, which is when an element is processed in a pipeline. An event could be, for example, a user interaction or a write to an error log. There’s no guarantee that events will appear in a pipeline in order of event time.
+
+To learn more, see:
+
+* [Watermarks and late data](/documentation/programming-guide/#watermarks-and-late-data)
+* [Triggers](/documentation/programming-guide/#triggers)
+
+## Expansion Service
+
+A service that enables a pipeline to apply (expand) cross-language transforms defined in other SDKs. For example, by connecting to a Java expansion service, the Python SDK can apply transforms implemented in Java. Currently SDKs define expansion services as local processes, but in the future Beam may support long-running expansion services. The development of expansion services is part of the ongoing effort to support multi-language pipelines.
+
+## Flatten
+One of the core PTransforms. Flatten merges multiple PCollections into a single logical PCollection.
+
+To learn more, see:
+
+* [Flatten](/documentation/programming-guide/#flatten)
+* [Flatten (Java)](/documentation/transforms/java/other/flatten/)
+* [Flatten (Python)](/documentation/transforms/python/other/flatten/)
+
+## Fusion
+
+An optimization that Beam runners can apply before running a pipeline. When one transform outputs a PCollection that’s consumed by another transform, or when two or more transforms take the same PCollection as input, a runner may be able to fuse the transforms together into a single processing unit (a *stage* in Dataflow). Fusion can make pipeline execution more efficient by preventing I/O operations.
+
+## Gauge (metric)
+
+A metric that reports the latest value out of reported values. In the Beam model, metrics provide insight into the state of a pipeline, potentially while the pipeline is running. Because metrics are collected from many workers, the gauge value may not be the absolute last value, but it will be one of the latest values produced by one of the workers.
+
+To learn more, see:
+
+* [Types of metrics](/documentation/programming-guide/#types-of-metrics)
+
+## GroupByKey
+
+A PTransform for processing collections of key/value pairs. GroupByKey is a parallel reduction operation, similar to the shuffle of a map/shuffle/reduce algorithm. The input to GroupByKey is a collection of key/value pairs in which multiple pairs have the same key but different values (i.e. a multimap). You can use GroupByKey to collect all of the values associated with each unique key.
+
+To learn more, see:
+
+* [GroupByKey](/documentation/programming-guide/#groupbykey)
+* [GroupByKey (Java)](/documentation/transforms/java/aggregation/groupbykey/)
+* [GroupByKey (Python)](/documentation/transforms/python/aggregation/groupbykey/)
+
+## I/O connector
+
+A set of PTransforms for working with external data storage systems. When you create a pipeline, you often need to read from or write to external data systems such as files or databases. Beam provides read and write transforms for a number of common data storage types.
+
+To learn more, see:
+
+* [Pipeline I/O](/documentation/programming-guide/#pipeline-io)
+* [Built-in I/O Transforms](/documentation/io/built-in/)
+
+## Map
+
+An element-wise PTransform that applies a user-defined function (UDF) to each element in a PCollection. Using Map, you can transform each individual element, but you can't change the number of elements.
+
+To learn more, see:
+
+* [Map (Python)](/documentation/transforms/python/elementwise/map/)
+* [MapElements (Java)](/documentation/transforms/java/elementwise/mapelements/)
+
+## Metrics
+
+Data on the state of a pipeline, potentially while the pipeline is running. You can use the built-in Beam metrics to gain insight into the functioning of your pipeline. For example, you might use Beam metrics to track errors, calls to a backend service, or the number of elements processed. Beam currently supports three types of metric: Counter, Distribution, and Gauge.
+
+To learn more, see:
+
+* [Metrics](/documentation/programming-guide/#metrics)
+
+## Multi-language pipeline
+
+A pipeline that uses cross-language transforms. You can combine transforms written in any supported SDK language (currently, Java and Python) and use them in one multi-language pipeline.
+
+To learn more, see:
+
+* [Multi-language pipelines](/documentation/programming-guide/#mulit-language-pipelines)
+
+## ParDo
+
+The lowest-level element-wise PTransform. For each element in an input PCollection, ParDo applies a function and emits zero, one, or multiple elements to an output PCollection. “ParDo” is short for “Parallel Do.” It’s similar to the map operation in a [MapReduce](https://en.wikipedia.org/wiki/MapReduce) algorithm, the `apply` method from a DataFrame, or the `UPDATE` keyword from SQL.
+
+To learn more, see:
+
+* [ParDo](/documentation/programming-guide/#pardo)
+* [ParDo (Java)](/documentation/transforms/java/elementwise/pardo/)
+* [ParDo (Python)](/documentation/transforms/python/elementwise/pardo/)
+
+## Partition
+
+An element-wise PTransform that splits a single PCollection into a fixed number of smaller PCollections. Partition requires a user-defined function (UDF) to determine how to split up the elements of the input collection into the resulting output collections. The number of partitions must be determined at graph construction time, meaning that you can’t determine the number of partitions using data calculated by the running pipeline.
+
+To learn more, see:
+
+* [Partition](/documentation/programming-guide/#partition)
+* [Partition (Java)](/documentation/transforms/java/elementwise/partition/)
+* [Partition (Python)](/documentation/transforms/python/elementwise/partition/)
+
+## PCollection
+
+A potentially distributed, homogeneous dataset or data stream. PCollections represent data in a Beam pipeline, and Beam transforms (PTransforms) use PCollection objects as inputs and outputs. PCollections are intended to be immutable, meaning that once a PCollection is created, you can’t add, remove, or change individual elements. The “P” stands for “parallel.”
+
+To learn more, see:
+
+* [PCollections](/documentation/programming-guide/#pcollections)
+
+## Pipe operator (`|`)
+
+Delimits a step in a Python pipeline. For example: `[Final Output PCollection] = ([Initial Input PCollection] | [First Transform] | [Second Transform] | [Third Transform])`. The output of each transform is passed from left to right as input to the next transform. The pipe operator in Python is equivalent to the `apply` method in Java (in other words, the pipe applies a transform to a PCollection).
+
+To learn more, see:
+
+* [Applying transforms](/documentation/programming-guide/#applying-transforms)
+
+## Pipeline
+
+An encapsulation of your entire data processing task, including reading input data from a source, transforming that data, and writing output data to a sink. You can think of a pipeline as a Beam program that uses PTransforms to process PCollections. The transforms in a pipeline can be represented as a directed acyclic graph (DAG). All Beam driver programs must create a pipeline.
+
+To learn more, see:
+
+* [Overview](/documentation/programming-guide/#overview)
+* [Creating a pipeline](/documentation/programming-guide/#creating-a-pipeline)
+* [Design your pipeline](/documentation/pipelines/design-your-pipeline/)
+* [Create your pipeline](/documentation/pipelines/create-your-pipeline/)
+
+## Processing time
+
+The time at which an element is processed at some stage in a pipeline. Processing time is not the same as event time, which is the time at which a data event occurs. Processing time is determined by the clock on the system processing the element. There’s no guarantee that elements will be processed in order of event time.
+
+To learn more, see:
+
+* [Watermarks and late data](/documentation/programming-guide/#watermarks-and-late-data)
+* [Triggers](/documentation/programming-guide/#triggers)
+
+## PTransform
+
+A data processing operation, or a step, in your pipeline. A PTransform takes zero or more PCollections as input, applies a processing function to the elements of those PCollections, and produces zero or more output PCollections. Some PTransforms accept user-defined functions that apply custom logic. The “P” stands for “parallel.”
+
+To learn more, see:
+
+* [Overview](/documentation/programming-guide/#overview)
+* [Transforms](/documentation/programming-guide/#transforms)
+
+## Runner
+
+A runner runs a pipeline on a specific platform. Most runners are translators or adapters to massively parallel big data processing systems. Other runners exist for local testing and debugging. Among the supported runners are Google Cloud Dataflow, Apache Spark, Apache Samza, Apache Flink, the Interactive Runner, and the Direct Runner.
+
+To learn more, see:
+
+* [Choosing a Runner](/documentation/#choosing-a-runner)
+* [Beam Capability Matrix](/documentation/runners/capability-matrix/)
+
+## Schema
+
+A language-independent type definition for a PCollection. The schema for a PCollection defines elements of that PCollection as an ordered list of named fields. Each field has a name, a type, and possibly a set of user options. Schemas provide a way to reason about types across different programming-language APIs.
+
+To learn more, see:
+
+* [Schemas](/documentation/programming-guide/#schemas)
+* [Schema Patterns](/documentation/patterns/schema/)
+
+## Session
+
+A time interval for grouping data events. A session is defined by some minimum gap duration between events. For example, a data stream representing user mouse activity may have periods with high concentrations of clicks followed by periods of inactivity. A session can represent such a pattern of activity followed by inactivity.
+
+To learn more, see:
+
+* [Session windows](/documentation/programming-guide/#session-windows)
+* [Analyzing Usage Patterns](/get-started/mobile-gaming-example/#analyzing-usage-patterns)
+
+## Side input
+
+Additional input to a PTransform. Side input is input that you provide in addition to the main input PCollection. A DoFn can access side input each time it processes an element in the PCollection. Side inputs are useful if your transform needs to inject additional data at runtime.
+
+To learn more, see:
+
+* [Side inputs](/documentation/programming-guide/#side-inputs)
+* [Side input patterns](/documentation/patterns/side-inputs/)
+
+## Sink
+
+A transform that writes to an external data storage system, like a file or database.
+
+To learn more, see:
+
+* [Developing new I/O connectors](/documentation/io/developing-io-overview/)
+* [Pipeline I/O](/documentation/programming-guide/#pipeline-io)
+* [Built-in I/O transforms](/documentation/io/built-in/)
+
+## Source
+A transform that reads from an external storage system. A pipeline typically reads input data from a source. The source has a type, which may be different from the sink type, so you can change the format of data as it moves through your pipeline.
+
+To learn more, see:
+
+* [Developing new I/O connectors](/documentation/io/developing-io-overview/)
+* [Pipeline I/O](/documentation/programming-guide/#pipeline-io)
+* [Built-in I/O transforms](/documentation/io/built-in/)
+
+## Splittable DoFn
+
+A generalization of DoFn that makes it easier to create complex, modular I/O connectors. A Splittable DoFn (SDF) can process elements in a non-monolithic way, meaning that the processing can be decomposed into smaller tasks. With SDF, you can checkpoint the processing of an element, and you can split the remaining work to yield additional parallelism. SDF is recommended for building new I/O connectors.
+
+To learn more, see:
+
+* [Splittable DoFns](/documentation/programming-guide/#splittable-dofns)
+* [Splittable DoFn in Apache Beam is Ready to Use](/blog/splittable-do-fn-is-available/)
+
+## State
+
+Persistent values that a PTransform can access. The state API lets you augment element-wise operations (for example, ParDo or Map) with mutable state. Using the state API, you can read from, and write to, state as you process each element of a PCollection. You can use the state API together with the timer API to create processing tasks that give you fine-grained control over the workflow.
+
+To learn more, see:
+
+* [State and Timers](/documentation/programming-guide/#state-and-timers)
+* [Stateful processing with Apache Beam](/blog/stateful-processing/)
+
+## Streaming
+
+A data processing paradigm for working with infinite, or unbounded, datasets. Reading from a streaming data source, such as Pub/Sub or Kafka, creates an unbounded PCollection. An unbounded PCollection must be processed using a job that runs continuously, because the entire collection can never be available for processing at any one time.
+
+To learn more, see:
+
+* [Size and boundedness](/documentation/programming-guide/#size-and-boundedness)
+* [Python Streaming Pipelines](/documentation/sdks/python-streaming/)
+
+## Timer
+
+A Beam feature that enables delayed processing of data stored using the state API. The timer API lets you set timers to call back at either an event-time or a processing-time timestamp. You can use the timer API together with the state API to create processing tasks that give you fine-grained control over the workflow.
+
+To learn more, see:
+
+* [State and Timers](/documentation/programming-guide/#state-and-timers)
+* [Stateful processing with Apache Beam](/blog/stateful-processing/)
+* [Timely (and Stateful) Processing with Apache Beam](/blog/timely-processing/)
+
+## Timestamp
+
+A point in time associated with an element in a PCollection and used to assign a window to the element. The source that creates the PCollection assigns each element an initial timestamp, often corresponding to when the element was read or added. But you can also manually assign timestamps. This can be useful if elements have an inherent timestamp, but the timestamp is somewhere in the structure of the element itself (for example, a time field in a server log entry).
+
+To learn more, see:
+
+* [Element timestamps](/documentation/programming-guide/#element-timestamps)
+* [Adding timestamps to a PCollection’s elements](/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements)
+
+## Transform
+
+See [PTransform](/documentation/glossary/#ptransform).
+
+## Trigger
+
+Determines when to emit aggregated result data from a window. You can use triggers to refine the windowing strategy for your pipeline. If you use the default windowing configuration and default trigger, Beam outputs an aggregated result when it estimates that all data for a window has arrived, and it discards all subsequent data for that window. But you can also use triggers to emit early results, before all the data in a given window has arrived, or to process late data by triggering after the event time watermark passes the end of the window.
+
+To learn more, see:
+
+* [Triggers](/documentation/programming-guide/#triggers)
+
+## Unbounded data
+
+A dataset of unlimited size. A PCollection can be bounded or unbounded, depending on the source of the data that it represents. Reading from a streaming or continuously-updating data source, such as Pub/Sub or Kafka, typically creates an unbounded PCollection.
+
+To learn more, see:
+
+* [Size and boundedness](/documentation/programming-guide/#size-and-boundedness)
+
+## User-defined function
+
+Custom logic that a PTransform applies to your data. Some PTransforms accept a user-defined function (UDF) as a way to configure the transform. For example, ParDo expects user code in the form of a DoFn object. Each language SDK has its own idiomatic way of expressing user-defined functions, but there are some common requirements, like serializability and thread compatibility.
+
+To learn more, see:
+
+* [User-Defined Functions (UDFs)](/documentation/basics/#user-defined-functions-udfs)
+* [ParDo](/documentation/programming-guide/#pardo)
+* [Requirements for writing user code for Beam transforms](/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms)
+
+## Watermark
+
+The point in event time when all data in a window can be expected to have arrived in the pipeline. Watermarks provide a way to estimate the completeness of input data. Every PCollection has an associated watermark. Once the watermark progresses past the end of a window, any element that arrives with a timestamp in that window is considered late data.
+
+To learn more, see:
+
+* [Watermarks and late data](/documentation/programming-guide/#watermarks-and-late-data)
+
+## Windowing
+
+Partitioning a PCollection into bounded subsets grouped by the timestamps of individual elements. In the Beam model, any PCollection – including unbounded PCollections – can be subdivided into logical windows. Each element in a PCollection is assigned to one or more windows according to the PCollection's windowing function, and each individual window contains a finite number of elements. Transforms that aggregate multiple elements, such as GroupByKey and Combine, work implicitly on a per-window basis.
+
+To learn more, see:
+
+* [Windowing](/documentation/programming-guide/#windowing)
+
+## Worker
+
+A container, process, or virtual machine (VM) that handles some part of the parallel processing of a pipeline. The Beam model doesn’t support synchronizing shared state across worker machines. Instead, each worker node has its own independent copy of state. A Beam runner might serialize elements between machines for communication purposes and for other reasons such as persistence.
+
+To learn more, see:
+
+* [Execution model](/documentation/runtime/model/)
diff --git a/website/www/site/content/en/get-started/downloads.md b/website/www/site/content/en/get-started/downloads.md
index c67343d..0026b72 100644
--- a/website/www/site/content/en/get-started/downloads.md
+++ b/website/www/site/content/en/get-started/downloads.md
@@ -88,7 +88,7 @@
 
 ## Releases
 
-### 2.29.0 (2021-04-15)
+### 2.29.0 (2021-04-27)
 Official [source code download](http://www.apache.org/dyn/closer.cgi/beam/2.29.0/apache-beam-2.29.0-source-release.zip).
 [SHA-512](https://downloads.apache.org/beam/2.29.0/apache-beam-2.29.0-source-release.zip.sha512).
 [signature](https://downloads.apache.org/beam/2.29.0/apache-beam-2.29.0-source-release.zip.asc).
diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html
index 0718fc5..94aa1d1 100644
--- a/website/www/site/layouts/partials/section-menu/en/documentation.html
+++ b/website/www/site/layouts/partials/section-menu/en/documentation.html
@@ -342,6 +342,8 @@
   </ul>
 </li>
 
+<li><a href="/documentation/glossary/">Glossary</a></li>
+
 <li class="section-nav-item--collapsible">
   <span class="section-nav-list-title">Runtime systems</span>