Merge pull request #12376: [BEAM-10575] Eliminate legacy rawtypes from GCP IOs and some others; enable -Wrawtypes -Werror

diff --git a/CHANGES.md b/CHANGES.md
index 1e3b32e..c2665e7 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -79,7 +79,7 @@
 
 * Fixed X (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 
-# [2.23.0] - Unreleased
+# [2.23.0] - 2020-06-29
 
 ## Highlights
 
@@ -88,7 +88,6 @@
 
 ## I/Os
 
-* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * Support for reading from Snowflake added (Java) ([BEAM-9722](https://issues.apache.org/jira/browse/BEAM-9722)).
 * Support for writing to Splunk added (Java) ([BEAM-8596](https://issues.apache.org/jira/browse/BEAM-8596)).
 * Support for assume role added (Java) ([BEAM-10335](https://issues.apache.org/jira/browse/BEAM-10335)).
@@ -112,7 +111,6 @@
   overridden with `RowJson.RowJsonDeserializer#withNullBehavior`.
 * Fixed a bug in `GroupIntoBatches` experimental transform in Python to actually group batches by key. 
   This changes the output type for this transform ([BEAM-6696](https://issues.apache.org/jira/browse/BEAM-6696)).
-* X behavior was changed ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 
 ## Deprecations
 
@@ -148,6 +146,7 @@
 * Upgrade Sphinx to 3.0.3 for building PyDoc.
 * Added a PTransform for image annotation using Google Cloud AI image processing service
 ([BEAM-9646](https://issues.apache.org/jira/browse/BEAM-9646))
+* Dataflow streaming timers are not strictly time ordered when set earlier mid-bundle ([BEAM-8543](https://issues.apache.org/jira/browse/BEAM-8543)).
 
 ## Breaking Changes
 
diff --git a/release/src/main/scripts/publish_docker_images.sh b/release/src/main/scripts/publish_docker_images.sh
index 7575107..dfa5d7d 100755
--- a/release/src/main/scripts/publish_docker_images.sh
+++ b/release/src/main/scripts/publish_docker_images.sh
@@ -27,7 +27,7 @@
 DOCKER_IMAGE_DEFAULT_REPO_ROOT=apache
 DOCKER_IMAGE_DEFAULT_REPO_PREFIX=beam_
 
-PYTHON_VER=("python2.7" "python3.5" "python3.6" "python3.7")
+PYTHON_VER=("python2.7" "python3.5" "python3.6" "python3.7" "python3.8")
 FLINK_VER=("1.8" "1.9" "1.10")
 
 echo "Publish SDK docker images to Docker Hub."
@@ -36,17 +36,17 @@
 echo "Which release version are you working on: "
 read RELEASE
 
-echo "================Setting Up RC candidate Variables==========="
-echo "From which RC candidate do you create publish docker image? (ex: rc0, rc1) "
-read RC_VERSION
+echo "Which release candidate will be the source of final docker images? (ex: 1)"
+read RC_NUM
+RC_VERSION="rc${RC_NUM}"
 
-echo "================Confirmimg Release and RC version==========="
-echo "We are using ${RC_VERSION} to create docker images for ${RELEASE}."
+echo "================Confirming Release and RC version==========="
+echo "We are using ${RC_VERSION} to push docker images for ${RELEASE}."
 echo "Do you want to proceed? [y|N]"
 read confirmation
 if [[ $confirmation = "y" ]]; then
 
-  echo '-------------------Generating and Pushing Python images-----------------'
+  echo '-------------------Tagging and Pushing Python images-----------------'
   for ver in "${PYTHON_VER[@]}"; do
     # Pull varified RC from dockerhub.
     docker pull ${DOCKER_IMAGE_DEFAULT_REPO_ROOT}/${DOCKER_IMAGE_DEFAULT_REPO_PREFIX}${ver}_sdk:${RELEASE}_${RC_VERSION}
@@ -65,7 +65,7 @@
     docker rmi -f ${DOCKER_IMAGE_DEFAULT_REPO_ROOT}/${DOCKER_IMAGE_DEFAULT_REPO_PREFIX}${ver}_sdk:latest
   done
 
-  echo '-------------------Generating and Pushing Java images-----------------'
+  echo '-------------------Tagging and Pushing Java images-----------------'
   # Pull varified RC from dockerhub.
   docker pull ${DOCKER_IMAGE_DEFAULT_REPO_ROOT}/${DOCKER_IMAGE_DEFAULT_REPO_PREFIX}java_sdk:${RELEASE}_${RC_VERSION}
 
@@ -82,7 +82,7 @@
   docker rmi -f ${DOCKER_IMAGE_DEFAULT_REPO_ROOT}/${DOCKER_IMAGE_DEFAULT_REPO_PREFIX}java_sdk:${RELEASE}
   docker rmi -f ${DOCKER_IMAGE_DEFAULT_REPO_ROOT}/${DOCKER_IMAGE_DEFAULT_REPO_PREFIX}java_sdk:latest
 
-  echo '-------------Generating and Pushing Flink job server images-------------'
+  echo '-------------Tagging and Pushing Flink job server images-------------'
   echo "Publishing images for the following Flink versions:" "${FLINK_VER[@]}"
   echo "Make sure the versions are correct, then press any key to proceed."
   read
@@ -106,7 +106,7 @@
     docker rmi -f "${FLINK_IMAGE_NAME}:latest"
   done
 
-  echo '-------------Generating and Pushing Spark job server image-------------'
+  echo '-------------Tagging and Pushing Spark job server image-------------'
   SPARK_IMAGE_NAME=${DOCKER_IMAGE_DEFAULT_REPO_ROOT}/${DOCKER_IMAGE_DEFAULT_REPO_PREFIX}spark_job_server
 
   # Pull verified RC from dockerhub.
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index 7ed390b..334f145 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -24,10 +24,12 @@
 import com.google.api.services.dataflow.model.SideInputInfo;
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
@@ -519,7 +521,7 @@
               synchronizedProcessingTime);
 
       this.cachedFiredTimers = null;
-      this.cachedFiredUserTimers = null;
+      this.toBeFiredTimersOrdered = null;
     }
 
     public void flushState() {
@@ -559,28 +561,67 @@
       return nextTimer;
     }
 
-    // Lazily initialized
-    private Iterator<TimerData> cachedFiredUserTimers = null;
+    private PriorityQueue<TimerData> toBeFiredTimersOrdered = null;
+
+    // to track if timer is reset earlier mid-bundle.
+    // Map of timer's id to timer's firing time to check
+    // the actual firing time of a timer.
+    private Map<String, Instant> firedTimer = new HashMap<>();
 
     public <W extends BoundedWindow> TimerData getNextFiredUserTimer(Coder<W> windowCoder) {
-      if (cachedFiredUserTimers == null) {
-        cachedFiredUserTimers =
-            FluentIterable.<Timer>from(StreamingModeExecutionContext.this.getFiredTimers())
-                .filter(
-                    timer ->
-                        WindmillTimerInternals.isUserTimer(timer)
-                            && timer.getStateFamily().equals(stateFamily))
-                .transform(
-                    timer ->
-                        WindmillTimerInternals.windmillTimerToTimerData(
-                            WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
-                .iterator();
+      if (toBeFiredTimersOrdered == null) {
+
+        toBeFiredTimersOrdered = new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp));
+        FluentIterable.from(StreamingModeExecutionContext.this.getFiredTimers())
+            .filter(
+                timer ->
+                    WindmillTimerInternals.isUserTimer(timer)
+                        && timer.getStateFamily().equals(stateFamily))
+            .transform(
+                timer ->
+                    WindmillTimerInternals.windmillTimerToTimerData(
+                        WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, timer, windowCoder))
+            .iterator()
+            .forEachRemaining(
+                timerData -> {
+                  firedTimer.put(
+                      timerData.getTimerId() + '+' + timerData.getTimerFamilyId(),
+                      timerData.getTimestamp());
+                  toBeFiredTimersOrdered.add(timerData);
+                });
       }
 
-      if (!cachedFiredUserTimers.hasNext()) {
+      Instant currentInputWatermark = userTimerInternals.currentInputWatermarkTime();
+
+      if (userTimerInternals.hasTimerBefore(currentInputWatermark)) {
+        List<TimerData> currentTimers = userTimerInternals.getCurrentTimers();
+
+        for (TimerData timerData : currentTimers) {
+          firedTimer.put(
+              timerData.getTimerId() + '+' + timerData.getTimerFamilyId(),
+              timerData.getTimestamp());
+          toBeFiredTimersOrdered.add(timerData);
+        }
+      }
+
+      TimerData nextTimer = null;
+
+      // fire timer only if its timestamp matched. Else it is either reset or obsolete.
+      while (!toBeFiredTimersOrdered.isEmpty()) {
+        nextTimer = toBeFiredTimersOrdered.poll();
+        String timerUniqueId = nextTimer.getTimerId() + '+' + nextTimer.getTimerFamilyId();
+        if (firedTimer.containsKey(timerUniqueId)
+            && firedTimer.get(timerUniqueId).isEqual(nextTimer.getTimestamp())) {
+          break;
+        } else {
+          nextTimer = null;
+        }
+      }
+
+      if (nextTimer == null) {
         return null;
       }
-      TimerData nextTimer = cachedFiredUserTimers.next();
+
       // User timers must be explicitly deleted when delivered, to release the implied hold
       userTimerInternals.deleteTimer(nextTimer);
       return nextTimer;
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index f46fd49..5269cf2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -22,6 +22,8 @@
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.TimerInternals;
@@ -225,6 +227,29 @@
     timers.clear();
   }
 
+  public boolean hasTimerBefore(Instant time) {
+    for (Cell<String, StateNamespace, Boolean> cell : timerStillPresent.cellSet()) {
+      TimerData timerData = timers.get(cell.getRowKey(), cell.getColumnKey());
+      if (cell.getValue()) {
+        if (timerData.getTimestamp().isBefore(time)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  public List<TimerData> getCurrentTimers() {
+    List<TimerData> timerDataList = new ArrayList<>();
+    for (Cell<String, StateNamespace, Boolean> cell : timerStillPresent.cellSet()) {
+      TimerData timerData = timers.get(cell.getRowKey(), cell.getColumnKey());
+      if (cell.getValue()) {
+        timerDataList.add(timerData);
+      }
+    }
+    return timerDataList;
+  }
+
   private boolean needsWatermarkHold(TimerData timerData) {
     // If it is a user timer or a system timer with outputTimestamp different than timestamp
     return WindmillNamespacePrefix.USER_NAMESPACE_PREFIX.equals(prefix)
diff --git a/sdks/go/pkg/beam/create.go b/sdks/go/pkg/beam/create.go
index b73583d..a0d2c0b 100644
--- a/sdks/go/pkg/beam/create.go
+++ b/sdks/go/pkg/beam/create.go
@@ -17,13 +17,12 @@
 
 import (
 	"bytes"
-	"fmt"
 	"reflect"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
-// Create inserts a fixed set of values into the pipeline. The values must
+// Create inserts a fixed non-empty set of values into the pipeline. The values must
 // be of the same type 'A' and the returned PCollection is of type A.
 //
 // The returned PCollections can be used as any other PCollections. The values
@@ -34,31 +33,52 @@
 }
 
 // CreateList inserts a fixed set of values into the pipeline from a slice or
-// array. It is a convenience wrapper over Create.
+// array. Unlike Create this supports the creation of an empty PCollection.
 func CreateList(s Scope, list interface{}) PCollection {
-	var ret []interface{}
+	return Must(TryCreateList(s, list))
+}
+
+// TryCreate inserts a fixed non-empty set of values into the pipeline. The
+// values must be of the same type.
+func TryCreate(s Scope, values ...interface{}) (PCollection, error) {
+	if len(values) == 0 {
+		err := errors.New("create has no values")
+		return PCollection{}, addCreateCtx(err, s)
+	}
+
+	t := reflect.ValueOf(values[0]).Type()
+	return createList(s, values, t)
+}
+
+// TryCreateList inserts a fixed set of values into the pipeline from a slice or
+// array. The values must be of the same type. Unlike TryCreate this supports
+// the creation of an empty PCollection.
+func TryCreateList(s Scope, list interface{}) (PCollection, error) {
 	val := reflect.ValueOf(list)
 	if val.Kind() != reflect.Slice && val.Kind() != reflect.Array {
-		panic(fmt.Sprintf("Input %v must be a slice or array", list))
+		err := errors.Errorf("input %v must be a slice or array", list)
+		return PCollection{}, addCreateCtx(err, s)
 	}
+
+	var ret []interface{}
 	for i := 0; i < val.Len(); i++ {
 		ret = append(ret, val.Index(i).Interface())
 	}
-	return Must(TryCreate(s, ret...))
+
+	var t reflect.Type
+	if len(ret) == 0 {
+		t = reflect.TypeOf(list).Elem()
+	} else {
+		t = reflect.ValueOf(ret[0]).Type()
+	}
+	return createList(s, ret, t)
 }
 
 func addCreateCtx(err error, s Scope) error {
 	return errors.WithContextf(err, "inserting Create in scope %s", s)
 }
 
-// TryCreate inserts a fixed set of values into the pipeline. The values must
-// be of the same type.
-func TryCreate(s Scope, values ...interface{}) (PCollection, error) {
-	if len(values) == 0 {
-		return PCollection{}, addCreateCtx(errors.New("create has no values"), s)
-	}
-
-	t := reflect.ValueOf(values[0]).Type()
+func createList(s Scope, values []interface{}, t reflect.Type) (PCollection, error) {
 	fn := &createFn{Type: EncodedType{T: t}}
 	enc := NewElementEncoder(t)
 
@@ -69,7 +89,8 @@
 		}
 		var buf bytes.Buffer
 		if err := enc.Encode(value, &buf); err != nil {
-			return PCollection{}, addCreateCtx(errors.Wrapf(err, "marshalling of %v failed", value), s)
+			err = errors.Wrapf(err, "marshalling of %v failed", value)
+			return PCollection{}, addCreateCtx(err, s)
 		}
 		fn.Values = append(fn.Values, buf.Bytes())
 	}
diff --git a/sdks/go/pkg/beam/create_test.go b/sdks/go/pkg/beam/create_test.go
index ab0bd25..b3a0dbb 100644
--- a/sdks/go/pkg/beam/create_test.go
+++ b/sdks/go/pkg/beam/create_test.go
@@ -87,6 +87,32 @@
 	}
 }
 
+func TestCreateEmptyList(t *testing.T) {
+	tests := []struct {
+		values interface{}
+	}{
+		{[]int{}},
+		{[]string{}},
+		{[]float32{}},
+		{[]float64{}},
+		{[]uint{}},
+		{[]bool{}},
+		{[]wc{}},
+		{[]*testProto{}}, // Test for BEAM-4401
+	}
+
+	for _, test := range tests {
+		p, s := beam.NewPipelineWithRoot()
+		c := beam.CreateList(s, test.values)
+
+		passert.Empty(s, c)
+
+		if err := ptest.Run(p); err != nil {
+			t.Errorf("beam.CreateList(%v) failed: %v", test.values, err)
+		}
+	}
+}
+
 type testProto struct {
 	// OneOfField is an interface-typed field and cannot be JSON-marshaled, but
 	// should be specially handled by Beam as a field of a proto.Message.
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 3f0d13c..776fd3e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -3959,7 +3959,7 @@
       }
 
       testEventTimeTimerOrderingWithInputPTransform(
-          now, numTestElements, builder.advanceWatermarkToInfinity());
+          now, numTestElements, builder.advanceWatermarkToInfinity(), IsBounded.BOUNDED);
     }
 
     /** A test makes sure that an event time timers are correctly ordered using Create transform. */
@@ -3970,7 +3970,7 @@
       UsesStatefulParDo.class,
       UsesStrictTimerOrdering.class
     })
-    public void testEventTimeTimerOrderingWithCreate() throws Exception {
+    public void testEventTimeTimerOrderingWithCreateBounded() throws Exception {
       final int numTestElements = 100;
       final Instant now = new Instant(1500000000000L);
 
@@ -3980,13 +3980,39 @@
       }
 
       testEventTimeTimerOrderingWithInputPTransform(
-          now, numTestElements, Create.timestamped(elements));
+          now, numTestElements, Create.timestamped(elements), IsBounded.BOUNDED);
+    }
+
+    /**
+     * A test makes sure that an event time timers are correctly ordered using Create transform
+     * unbounded.
+     */
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesTimersInParDo.class,
+      UsesStatefulParDo.class,
+      UsesUnboundedPCollections.class,
+      UsesStrictTimerOrdering.class
+    })
+    public void testEventTimeTimerOrderingWithCreateUnbounded() throws Exception {
+      final int numTestElements = 100;
+      final Instant now = new Instant(1500000000000L);
+
+      List<TimestampedValue<KV<String, String>>> elements = new ArrayList<>();
+      for (int i = 0; i < numTestElements; i++) {
+        elements.add(TimestampedValue.of(KV.of("dummy", "" + i), now.plus(i * 1000)));
+      }
+
+      testEventTimeTimerOrderingWithInputPTransform(
+          now, numTestElements, Create.timestamped(elements), IsBounded.UNBOUNDED);
     }
 
     private void testEventTimeTimerOrderingWithInputPTransform(
         Instant now,
         int numTestElements,
-        PTransform<PBegin, PCollection<KV<String, String>>> transform)
+        PTransform<PBegin, PCollection<KV<String, String>>> transform,
+        IsBounded isBounded)
         throws Exception {
 
       final String timerIdBagAppend = "append";
@@ -4070,7 +4096,8 @@
             }
           };
 
-      PCollection<String> output = pipeline.apply(transform).apply(ParDo.of(fn));
+      PCollection<String> output =
+          pipeline.apply(transform).setIsBoundedInternal(isBounded).apply(ParDo.of(fn));
       List<String> expected =
           IntStream.rangeClosed(0, numTestElements)
               .mapToObj(expandFn(numTestElements))
@@ -4154,16 +4181,25 @@
           TestStream.create(KvCoder.of(VoidCoder.of(), VoidCoder.of()))
               .addElements(KV.of(null, null))
               .advanceWatermarkToInfinity();
-      pipeline.apply(TwoTimerTest.of(now, end, input));
+      pipeline.apply(TwoTimerTest.of(now, end, input, IsBounded.BOUNDED));
       pipeline.run();
     }
 
     @Test
     @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesStrictTimerOrdering.class})
-    public void testTwoTimersSettingEachOtherWithCreateAsInput() {
+    public void testTwoTimersSettingEachOtherWithCreateAsInputBounded() {
       Instant now = new Instant(1500000000000L);
       Instant end = now.plus(100);
-      pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null))));
+      pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), IsBounded.BOUNDED));
+      pipeline.run();
+    }
+
+    @Test
+    @Category({ValidatesRunner.class, UsesTimersInParDo.class, UsesStrictTimerOrdering.class})
+    public void testTwoTimersSettingEachOtherWithCreateAsInputUnbounded() {
+      Instant now = new Instant(1500000000000L);
+      Instant end = now.plus(100);
+      pipeline.apply(TwoTimerTest.of(now, end, Create.of(KV.of(null, null)), IsBounded.UNBOUNDED));
       pipeline.run();
     }
 
@@ -4337,18 +4373,26 @@
     private static class TwoTimerTest extends PTransform<PBegin, PDone> {
 
       private static PTransform<PBegin, PDone> of(
-          Instant start, Instant end, PTransform<PBegin, PCollection<KV<Void, Void>>> input) {
-        return new TwoTimerTest(start, end, input);
+          Instant start,
+          Instant end,
+          PTransform<PBegin, PCollection<KV<Void, Void>>> input,
+          IsBounded isBounded) {
+        return new TwoTimerTest(start, end, input, isBounded);
       }
 
       private final Instant start;
       private final Instant end;
+      private final IsBounded isBounded;
       private final transient PTransform<PBegin, PCollection<KV<Void, Void>>> inputPTransform;
 
       public TwoTimerTest(
-          Instant start, Instant end, PTransform<PBegin, PCollection<KV<Void, Void>>> input) {
+          Instant start,
+          Instant end,
+          PTransform<PBegin, PCollection<KV<Void, Void>>> input,
+          IsBounded isBounded) {
         this.start = start;
         this.end = end;
+        this.isBounded = isBounded;
         this.inputPTransform = input;
       }
 
@@ -4361,6 +4405,7 @@
         PCollection<String> result =
             input
                 .apply(inputPTransform)
+                .setIsBoundedInternal(isBounded)
                 .apply(
                     ParDo.of(
                         new DoFn<KV<Void, Void>, String>() {
@@ -4425,7 +4470,7 @@
                         }));
 
         List<String> expected =
-            LongStream.rangeClosed(0, 100)
+            LongStream.rangeClosed(0, end.minus(start.getMillis()).getMillis())
                 .mapToObj(e -> (Long) e)
                 .flatMap(e -> Arrays.asList("t1:" + e + ":" + e, "t2:" + e + ":" + e).stream())
                 .collect(Collectors.toList());
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
index 6118cd0..334af11 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLQueryPlanner.java
@@ -45,7 +45,9 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.ChainedRelMetadataProvider;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.JaninoRelMetadataProvider;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.FilterCalcMergeRule;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.JoinCommuteRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectCalcMergeRule;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlNode;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlOperatorTable;
@@ -103,6 +105,14 @@
         //  requires the JoinCommuteRule, which doesn't work without struct flattening.
         if (rule instanceof JoinCommuteRule) {
           continue;
+        } else if (rule instanceof FilterCalcMergeRule || rule instanceof ProjectCalcMergeRule) {
+          // In order to support Java UDF, we need both BeamZetaSqlCalcRel and BeamCalcRel. It is
+          // because BeamZetaSqlCalcRel can execute ZetaSQL built-in functions while BeamCalcRel
+          // can execute UDFs. So during planning, we expect both Filter and Project are converted
+          // to Calc nodes before merging with other Project/Filter/Calc nodes. Thus we should not
+          // add FilterCalcMergeRule and ProjectCalcMergeRule. CalcMergeRule will achieve equivalent
+          // planning result eventually.
+          continue;
         } else if (rule instanceof BeamCalcRule) {
           bd.add(BeamZetaSqlCalcRule.INSTANCE);
         } else {
diff --git a/sdks/python/apache_beam/dataframe/doctests.py b/sdks/python/apache_beam/dataframe/doctests.py
index a88e020..2d135ea 100644
--- a/sdks/python/apache_beam/dataframe/doctests.py
+++ b/sdks/python/apache_beam/dataframe/doctests.py
@@ -43,6 +43,7 @@
 import contextlib
 import doctest
 import re
+import traceback
 from typing import Any
 from typing import Dict
 from typing import List
@@ -54,7 +55,7 @@
 from apache_beam.dataframe import expressions
 from apache_beam.dataframe import frames  # pylint: disable=unused-import
 from apache_beam.dataframe import transforms
-from apache_beam.dataframe.frame_base import DeferredFrame
+from apache_beam.dataframe.frame_base import DeferredBase
 
 
 class FakePandasObject(object):
@@ -66,10 +67,10 @@
 
   def __call__(self, *args, **kwargs):
     result = self._pandas_obj(*args, **kwargs)
-    if type(result) in DeferredFrame._pandas_type_map.keys():
+    if type(result) in DeferredBase._pandas_type_map.keys():
       placeholder = expressions.PlaceholderExpression(result[0:0])
       self._test_env._inputs[placeholder] = result
-      return DeferredFrame.wrap(placeholder)
+      return DeferredBase.wrap(placeholder)
     else:
       return result
 
@@ -111,7 +112,7 @@
         self._all_frames[id(df)] = df
 
       deferred_type.__init__ = new_init
-      deferred_type.__repr__ = lambda self: 'DeferredFrame[%s]' % id(self)
+      deferred_type.__repr__ = lambda self: 'DeferredBase[%s]' % id(self)
       self._recorded_results = collections.defaultdict(list)
       yield
     finally:
@@ -119,10 +120,10 @@
 
   @contextlib.contextmanager
   def context(self):
-    """Creates a context within which DeferredFrame types are monkey patched
+    """Creates a context within which DeferredBase types are monkey patched
     to record ids."""
     with contextlib.ExitStack() as stack:
-      for deferred_type in DeferredFrame._pandas_type_map.values():
+      for deferred_type in DeferredBase._pandas_type_map.values():
         stack.enter_context(self._monkey_patch_type(deferred_type))
       yield
 
@@ -164,7 +165,7 @@
 
 
 class _DeferrredDataframeOutputChecker(doctest.OutputChecker):
-  """Validates output by replacing DeferredFrame[...] with computed values.
+  """Validates output by replacing DeferredBase[...] with computed values.
   """
   def __init__(self, env, use_beam):
     self._env = env
@@ -202,22 +203,39 @@
           _ = output_pcoll | 'Record%s' % name >> beam.FlatMap(
               recorder.record_fn(name))
       # pipeline runs, side effects recorded
+
+      def concat(values):
+        if len(values) > 1:
+          return pd.concat(values)
+        else:
+          return values[0]
+
       return {
-          name: pd.concat(recorder.get_recorded(name))
+          name: concat(recorder.get_recorded(name))
           for name in to_compute.keys()
       }
 
   def fix(self, want, got):
-    if 'DeferredFrame' in got:
-      to_compute = {
-          m.group(0): self._env._all_frames[int(m.group(1))]
-          for m in re.finditer(r'DeferredFrame\[(\d+)\]', got)
-      }
-      computed = self.compute(to_compute)
-      for name, frame in computed.items():
-        got = got.replace(name, repr(frame))
-      got = '\n'.join(sorted(line.rstrip() for line in got.split('\n')))
-      want = '\n'.join(sorted(line.rstrip() for line in want.split('\n')))
+    if 'DeferredBase' in got:
+      try:
+        to_compute = {
+            m.group(0): self._env._all_frames[int(m.group(1))]
+            for m in re.finditer(r'DeferredBase\[(\d+)\]', got)
+        }
+        computed = self.compute(to_compute)
+        for name, frame in computed.items():
+          got = got.replace(name, repr(frame))
+
+        def sort_and_normalize(text):
+          return '\n'.join(
+              sorted(
+                  line.rstrip()
+                  for line in text.split('\n') if line.strip())) + '\n'
+
+        got = sort_and_normalize(got)
+        want = sort_and_normalize(want)
+      except Exception:
+        got = traceback.format_exc()
     return want, got
 
   def check_output(self, want, got, optionflags):
@@ -239,7 +257,6 @@
 
   def output_difference(self, example, got, optionflags):
     want, got = self.fix(example.want, got)
-    want = example.want
     if want != example.want:
       example = doctest.Example(
           example.source,
@@ -294,8 +311,13 @@
 
 
 def teststring(text, report=True, **runner_kwargs):
+  optionflags = runner_kwargs.pop('optionflags', 0)
+  optionflags |= (
+      doctest.NORMALIZE_WHITESPACE | doctest.IGNORE_EXCEPTION_DETAIL)
+
   parser = doctest.DocTestParser()
-  runner = BeamDataframeDoctestRunner(TestEnvironment(), **runner_kwargs)
+  runner = BeamDataframeDoctestRunner(
+      TestEnvironment(), optionflags=optionflags, **runner_kwargs)
   test = parser.get_doctest(
       text, {
           'pd': runner.fake_pandas_module(), 'np': np
diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py
index 2873778..7780cd3 100644
--- a/sdks/python/apache_beam/dataframe/frame_base.py
+++ b/sdks/python/apache_beam/dataframe/frame_base.py
@@ -17,16 +17,22 @@
 from __future__ import absolute_import
 
 import inspect
+from typing import Any
+from typing import Callable
 from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Union
 
 import pandas as pd
 
 from apache_beam.dataframe import expressions
+from apache_beam.dataframe import partitionings
 
 
-class DeferredFrame(object):
+class DeferredBase(object):
 
-  _pandas_type_map = {}  # type: Dict[type, type]
+  _pandas_type_map = {}  # type: Dict[Union[type, None], type]
 
   def __init__(self, expr):
     self._expr = expr
@@ -41,16 +47,34 @@
 
   @classmethod
   def wrap(cls, expr):
-    return cls._pandas_type_map[type(expr.proxy())](expr)
+    proxy_type = type(expr.proxy())
+    if proxy_type in cls._pandas_type_map:
+      wrapper_type = cls._pandas_type_map[proxy_type]
+    else:
+      if expr.requires_partition_by() != partitionings.Singleton():
+        raise ValueError(
+            'Scalar expression %s partitoned by non-singleton %s' %
+            (expr, expr.requires_partition_by()))
+      wrapper_type = _DeferredScalar
+    return wrapper_type(expr)
 
   def _elementwise(self, func, name=None, other_args=(), inplace=False):
     return _elementwise_function(func, name, inplace=inplace)(self, *other_args)
 
+
+class DeferredFrame(DeferredBase):
   @property
   def dtypes(self):
     return self._expr.proxy().dtypes
 
 
+class _DeferredScalar(DeferredBase):
+  pass
+
+
+DeferredBase._pandas_type_map[None] = _DeferredScalar
+
+
 def name_and_func(method):
   if isinstance(method, str):
     return method, lambda df, *args, **kwargs: getattr(df, method)(*args, **
@@ -60,21 +84,64 @@
 
 
 def _elementwise_method(func, name=None, restrictions=None, inplace=False):
+  return _proxy_method(
+      func,
+      name,
+      restrictions,
+      inplace,
+      requires_partition_by=partitionings.Nothing(),
+      preserves_partition_by=partitionings.Singleton())
+
+
+def _proxy_method(
+    func,
+    name=None,
+    restrictions=None,
+    inplace=False,
+    requires_partition_by=partitionings.Singleton(),
+    preserves_partition_by=partitionings.Nothing()):
   if name is None:
     name, func = name_and_func(func)
   if restrictions is None:
     restrictions = {}
-  return _elementwise_function(func, name, restrictions)
+  return _proxy_function(
+      func,
+      name,
+      restrictions,
+      inplace,
+      requires_partition_by,
+      preserves_partition_by)
 
 
 def _elementwise_function(func, name=None, restrictions=None, inplace=False):
+  return _proxy_function(
+      func,
+      name,
+      restrictions,
+      inplace,
+      requires_partition_by=partitionings.Nothing(),
+      preserves_partition_by=partitionings.Singleton())
+
+
+def _proxy_function(
+      func,  # type: Union[Callable, str]
+      name=None,  # type: Optional[str]
+      restrictions=None,  # type: Optional[Dict[str, Union[Any, List[Any]]]]
+      inplace=False,  # type: bool
+      requires_partition_by=partitionings.Singleton(),  # type: partitionings.Partitioning
+      preserves_partition_by=partitionings.Nothing(),  # type: partitionings.Partitioning
+):
+
   if name is None:
-    name = func.__name__
+    if isinstance(func, str):
+      name = func
+    else:
+      name = func.__name__
   if restrictions is None:
     restrictions = {}
 
   def wrapper(*args, **kwargs):
-    for key, values in restrictions.items():
+    for key, values in ():  #restrictions.items():
       if key in kwargs:
         value = kwargs[key]
       else:
@@ -96,7 +163,7 @@
     deferred_arg_exprs = []
     constant_args = [None] * len(args)
     for ix, arg in enumerate(args):
-      if isinstance(arg, DeferredFrame):
+      if isinstance(arg, DeferredBase):
         deferred_arg_indices.append(ix)
         deferred_arg_exprs.append(arg._expr)
       elif isinstance(arg, pd.core.generic.NDFrame):
@@ -116,11 +183,14 @@
         full_args[ix] = arg
       return actual_func(*full_args, **kwargs)
 
-    result_expr = expressions.elementwise_expression(
-        name, apply, deferred_arg_exprs)
+    result_expr = expressions.ComputedExpression(
+        name,
+        apply,
+        deferred_arg_exprs,
+        requires_partition_by=requires_partition_by,
+        preserves_partition_by=preserves_partition_by)
     if inplace:
       args[0]._expr = result_expr
-      return args[0]
 
     else:
       return DeferredFrame.wrap(result_expr)
@@ -128,6 +198,25 @@
   return wrapper
 
 
+def _agg_method(func):
+  def wrapper(self, *args, **kwargs):
+    return self.agg(func, *args, **kwargs)
+
+  return wrapper
+
+
+def _associative_agg_method(func):
+  # TODO(robertwb): Multi-level agg.
+  return _agg_method(func)
+
+
+def wont_implement_method(msg):
+  def wrapper(self, *args, **kwargs):
+    raise WontImplementError(msg)
+
+  return wrapper
+
+
 def copy_and_mutate(func):
   def wrapper(self, *args, **kwargs):
     copy = self.copy()
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index be14431..89e9154 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -29,6 +29,58 @@
     raise frame_base.WontImplementError(
         'Conversion to a non-deferred a numpy array.')
 
+  isna = frame_base._elementwise_method('isna')
+  notnull = notna = frame_base._elementwise_method('notna')
+
+  transform = frame_base._elementwise_method(
+      'transform', restrictions={'axis': 0})
+
+  def agg(self, *args, **kwargs):
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'agg',
+            lambda df: df.agg(*args, **kwargs), [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=partitionings.Singleton()))
+
+  all = frame_base._associative_agg_method('all')
+  any = frame_base._associative_agg_method('any')
+  min = frame_base._associative_agg_method('min')
+  max = frame_base._associative_agg_method('max')
+  prod = product = frame_base._associative_agg_method('prod')
+  sum = frame_base._associative_agg_method('sum')
+
+  cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
+      'order-sensitive')
+  diff = frame_base.wont_implement_method('order-sensitive')
+
+  def replace(
+      self,
+      to_replace=None,
+      value=None,
+      inplace=False,
+      limit=None,
+      *args,
+      **kwargs):
+    if limit is None:
+      requires_partition_by = partitionings.Nothing()
+    else:
+      requires_partition_by = partitionings.Singleton()
+    result = frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'replace',
+            lambda df: df.replace(
+                to_replace, value, False, limit, *args, **kwargs), [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires_partition_by))
+    if inplace:
+      self._expr = result._expr
+    else:
+      return result
+
+  def unstack(self, *args, **kwargs):
+    raise frame_base.WontImplementError('non-deferred column values')
+
 
 for base in ['add', 'sub', 'mul', 'div', 'truediv', 'floordiv', 'mod', 'pow']:
   for p in ['%s', 'r%s', '__%s__', '__r%s__']:
@@ -54,10 +106,6 @@
   def T(self):
     return self.transpose()
 
-  def transpose(self, dtype=None):
-    raise frame_base.WontImplementError(
-        'require non-index partitioning')  # XXXX ignore for now
-
   def groupby(self, cols):
     # TODO: what happens to the existing index?
     # We set the columns to index as we have a notion of being partitioned by
@@ -111,19 +159,195 @@
   def loc(self):
     return _DeferredLoc(self)
 
-  def aggregate(self, func, axis=0, *args, **kwargs):
-    if axis != 0:
-      raise NotImplementedError()
+  def aggregate(self, *args, **kwargs):
+    if 'axis' in kwargs and kwargs['axis'] is None:
+      return self.agg(*args, **dict(kwargs, axis=1)).agg(
+          *args, **dict(kwargs, axis=0))
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'aggregate',
-            lambda df: df.agg(func, axis, *args, **kwargs),
+            lambda df: df.agg(*args, **kwargs),
             [self._expr],
             # TODO(robertwb): Sub-aggregate when possible.
             requires_partition_by=partitionings.Singleton()))
 
   agg = aggregate
 
+  applymap = frame_base._elementwise_method('applymap')
+
+  memory_usage = frame_base.wont_implement_method('non-deferred value')
+
+  all = frame_base._associative_agg_method('all')
+  any = frame_base._associative_agg_method('any')
+
+  cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
+      'order-sensitive')
+  diff = frame_base.wont_implement_method('order-sensitive')
+
+  max = frame_base._associative_agg_method('max')
+  min = frame_base._associative_agg_method('min')
+  mode = frame_base._agg_method('mode')
+
+  def dropna(
+      self,
+      axis=0,
+      how='any',
+      thresh=None,
+      subset=None,
+      inplace=False,
+      *args,
+      **kwargs):
+    # TODO(robertwb): This is a common pattern. Generalize?
+    if axis == 1 or axis == 'columns':
+      requires_partition_by = partitionings.Singleton()
+    else:
+      requires_partition_by = partitionings.Nothing()
+    result = frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'dropna',
+            lambda df: df.dropna(
+                axis, how, thresh, subset, False, *args, **kwargs),
+            [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires_partition_by))
+    if inplace:
+      self._expr = result._expr
+    else:
+      return result
+
+  items = itertuples = iterrows = iteritems = frame_base.wont_implement_method(
+      'non-lazy')
+
+  isna = frame_base._elementwise_method('isna')
+  notnull = notna = frame_base._elementwise_method('notna')
+
+  prod = product = frame_base._associative_agg_method('prod')
+
+  def quantile(self, q=0.5, axis=0, *args, **kwargs):
+    if axis != 0:
+      raise frame_base.WontImplementError('non-deferred column values')
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'quantile',
+            lambda df: df.quantile(q, axis, *args, **kwargs),
+            [self._expr],
+            #TODO(robertwb): Approximate quantiles?
+            requires_partition_by=partitionings.Singleton(),
+            preserves_partition_by=partitionings.Singleton()))
+
+  query = frame_base._elementwise_method('query')
+
+  def replace(self, to_replace=None,
+      value=None,
+      inplace=False,
+      limit=None, *args, **kwargs):
+    if limit is None:
+      requires_partition_by = partitionings.Nothing()
+    else:
+      requires_partition_by = partitionings.Singleton()
+    result = frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'replace',
+            lambda df: df.replace(
+                to_replace, value, False, limit, *args, **kwargs),
+            [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires_partition_by))
+    if inplace:
+      self._expr = result._expr
+    else:
+      return result
+
+  def reset_index(self, level=None, drop=False, inplace=False, *args, **kwargs):
+    if level is not None and not isinstance(level, (tuple, list)):
+      level = [level]
+    if level is None or len(level) == len(self._expr.proxy().index.levels):
+      # TODO: Could do distributed re-index with offsets.
+      requires_partition_by = partitionings.Singleton()
+    else:
+      requires_partition_by = partitionings.Nothing()
+    result = frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'reset_index',
+            lambda df: df.reset_index(level, drop, False, *args, **kwargs),
+            [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires_partition_by))
+    if inplace:
+      self._expr = result._expr
+    else:
+      return result
+
+  round = frame_base._elementwise_method('round')
+  select_dtypes = frame_base._elementwise_method('select_dtypes')
+
+  def shift(self, periods=1, freq=None, axis=0, *args, **kwargs):
+    if axis == 1 or axis == 'columns':
+      requires_partition_by = partitionings.Nothing()
+    else:
+      requires_partition_by = partitionings.Singleton()
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'shift',
+            lambda df: df.shift(periods, freq, axis, *args, **kwargs),
+            [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires_partition_by))
+
+  @property
+  def shape(self):
+    raise frame_base.WontImplementError('scalar value')
+
+  def sort_values(
+      self, by, axis=0, ascending=True, inplace=False, *args, **kwargs):
+    if axis == 1 or axis == 'columns':
+      requires_partition_by = partitionings.Nothing()
+    else:
+      requires_partition_by = partitionings.Singleton()
+    result = frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'sort_values',
+            lambda df: df.sort_values(
+                by, axis, ascending, False, *args, **kwargs),
+            [self._expr],
+            preserves_partition_by=partitionings.Singleton(),
+            requires_partition_by=requires_partition_by))
+    if inplace:
+      self._expr = result._expr
+    else:
+      return result
+
+  stack = frame_base._elementwise_method('stack')
+
+  sum = frame_base._associative_agg_method('sum')
+
+  to_records = to_dict = to_numpy = to_string = (
+      frame_base.wont_implement_method('non-deferred value'))
+
+  to_sparse = to_string # frame_base._elementwise_method('to_sparse')
+
+  transform = frame_base._elementwise_method(
+      'transform', restrictions={'axis': 0})
+
+  def transpose(self, *args, **kwargs):
+    raise frame_base.WontImplementError('non-deferred column values')
+
+  def unstack(self, *args, **kwargs):
+    if self._expr.proxy().index.nlevels == 1:
+      return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'unstack',
+            lambda df: df.unstack(*args, **kwargs),
+            [self._expr],
+            requires_partition_by=partitionings.Index()))
+    else:
+      raise frame_base.WontImplementError('non-deferred column values')
+
+  update = frame_base._proxy_method(
+      'update',
+      inplace=True,
+      requires_partition_by=partitionings.Index(),
+      preserves_partition_by=partitionings.Index())
 
 for meth in ('filter', ):
   setattr(DeferredDataFrame, meth, frame_base._elementwise_method(meth))
diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
index e0c75b5..eaf46d6 100644
--- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
+++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
@@ -25,6 +25,7 @@
 
 
 @unittest.skipIf(sys.version_info <= (3, ), 'Requires contextlib.ExitStack.')
+@unittest.skipIf(sys.version_info < (3, 6), 'Nondeterministic dict ordering.')
 class DoctestTest(unittest.TestCase):
   def test_dataframe_tests(self):
     result = doctests.testmod(
@@ -34,11 +35,9 @@
             'pandas.core.frame.DataFrame.T': ['*'],
             'pandas.core.frame.DataFrame.agg': ['*'],
             'pandas.core.frame.DataFrame.aggregate': ['*'],
-            'pandas.core.frame.DataFrame.all': ['*'],
-            'pandas.core.frame.DataFrame.any': ['*'],
             'pandas.core.frame.DataFrame.append': ['*'],
             'pandas.core.frame.DataFrame.apply': ['*'],
-            'pandas.core.frame.DataFrame.applymap': ['*'],
+            'pandas.core.frame.DataFrame.applymap': ['df ** 2'],
             'pandas.core.frame.DataFrame.assign': ['*'],
             'pandas.core.frame.DataFrame.axes': ['*'],
             'pandas.core.frame.DataFrame.combine': ['*'],
@@ -46,65 +45,44 @@
             'pandas.core.frame.DataFrame.corr': ['*'],
             'pandas.core.frame.DataFrame.count': ['*'],
             'pandas.core.frame.DataFrame.cov': ['*'],
-            'pandas.core.frame.DataFrame.cummax': ['*'],
-            'pandas.core.frame.DataFrame.cummin': ['*'],
-            'pandas.core.frame.DataFrame.cumprod': ['*'],
-            'pandas.core.frame.DataFrame.cumsum': ['*'],
-            'pandas.core.frame.DataFrame.diff': ['*'],
             'pandas.core.frame.DataFrame.dot': ['*'],
             'pandas.core.frame.DataFrame.drop': ['*'],
-            'pandas.core.frame.DataFrame.dropna': ['*'],
             'pandas.core.frame.DataFrame.eval': ['*'],
             'pandas.core.frame.DataFrame.explode': ['*'],
             'pandas.core.frame.DataFrame.fillna': ['*'],
             'pandas.core.frame.DataFrame.info': ['*'],
             'pandas.core.frame.DataFrame.isin': ['*'],
-            'pandas.core.frame.DataFrame.isna': ['*'],
-            'pandas.core.frame.DataFrame.isnull': ['*'],
-            'pandas.core.frame.DataFrame.items': ['*'],
-            'pandas.core.frame.DataFrame.iteritems': ['*'],
-            'pandas.core.frame.DataFrame.iterrows': ['*'],
-            'pandas.core.frame.DataFrame.itertuples': ['*'],
+            'pandas.core.frame.DataFrame.iterrows': ["print(df['int'].dtype)"],
             'pandas.core.frame.DataFrame.join': ['*'],
-            'pandas.core.frame.DataFrame.max': ['*'],
             'pandas.core.frame.DataFrame.melt': ['*'],
             'pandas.core.frame.DataFrame.memory_usage': ['*'],
             'pandas.core.frame.DataFrame.merge': ['*'],
-            'pandas.core.frame.DataFrame.min': ['*'],
-            'pandas.core.frame.DataFrame.mode': ['*'],
+            # Not equal to df.agg('mode', axis='columns', numeric_only=True)
+            'pandas.core.frame.DataFrame.mode': [
+                "df.mode(axis='columns', numeric_only=True)"
+            ],
             'pandas.core.frame.DataFrame.nlargest': ['*'],
-            'pandas.core.frame.DataFrame.notna': ['*'],
-            'pandas.core.frame.DataFrame.notnull': ['*'],
             'pandas.core.frame.DataFrame.nsmallest': ['*'],
             'pandas.core.frame.DataFrame.nunique': ['*'],
             'pandas.core.frame.DataFrame.pivot': ['*'],
             'pandas.core.frame.DataFrame.pivot_table': ['*'],
-            'pandas.core.frame.DataFrame.prod': ['*'],
-            'pandas.core.frame.DataFrame.product': ['*'],
-            'pandas.core.frame.DataFrame.quantile': ['*'],
             'pandas.core.frame.DataFrame.query': ['*'],
             'pandas.core.frame.DataFrame.reindex': ['*'],
             'pandas.core.frame.DataFrame.reindex_axis': ['*'],
             'pandas.core.frame.DataFrame.rename': ['*'],
-            'pandas.core.frame.DataFrame.replace': ['*'],
-            'pandas.core.frame.DataFrame.reset_index': ['*'],
+            # Raises right exception, but testing framework has matching issues.
+            'pandas.core.frame.DataFrame.replace': [
+                "df.replace({'a string': 'new value', True: False})  # raises"
+            ],
+            # Uses unseeded np.random.
             'pandas.core.frame.DataFrame.round': ['*'],
-            'pandas.core.frame.DataFrame.select_dtypes': ['*'],
             'pandas.core.frame.DataFrame.set_index': ['*'],
-            'pandas.core.frame.DataFrame.shape': ['*'],
-            'pandas.core.frame.DataFrame.shift': ['*'],
-            'pandas.core.frame.DataFrame.sort_values': ['*'],
-            'pandas.core.frame.DataFrame.stack': ['*'],
-            'pandas.core.frame.DataFrame.sum': ['*'],
-            'pandas.core.frame.DataFrame.to_dict': ['*'],
-            'pandas.core.frame.DataFrame.to_numpy': ['*'],
+            'pandas.core.frame.DataFrame.transpose': [
+                'df1_transposed.dtypes', 'df2_transposed.dtypes'
+            ],
+            'pandas.core.frame.DataFrame.to_sparse': ['type(df)'],
+            # Uses df.index
             'pandas.core.frame.DataFrame.to_records': ['*'],
-            'pandas.core.frame.DataFrame.to_sparse': ['*'],
-            'pandas.core.frame.DataFrame.to_string': ['*'],
-            'pandas.core.frame.DataFrame.transform': ['*'],
-            'pandas.core.frame.DataFrame.transpose': ['*'],
-            'pandas.core.frame.DataFrame.unstack': ['*'],
-            'pandas.core.frame.DataFrame.update': ['*'],
         })
     self.assertEqual(result.failed, 0)
 
@@ -113,10 +91,6 @@
         pd.core.series,
         use_beam=False,
         skip={
-            'pandas.core.series.Series.agg': ['*'],
-            'pandas.core.series.Series.aggregate': ['*'],
-            'pandas.core.series.Series.all': ['*'],
-            'pandas.core.series.Series.any': ['*'],
             'pandas.core.series.Series.append': ['*'],
             'pandas.core.series.Series.argmax': ['*'],
             'pandas.core.series.Series.argmin': ['*'],
@@ -127,11 +101,6 @@
             'pandas.core.series.Series.corr': ['*'],
             'pandas.core.series.Series.count': ['*'],
             'pandas.core.series.Series.cov': ['*'],
-            'pandas.core.series.Series.cummax': ['*'],
-            'pandas.core.series.Series.cummin': ['*'],
-            'pandas.core.series.Series.cumprod': ['*'],
-            'pandas.core.series.Series.cumsum': ['*'],
-            'pandas.core.series.Series.diff': ['*'],
             'pandas.core.series.Series.dot': ['*'],
             'pandas.core.series.Series.drop': ['*'],
             'pandas.core.series.Series.drop_duplicates': ['*'],
@@ -142,20 +111,12 @@
             'pandas.core.series.Series.idxmax': ['*'],
             'pandas.core.series.Series.idxmin': ['*'],
             'pandas.core.series.Series.isin': ['*'],
-            'pandas.core.series.Series.isna': ['*'],
-            'pandas.core.series.Series.isnull': ['*'],
             'pandas.core.series.Series.items': ['*'],
             'pandas.core.series.Series.iteritems': ['*'],
-            'pandas.core.series.Series.max': ['*'],
             'pandas.core.series.Series.memory_usage': ['*'],
-            'pandas.core.series.Series.min': ['*'],
             'pandas.core.series.Series.nlargest': ['*'],
             'pandas.core.series.Series.nonzero': ['*'],
-            'pandas.core.series.Series.notna': ['*'],
-            'pandas.core.series.Series.notnull': ['*'],
             'pandas.core.series.Series.nsmallest': ['*'],
-            'pandas.core.series.Series.prod': ['*'],
-            'pandas.core.series.Series.product': ['*'],
             'pandas.core.series.Series.quantile': ['*'],
             'pandas.core.series.Series.reindex': ['*'],
             'pandas.core.series.Series.rename': ['*'],
@@ -167,14 +128,11 @@
             'pandas.core.series.Series.shift': ['*'],
             'pandas.core.series.Series.sort_index': ['*'],
             'pandas.core.series.Series.sort_values': ['*'],
-            'pandas.core.series.Series.sum': ['*'],
             'pandas.core.series.Series.take': ['*'],
             'pandas.core.series.Series.to_csv': ['*'],
             'pandas.core.series.Series.to_dict': ['*'],
             'pandas.core.series.Series.to_frame': ['*'],
-            'pandas.core.series.Series.transform': ['*'],
             'pandas.core.series.Series.unique': ['*'],
-            'pandas.core.series.Series.unstack': ['*'],
             'pandas.core.series.Series.update': ['*'],
             'pandas.core.series.Series.values': ['*'],
             'pandas.core.series.Series.view': ['*'],
diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py
index 32772d5..a270458 100644
--- a/sdks/python/apache_beam/dataframe/transforms.py
+++ b/sdks/python/apache_beam/dataframe/transforms.py
@@ -125,32 +125,47 @@
         return '%s:%s' % (self.stage.ops, id(self))
 
       def expand(self, pcolls):
-        if self.stage.partitioning != partitionings.Nothing():
+        scalar_inputs = [expr for expr in self.stage.inputs if is_scalar(expr)]
+        tabular_inputs = [
+            expr for expr in self.stage.inputs if not is_scalar(expr)
+        ]
+
+        if len(tabular_inputs) == 0:
+          partitioned_pcoll = next(pcolls.values()).pipeline | beam.Create([{}])
+
+        elif self.stage.partitioning != partitionings.Nothing():
           # Arrange such that partitioned_pcoll is properly partitioned.
-          input_pcolls = {
-              tag: pcoll | 'Flat%s' % tag >> beam.FlatMap(
+          main_pcolls = {
+              expr._id: pcolls[expr._id] | 'Flat%s' % expr._id >> beam.FlatMap(
                   self.stage.partitioning.partition_fn)
-              for (tag, pcoll) in pcolls.items()
-          }
-          partitioned_pcoll = input_pcolls | beam.CoGroupByKey(
-          ) | beam.MapTuple(
+              for expr in tabular_inputs
+          } | beam.CoGroupByKey()
+          partitioned_pcoll = main_pcolls | beam.MapTuple(
               lambda _,
               inputs: {tag: pd.concat(vs)
                        for tag, vs in inputs.items()})
+
         else:
           # Already partitioned, or no partitioning needed.
-          (tag, pcoll), = pcolls.items()
-          partitioned_pcoll = pcoll | beam.Map(lambda df: {tag: df})
+          assert len(tabular_inputs) == 1
+          tag = tabular_inputs[0]._id
+          partitioned_pcoll = pcolls[tag] | beam.Map(lambda df: {tag: df})
+
+        side_pcolls = {
+            expr._id: beam.pvalue.AsSingleton(pcolls[expr._id])
+            for expr in scalar_inputs
+        }
 
         # Actually evaluate the expressions.
-        def evaluate(partition, stage=self.stage):
+        def evaluate(partition, stage=self.stage, **side_inputs):
           session = expressions.Session(
-              {expr: partition[expr._id]
-               for expr in stage.inputs})
+              dict([(expr, partition[expr._id]) for expr in tabular_inputs] +
+                   [(expr, side_inputs[expr._id]) for expr in scalar_inputs]))
           for expr in stage.outputs:
             yield beam.pvalue.TaggedOutput(expr._id, expr.evaluate_at(session))
 
-        return partitioned_pcoll | beam.FlatMap(evaluate).with_outputs()
+        return partitioned_pcoll | beam.FlatMap(evaluate, **
+                                                side_pcolls).with_outputs()
 
     class Stage(object):
       """Used to build up a set of operations that can be fused together.
@@ -199,6 +214,10 @@
             yield stage
 
     @memoize
+    def is_scalar(expr):
+      return not isinstance(expr.proxy(), pd.core.generic.NDFrame)
+
+    @memoize
     def expr_to_stages(expr):
       assert expr not in inputs
       # First attempt to compute this expression as part of an existing stage,
@@ -212,7 +231,7 @@
       for stage in common_stages([expr_to_stages(arg) for arg in expr.args()
                                   if arg not in inputs]):
         if all(output_is_partitioned_by(arg, stage, required_partitioning)
-               for arg in expr.args()):
+               for arg in expr.args() if not is_scalar(arg)):
           break
       else:
         # Otherwise, compute this expression as part of a new stage.
diff --git a/sdks/python/apache_beam/dataframe/transforms_test.py b/sdks/python/apache_beam/dataframe/transforms_test.py
index 1684b31..81917cf 100644
--- a/sdks/python/apache_beam/dataframe/transforms_test.py
+++ b/sdks/python/apache_beam/dataframe/transforms_test.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 from __future__ import absolute_import
+from __future__ import division
 
 import unittest
 
@@ -37,15 +38,28 @@
     actual_deferred = func(input_deferred)._expr.evaluate_at(
         expressions.Session({input_placeholder: input}))
 
+    def concat(parts):
+      if len(parts) > 1:
+        return pd.concat(parts)
+      elif len(parts) == 1:
+        return parts[0]
+      else:
+        return None
+
     def check_correct(actual):
       if actual is None:
         raise AssertionError('Empty frame but expected: \n\n%s' % (expected))
-      sorted_actual = actual.sort_index()
-      sorted_expected = expected.sort_index()
-      if not sorted_actual.equals(sorted_expected):
-        raise AssertionError(
-            'Dataframes not equal: \n\n%s\n\n%s' %
-            (sorted_actual, sorted_expected))
+      if isinstance(expected, pd.core.generic.NDFrame):
+        sorted_actual = actual.sort_index()
+        sorted_expected = expected.sort_index()
+        if not sorted_actual.equals(sorted_expected):
+          raise AssertionError(
+              'Dataframes not equal: \n\n%s\n\n%s' %
+              (sorted_actual, sorted_expected))
+      else:
+        if actual != expected:
+          raise AssertionError(
+              'Scalars not equal: %s != %s' % (actual, expected))
 
     check_correct(actual_deferred)
 
@@ -53,9 +67,7 @@
       input_pcoll = p | beam.Create([input[::2], input[1::2]])
       output_pcoll = input_pcoll | transforms.DataframeTransform(
           func, proxy=empty)
-      assert_that(
-          output_pcoll,
-          lambda actual: check_correct(pd.concat(actual) if actual else None))
+      assert_that(output_pcoll, lambda actual: check_correct(concat(actual)))
 
   def test_identity(self):
     df = pd.DataFrame({
@@ -87,6 +99,16 @@
     self.run_scenario(a, lambda a: a.agg(sum))
     self.run_scenario(a, lambda a: a.agg(['mean', 'min', 'max']))
 
+  def test_scalar(self):
+    a = pd.Series([1, 2, 6])
+    self.run_scenario(a, lambda a: a.agg(sum))
+    self.run_scenario(a, lambda a: a / a.agg(sum))
+
+    # Tests scalar being used as an input to a downstream stage.
+    df = pd.DataFrame({'key': ['a', 'a', 'b'], 'val': [1, 2, 6]})
+    self.run_scenario(
+        df, lambda df: df.groupby('key').sum().val / df.val.agg(sum))
+
   def test_input_output_polymorphism(self):
     one_series = pd.Series([1])
     two_series = pd.Series([2])
diff --git a/website/www/site/config.toml b/website/www/site/config.toml
index b156f93..e979921 100644
--- a/website/www/site/config.toml
+++ b/website/www/site/config.toml
@@ -104,7 +104,7 @@
 
 [params]
 description = "Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the mechanics of large-scale batch and streaming data processing and can run on a number of runtimes like Apache Flink, Apache Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in different languages, allowing users to easily implement their data integration processes."
-release_latest = "2.22.0"
+release_latest = "2.23.0"
 # The repository and branch where the files live in Github or Colab. This is used
 # to serve and stage from your local branch, but publish to the master branch.
 #   e.g. https://github.com/{{< param branch_repo >}}/path/to/notebook.ipynb
diff --git a/website/www/site/content/en/blog/beam-2.23.0.md b/website/www/site/content/en/blog/beam-2.23.0.md
new file mode 100644
index 0000000..c7400fa
--- /dev/null
+++ b/website/www/site/content/en/blog/beam-2.23.0.md
@@ -0,0 +1,68 @@
+---
+title:  "Apache Beam 2.23.0"
+date:   2020-07-29 00:00:01 -0800
+categories:
+  - blog
+authors:
+  - Valentyn Tymofieiev
+---
+<!--
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+We are happy to present the new 2.23.0 release of Apache Beam. This release includes both improvements and new functionality.
+See the [download page](/get-started/downloads/#2230-2020-07-29) for this release.
+For more information on changes in 2.23.0, check out the
+[detailed release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12347145).
+
+## Highlights
+
+* Twister2 Runner ([BEAM-7304](https://issues.apache.org/jira/browse/BEAM-7304)).
+* Python 3.8 support ([BEAM-8494](https://issues.apache.org/jira/browse/BEAM-8494)).
+
+## I/Os
+
+* Support for reading from Snowflake added (Java) ([BEAM-9722](https://issues.apache.org/jira/browse/BEAM-9722)).
+* Support for writing to Splunk added (Java) ([BEAM-8596](https://issues.apache.org/jira/browse/BEAM-8596)).
+* Support for assume role added (Java) ([BEAM-10335](https://issues.apache.org/jira/browse/BEAM-10335)).
+* A new transform to read from BigQuery has been added: `apache_beam.io.gcp.bigquery.ReadFromBigQuery`. This transform
+  is experimental. It reads data from BigQuery by exporting data to Avro files, and reading those files. It also supports
+  reading data by exporting to JSON files. This has small differences in behavior for Time and Date-related fields. See
+  Pydoc for more information.
+* Add dispositions for SnowflakeIO.write ([BEAM-10343](https://issues.apache.org/jira/browse/BEAM-10343))
+
+## New Features / Improvements
+
+* Update Snowflake JDBC dependency and add application=beam to connection URL ([BEAM-10383](https://issues.apache.org/jira/browse/BEAM-10383)).
+
+## Breaking Changes
+
+* `RowJson.RowJsonDeserializer`, `JsonToRow`, and `PubsubJsonTableProvider` now accept "implicit
+  nulls" by default when deserializing JSON (Java) ([BEAM-10220](https://issues.apache.org/jira/browse/BEAM-10220)).
+  Previously nulls could only be represented with explicit null values, as in
+  `{"foo": "bar", "baz": null}`, whereas an implicit null like `{"foo": "bar"}` would raise an
+  exception. Now both JSON strings will yield the same result by default. This behavior can be
+  overridden with `RowJson.RowJsonDeserializer#withNullBehavior`.
+* Fixed a bug in `GroupIntoBatches` experimental transform in Python to actually group batches by key. 
+  This changes the output type for this transform ([BEAM-6696](https://issues.apache.org/jira/browse/BEAM-6696)).
+
+## Deprecations
+
+* Remove Gearpump runner. ([BEAM-9999](https://issues.apache.org/jira/browse/BEAM-9999))
+* Remove Apex runner. ([BEAM-9999](https://issues.apache.org/jira/browse/BEAM-9999))
+* RedisIO.readAll() is deprecated and will be removed in 2 versions, users must use RedisIO.readKeyPatterns() as a replacement ([BEAM-9747](https://issues.apache.org/jira/browse/BEAM-9747)).
+
+## Known Issues
+
+## List of Contributors
+
+According to git shortlog, the following people contributed to the 2.23.0 release. Thank you to all contributors!
+
+Aaron, Abhishek Yadav, Ahmet Altay, aiyangar, Aizhamal Nurmamat kyzy, Ajo Thomas, Akshay-Iyangar, Alan Pryor, Alex Amato, Alexey Romanenko, Allen Pradeep Xavier, Andrew Crites, Andrew Pilloud, Ankur Goenka, Anna Qin, Ashwin Ramaswami, bntnam, Borzoo Esmailloo, Boyuan Zhang, Brian Hulette, Brian Michalski, brucearctor, Chamikara Jayalath, chi-chi weng, Chuck Yang, Chun Yang, Colm O hEigeartaigh, Corvin Deboeser, Craig Chambers, Damian Gadomski, Damon Douglas, Daniel Oliveira, Dariusz Aniszewski, darshanj, darshan jani, David Cavazos, David Moravek, David Yan, Esun Kim, Etienne Chauchot, Filipe Regadas, fuyuwei, Graeme Morgan, Hannah-Jiang, Harch Vardhan, Heejong Lee, Henry Suryawirawan, InigoSJ, Ismaël Mejía, Israel Herraiz, Jacob Ferriero, Jan Lukavský, Jie Fan, John Mora, Jozef Vilcek, Julien Phalip, Justine Koa, Kamil Gabryjelski, Kamil Wasilewski, Kasia Kucharczyk, Kenneth Jung, Kenneth Knowles, kevingg, Kevin Sijo Puthusseri, kshivvy, Kyle Weaver, Kyoungha Min, Kyungwon Jo, Luke Cwik, Mark Liu, Mark-Zeng, Matthias Baetens, Maximilian Michels, Michal Walenia, Mikhail Gryzykhin, Nam Bui, Nathan Fisher, Niel Markwick, Ning Kang, Omar Ismail, Pablo Estrada, paul fisher, Pawel Pasterz, perkss, Piotr Szuberski, pulasthi, purbanow, Rahul Patwari, Rajat Mittal, Rehman, Rehman Murad Ali, Reuben van Ammers, Reuven Lax, Reza Rokni, Rion Williams, Robert Bradshaw, Robert Burke, Rui Wang, Ruoyun Huang, sabhyankar, Sam Rohde, Sam Whittle, sclukas77, Sebastian Graca, Shoaib Zafar, Sruthi Sree Kumar, Stephen O'Kennedy, Steve Koonce, Steve Niemitz, Steven van Rossum, Ted Romer, Tesio, Thinh Ha, Thomas Weise, Tobias Kaymak, tobiaslieber-cognitedata, Tobiasz KÄ™dzierski, Tomo Suzuki, Tudor Marian, tvs, Tyson Hamilton, Udi Meiri, Valentyn Tymofieiev, Vasu Nori, xuelianhan, Yichi Zhang, Yifan Zou, Yixing Zhang, yoshiki.obata, Yueyang Qiu, Yu Feng, Yuwei Fu, Zhuo Peng, ZijieSong946.
diff --git a/website/www/site/content/en/documentation/transforms/python/aggregation/latest.md b/website/www/site/content/en/documentation/transforms/python/aggregation/latest.md
index 44fbfed..f9692b1 100644
--- a/website/www/site/content/en/documentation/transforms/python/aggregation/latest.md
+++ b/website/www/site/content/en/documentation/transforms/python/aggregation/latest.md
@@ -14,10 +14,61 @@
 See the License for the specific language governing permissions and
 limitations under the License.
 -->
+
 # Latest
 
-## Examples
-See [BEAM-7390](https://issues.apache.org/jira/browse/BEAM-7390) for updates. 
+{{< localstorage language language-py >}}
 
-## Related transforms 
-* [Sample](/documentation/transforms/python/aggregation/sample) to combine elements. takes samples of the elements in a collection.
\ No newline at end of file
+{{< button-pydoc path="apache_beam.transforms.combiners" class="Latest" >}}
+
+Gets the element with the latest timestamp.
+
+## Examples
+
+In the following examples, we create a pipeline with a `PCollection` of produce with a timestamp for their harvest date.
+
+We use `Latest` to get the element with the latest timestamp from the `PCollection`.
+
+### Example 1: Latest element globally
+
+We use `Latest.Globally()` to get the element with the latest timestamp in the entire `PCollection`.
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/latest.py" latest_globally >}}
+{{< /highlight >}}
+
+{{< paragraph class="notebook-skip" >}}
+Output:
+{{< /paragraph >}}
+
+{{< highlight class="notebook-skip" >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/latest_test.py" latest_element >}}
+{{< /highlight >}}
+
+{{< buttons-code-snippet
+  py="sdks/python/apache_beam/examples/snippets/transforms/aggregation/latest.py" >}}
+
+### Example 2: Latest elements for each key
+
+We use `Latest.PerKey()` to get the elements with the latest timestamp for each key in a `PCollection` of key-values.
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/latest.py" latest_per_key >}}
+{{< /highlight >}}
+
+{{< paragraph class="notebook-skip" >}}
+Output:
+{{< /paragraph >}}
+
+{{< highlight class="notebook-skip" >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/aggregation/latest_test.py" latest_elements_per_key >}}
+{{< /highlight >}}
+
+{{< buttons-code-snippet
+  py="sdks/python/apache_beam/examples/snippets/transforms/aggregation/latest.py" >}}
+
+## Related transforms
+
+* [Sample](/documentation/transforms/python/aggregation/sample) randomly takes some number of elements in a collection.
+
+{{< button-pydoc path="apache_beam.transforms.combiners" class="Latest" >}}
diff --git a/website/www/site/content/en/documentation/transforms/python/overview.md b/website/www/site/content/en/documentation/transforms/python/overview.md
index 71ed708..4cf68e9 100644
--- a/website/www/site/content/en/documentation/transforms/python/overview.md
+++ b/website/www/site/content/en/documentation/transforms/python/overview.md
@@ -55,7 +55,7 @@
   <tr><td><a href="/documentation/transforms/python/aggregation/distinct">Distinct</a></td><td>Produces a collection containing distinct elements from the input collection.</td></tr>  
   <tr><td><a href="/documentation/transforms/python/aggregation/groupbykey">GroupByKey</a></td><td>Takes a keyed collection of elements and produces a collection where each element consists of a key and all values associated with that key.</td></tr>
   <tr><td><a href="/documentation/transforms/python/aggregation/groupintobatches">GroupIntoBatches</a></td><td>Batches the input into desired batch size.</td></tr>
-  <tr><td>Latest</td><td>Not available. See <a href="https://issues.apache.org/jira/browse/BEAM-6695">BEAM-6695</a> for updates.</td></tr>
+  <tr><td><a href="/documentation/transforms/python/aggregation/latest">Latest</a></td><td>Gets the element with the latest timestamp.</td></tr>
   <tr><td>Max</td><td>Not available.</td></tr>  
   <tr><td><a href="/documentation/transforms/python/aggregation/mean">Mean</a></td><td>Computes the average within each aggregation.</td></tr>
   <tr><td>Min</td><td>Not available.</td></tr>
diff --git a/website/www/site/content/en/get-started/downloads.md b/website/www/site/content/en/get-started/downloads.md
index 5ddc110..5ea24cc 100644
--- a/website/www/site/content/en/get-started/downloads.md
+++ b/website/www/site/content/en/get-started/downloads.md
@@ -87,10 +87,18 @@
 
 ## Releases
 
+### 2.23.0 (2020-07-29)
+Official [source code download](http://www.apache.org/dyn/closer.cgi/beam/2.23.0/apache-beam-2.23.0-source-release.zip).
+[SHA-512](https://downloads.apache.org/beam/2.23.0/apache-beam-2.23.0-source-release.zip.sha512).
+[signature](https://downloads.apache.org/beam/2.23.0/apache-beam-2.23.0-source-release.zip.asc).
+
+[Release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12347145).
+[Blog post](/blog/beam-2.23.0).
+
 ### 2.22.0 (2020-06-08)
-Official [source code download](http://www.apache.org/dyn/closer.cgi/beam/2.22.0/apache-beam-2.22.0-source-release.zip).
-[SHA-512](https://downloads.apache.org/beam/2.22.0/apache-beam-2.22.0-source-release.zip.sha512).
-[signature](https://downloads.apache.org/beam/2.22.0/apache-beam-2.22.0-source-release.zip.asc).
+Official [source code download](http://archive.apache.org/dist/beam/2.22.0/apache-beam-2.22.0-source-release.zip).
+[SHA-512](https://archive.apache.org/dist/beam/2.22.0/apache-beam-2.22.0-source-release.zip.sha512).
+[signature](https://archive.apache.org/dist/beam/2.22.0/apache-beam-2.22.0-source-release.zip.asc).
 
 [Release notes](https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12319527&version=12347144).
 [Blog post](/blog/beam-2.22.0).
diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html
index db3b522..600e197 100644
--- a/website/www/site/layouts/partials/section-menu/en/documentation.html
+++ b/website/www/site/layouts/partials/section-menu/en/documentation.html
@@ -199,6 +199,7 @@
           <li><a href="/documentation/transforms/python/aggregation/distinct/">Distinct</a></li>
           <li><a href="/documentation/transforms/python/aggregation/groupbykey/">GroupByKey</a></li>
           <li><a href="/documentation/transforms/python/aggregation/groupintobatches/">GroupIntoBatches</a></li>
+          <li><a href="/documentation/transforms/python/aggregation/latest/">Latest</a></li>
           <li><a href="/documentation/transforms/python/aggregation/mean/">Mean</a></li>
           <li><a href="/documentation/transforms/python/aggregation/sample/">Sample</a></li>
           <li><a href="/documentation/transforms/python/aggregation/top/">Top</a></li>
diff --git a/website/www/site/static/.htaccess b/website/www/site/static/.htaccess
index 1330b01..c5469ee 100644
--- a/website/www/site/static/.htaccess
+++ b/website/www/site/static/.htaccess
@@ -21,4 +21,4 @@
 # The following redirect maintains the previously supported URLs.
 RedirectMatch permanent "/documentation/sdks/(javadoc|pydoc)(.*)" "https://beam.apache.org/releases/$1$2"
 # Keep this updated to point to the current release.
-RedirectMatch "/releases/([^/]+)/current(.*)" "https://beam.apache.org/releases/$1/2.22.0$2"
+RedirectMatch "/releases/([^/]+)/current(.*)" "https://beam.apache.org/releases/$1/2.23.0$2"