Merge pull request #6546: [BEAM-4495] Create precommit to stage website on GCS

diff --git a/.test-infra/metrics/README.md b/.test-infra/metrics/README.md
index d442e43..dca8d17 100644
--- a/.test-infra/metrics/README.md
+++ b/.test-infra/metrics/README.md
@@ -54,12 +54,26 @@
 sudo apt-get update
 sudo apt-get install docker-ce
 
+###################################################
+# Commands below here are required to spin up     #
+# docker containers locally. Can be omitted for   #
+# the kubectl setup.                              #
+###################################################
+
 # Install docker-compose
 sudo curl -L https://github.com/docker/compose/releases/download/1.22.0/docker-compose-$(uname -s)-$(uname -m) -o /usr/local/bin/docker-compose
 sudo chmod +x /usr/local/bin/docker-compose
 
+
 # start docker service if it is not running already
 sudo service docker start
+
+# Build the images required by the docker-compose
+# containers.
+docker-compose build
+
+# Spin up the docker-compose containers.
+docker-compose up
 ```
 
 ## Kubernetes setup
diff --git a/.test-infra/metrics/docker-compose.yml b/.test-infra/metrics/docker-compose.yml
index 10b595e..39a018f 100644
--- a/.test-infra/metrics/docker-compose.yml
+++ b/.test-infra/metrics/docker-compose.yml
@@ -43,6 +43,18 @@
       - GF_AUTH_ANONYMOUS_ENABLED=true
       - GF_AUTH_ANONYMOUS_ORG_NAME=Beam
       - GF_INSTALL_PLUGINS=vonage-status-panel
+  syncjenkins:
+    image: syncjenkins
+    container_name: beamsyncjenkins
+    build:
+      context: ./sync/jenkins
+      dockerfile: Dockerfile
+    environment:
+      - JENSYNC_HOST=beampostgresql
+      - JENSYNC_PORT=5432
+      - JENSYNC_DBNAME=beam_metrics
+      - JENSYNC_DBUSERNAME=admin
+      - JENSYNC_DBPWD=<PGPasswordHere>
 volumes:
   beam-postgresql-data:
   beam-grafana-libdata:
diff --git a/LICENSE b/LICENSE
index a2e93e2..44c35bb 100644
--- a/LICENSE
+++ b/LICENSE
@@ -402,3 +402,262 @@
     LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
     OF CONTRACT, TORT OR OTHERWISE,  ARISING FROM, OUT OF OR IN CONNECTION
     WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE
+
+################################################################################
+CPython LICENSE. Source:
+https://github.com/python/cpython/blob/81574b80e92554adf75c13fa42415beb8be383cb/LICENSE
+
+A. HISTORY OF THE SOFTWARE
+==========================
+
+Python was created in the early 1990s by Guido van Rossum at Stichting
+Mathematisch Centrum (CWI, see http://www.cwi.nl) in the Netherlands
+as a successor of a language called ABC.  Guido remains Python's
+principal author, although it includes many contributions from others.
+
+In 1995, Guido continued his work on Python at the Corporation for
+National Research Initiatives (CNRI, see http://www.cnri.reston.va.us)
+in Reston, Virginia where he released several versions of the
+software.
+
+In May 2000, Guido and the Python core development team moved to
+BeOpen.com to form the BeOpen PythonLabs team.  In October of the same
+year, the PythonLabs team moved to Digital Creations, which became
+Zope Corporation.  In 2001, the Python Software Foundation (PSF, see
+https://www.python.org/psf/) was formed, a non-profit organization
+created specifically to own Python-related Intellectual Property.
+Zope Corporation was a sponsoring member of the PSF.
+
+All Python releases are Open Source (see http://www.opensource.org for
+the Open Source Definition).  Historically, most, but not all, Python
+releases have also been GPL-compatible; the table below summarizes
+the various releases.
+
+    Release         Derived     Year        Owner       GPL-
+                    from                                compatible? (1)
+
+    0.9.0 thru 1.2              1991-1995   CWI         yes
+    1.3 thru 1.5.2  1.2         1995-1999   CNRI        yes
+    1.6             1.5.2       2000        CNRI        no
+    2.0             1.6         2000        BeOpen.com  no
+    1.6.1           1.6         2001        CNRI        yes (2)
+    2.1             2.0+1.6.1   2001        PSF         no
+    2.0.1           2.0+1.6.1   2001        PSF         yes
+    2.1.1           2.1+2.0.1   2001        PSF         yes
+    2.1.2           2.1.1       2002        PSF         yes
+    2.1.3           2.1.2       2002        PSF         yes
+    2.2 and above   2.1.1       2001-now    PSF         yes
+
+Footnotes:
+
+(1) GPL-compatible doesn't mean that we're distributing Python under
+    the GPL.  All Python licenses, unlike the GPL, let you distribute
+    a modified version without making your changes open source.  The
+    GPL-compatible licenses make it possible to combine Python with
+    other software that is released under the GPL; the others don't.
+
+(2) According to Richard Stallman, 1.6.1 is not GPL-compatible,
+    because its license has a choice of law clause.  According to
+    CNRI, however, Stallman's lawyer has told CNRI's lawyer that 1.6.1
+    is "not incompatible" with the GPL.
+
+Thanks to the many outside volunteers who have worked under Guido's
+direction to make these releases possible.
+
+
+B. TERMS AND CONDITIONS FOR ACCESSING OR OTHERWISE USING PYTHON
+===============================================================
+
+PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2
+--------------------------------------------
+
+1. This LICENSE AGREEMENT is between the Python Software Foundation
+("PSF"), and the Individual or Organization ("Licensee") accessing and
+otherwise using this software ("Python") in source or binary form and
+its associated documentation.
+
+2. Subject to the terms and conditions of this License Agreement, PSF hereby
+grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce,
+analyze, test, perform and/or display publicly, prepare derivative works,
+distribute, and otherwise use Python alone or in any derivative version,
+provided, however, that PSF's License Agreement and PSF's notice of copyright,
+i.e., "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
+2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018 Python Software Foundation; All
+Rights Reserved" are retained in Python alone or in any derivative version
+prepared by Licensee.
+
+3. In the event Licensee prepares a derivative work that is based on
+or incorporates Python or any part thereof, and wants to make
+the derivative work available to others as provided herein, then
+Licensee hereby agrees to include in any such work a brief summary of
+the changes made to Python.
+
+4. PSF is making Python available to Licensee on an "AS IS"
+basis.  PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
+IMPLIED.  BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND
+DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
+FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL NOT
+INFRINGE ANY THIRD PARTY RIGHTS.
+
+5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
+FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS
+A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON,
+OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
+
+6. This License Agreement will automatically terminate upon a material
+breach of its terms and conditions.
+
+7. Nothing in this License Agreement shall be deemed to create any
+relationship of agency, partnership, or joint venture between PSF and
+Licensee.  This License Agreement does not grant permission to use PSF
+trademarks or trade name in a trademark sense to endorse or promote
+products or services of Licensee, or any third party.
+
+8. By copying, installing or otherwise using Python, Licensee
+agrees to be bound by the terms and conditions of this License
+Agreement.
+
+
+BEOPEN.COM LICENSE AGREEMENT FOR PYTHON 2.0
+-------------------------------------------
+
+BEOPEN PYTHON OPEN SOURCE LICENSE AGREEMENT VERSION 1
+
+1. This LICENSE AGREEMENT is between BeOpen.com ("BeOpen"), having an
+office at 160 Saratoga Avenue, Santa Clara, CA 95051, and the
+Individual or Organization ("Licensee") accessing and otherwise using
+this software in source or binary form and its associated
+documentation ("the Software").
+
+2. Subject to the terms and conditions of this BeOpen Python License
+Agreement, BeOpen hereby grants Licensee a non-exclusive,
+royalty-free, world-wide license to reproduce, analyze, test, perform
+and/or display publicly, prepare derivative works, distribute, and
+otherwise use the Software alone or in any derivative version,
+provided, however, that the BeOpen Python License is retained in the
+Software, alone or in any derivative version prepared by Licensee.
+
+3. BeOpen is making the Software available to Licensee on an "AS IS"
+basis.  BEOPEN MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
+IMPLIED.  BY WAY OF EXAMPLE, BUT NOT LIMITATION, BEOPEN MAKES NO AND
+DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
+FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF THE SOFTWARE WILL NOT
+INFRINGE ANY THIRD PARTY RIGHTS.
+
+4. BEOPEN SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF THE
+SOFTWARE FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS
+AS A RESULT OF USING, MODIFYING OR DISTRIBUTING THE SOFTWARE, OR ANY
+DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
+
+5. This License Agreement will automatically terminate upon a material
+breach of its terms and conditions.
+
+6. This License Agreement shall be governed by and interpreted in all
+respects by the law of the State of California, excluding conflict of
+law provisions.  Nothing in this License Agreement shall be deemed to
+create any relationship of agency, partnership, or joint venture
+between BeOpen and Licensee.  This License Agreement does not grant
+permission to use BeOpen trademarks or trade names in a trademark
+sense to endorse or promote products or services of Licensee, or any
+third party.  As an exception, the "BeOpen Python" logos available at
+http://www.pythonlabs.com/logos.html may be used according to the
+permissions granted on that web page.
+
+7. By copying, installing or otherwise using the software, Licensee
+agrees to be bound by the terms and conditions of this License
+Agreement.
+
+
+CNRI LICENSE AGREEMENT FOR PYTHON 1.6.1
+---------------------------------------
+
+1. This LICENSE AGREEMENT is between the Corporation for National
+Research Initiatives, having an office at 1895 Preston White Drive,
+Reston, VA 20191 ("CNRI"), and the Individual or Organization
+("Licensee") accessing and otherwise using Python 1.6.1 software in
+source or binary form and its associated documentation.
+
+2. Subject to the terms and conditions of this License Agreement, CNRI
+hereby grants Licensee a nonexclusive, royalty-free, world-wide
+license to reproduce, analyze, test, perform and/or display publicly,
+prepare derivative works, distribute, and otherwise use Python 1.6.1
+alone or in any derivative version, provided, however, that CNRI's
+License Agreement and CNRI's notice of copyright, i.e., "Copyright (c)
+1995-2001 Corporation for National Research Initiatives; All Rights
+Reserved" are retained in Python 1.6.1 alone or in any derivative
+version prepared by Licensee.  Alternately, in lieu of CNRI's License
+Agreement, Licensee may substitute the following text (omitting the
+quotes): "Python 1.6.1 is made available subject to the terms and
+conditions in CNRI's License Agreement.  This Agreement together with
+Python 1.6.1 may be located on the Internet using the following
+unique, persistent identifier (known as a handle): 1895.22/1013.  This
+Agreement may also be obtained from a proxy server on the Internet
+using the following URL: http://hdl.handle.net/1895.22/1013".
+
+3. In the event Licensee prepares a derivative work that is based on
+or incorporates Python 1.6.1 or any part thereof, and wants to make
+the derivative work available to others as provided herein, then
+Licensee hereby agrees to include in any such work a brief summary of
+the changes made to Python 1.6.1.
+
+4. CNRI is making Python 1.6.1 available to Licensee on an "AS IS"
+basis.  CNRI MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
+IMPLIED.  BY WAY OF EXAMPLE, BUT NOT LIMITATION, CNRI MAKES NO AND
+DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
+FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON 1.6.1 WILL NOT
+INFRINGE ANY THIRD PARTY RIGHTS.
+
+5. CNRI SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
+1.6.1 FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS
+A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON 1.6.1,
+OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
+
+6. This License Agreement will automatically terminate upon a material
+breach of its terms and conditions.
+
+7. This License Agreement shall be governed by the federal
+intellectual property law of the United States, including without
+limitation the federal copyright law, and, to the extent such
+U.S. federal law does not apply, by the law of the Commonwealth of
+Virginia, excluding Virginia's conflict of law provisions.
+Notwithstanding the foregoing, with regard to derivative works based
+on Python 1.6.1 that incorporate non-separable material that was
+previously distributed under the GNU General Public License (GPL), the
+law of the Commonwealth of Virginia shall govern this License
+Agreement only as to issues arising under or with respect to
+Paragraphs 4, 5, and 7 of this License Agreement.  Nothing in this
+License Agreement shall be deemed to create any relationship of
+agency, partnership, or joint venture between CNRI and Licensee.  This
+License Agreement does not grant permission to use CNRI trademarks or
+trade name in a trademark sense to endorse or promote products or
+services of Licensee, or any third party.
+
+8. By clicking on the "ACCEPT" button where indicated, or by copying,
+installing or otherwise using Python 1.6.1, Licensee agrees to be
+bound by the terms and conditions of this License Agreement.
+
+        ACCEPT
+
+
+CWI LICENSE AGREEMENT FOR PYTHON 0.9.0 THROUGH 1.2
+--------------------------------------------------
+
+Copyright (c) 1991 - 1995, Stichting Mathematisch Centrum Amsterdam,
+The Netherlands.  All rights reserved.
+
+Permission to use, copy, modify, and distribute this software and its
+documentation for any purpose and without fee is hereby granted,
+provided that the above copyright notice appear in all copies and that
+both that copyright notice and this permission notice appear in
+supporting documentation, and that the name of Stichting Mathematisch
+Centrum or CWI not be used in advertising or publicity pertaining to
+distribution of the software without specific, written prior
+permission.
+
+STICHTING MATHEMATISCH CENTRUM DISCLAIMS ALL WARRANTIES WITH REGARD TO
+THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
+FITNESS, IN NO EVENT SHALL STICHTING MATHEMATISCH CENTRUM BE LIABLE
+FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index acc083b..b33bbc0 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -19,6 +19,7 @@
 
 import com.google.api.services.bigquery.model.TableRow;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
@@ -76,7 +77,13 @@
     @ProcessElement
     public void processElement(ProcessContext c) {
       TableRow row = c.element();
-      int timestamp = (Integer) row.get("timestamp");
+      int timestamp;
+      // TODO(BEAM-5390): Avoid this workaround.
+      try {
+        timestamp = ((BigDecimal) row.get("timestamp")).intValue();
+      } catch (ClassCastException e) {
+        timestamp = ((Integer) row.get("timestamp")).intValue();
+      }
       String userName = (String) row.get("contributor_username");
       if (userName != null) {
         // Sets the implicit timestamp field to be used in windowing.
@@ -180,9 +187,9 @@
   public interface Options extends PipelineOptions {
     @Description("Input specified as a GCS path containing a BigQuery table exported as json")
     @Default.String(EXPORTED_WIKI_TABLE)
-    String getInput();
+    String getWikiInput();
 
-    void setInput(String value);
+    void setWikiInput(String value);
 
     @Description("File to output results to")
     @Validation.Required
@@ -191,18 +198,21 @@
     void setOutput(String value);
   }
 
-  public static void main(String[] args) {
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
-
+  public static void run(Options options) {
     Pipeline p = Pipeline.create(options);
 
     double samplingThreshold = 0.1;
 
-    p.apply(TextIO.read().from(options.getInput()))
+    p.apply(TextIO.read().from(options.getWikiInput()))
         .apply(MapElements.via(new ParseTableRowJson()))
         .apply(new ComputeTopSessions(samplingThreshold))
-        .apply("Write", TextIO.write().withoutSharding().to(options.getOutput()));
+        .apply("Write", TextIO.write().to(options.getOutput()));
 
     p.run().waitUntilFinish();
   }
+
+  public static void main(String[] args) {
+    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    run(options);
+  }
 }
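
Editor's note: since this change renames the `--input` option to `--wikiInput`, existing invocations need updating. A minimal sketch, assuming the Beam examples jar is on the classpath; the output path is a hypothetical placeholder, and the input is the sample dataset used by the integration test added below:

```java
import org.apache.beam.examples.complete.TopWikipediaSessions;

public class TopWikipediaSessionsDemo {
  public static void main(String[] args) {
    // Note the renamed flag: --wikiInput instead of --input.
    TopWikipediaSessions.main(new String[] {
        "--wikiInput=gs://apache-beam-samples/wikipedia_edits/wiki_data-000000000000.json",
        "--output=/tmp/top_wiki_sessions/results"  // hypothetical output location
    });
  }
}
```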
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/TopWikipediaSessionsIT.java b/examples/java/src/test/java/org/apache/beam/examples/complete/TopWikipediaSessionsIT.java
new file mode 100644
index 0000000..1f278e7
--- /dev/null
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/TopWikipediaSessionsIT.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.examples.complete;
+
+import java.util.Date;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.FileChecksumMatcher;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** End-to-end tests of TopWikipediaSessions. */
+@RunWith(JUnit4.class)
+public class TopWikipediaSessionsIT {
+
+  private static final String DEFAULT_INPUT_10_FILES =
+      "gs://apache-beam-samples/wikipedia_edits/wiki_data-00000000000*.json";
+  private static final String DEFAULT_OUTPUT_CHECKSUM = "a7f0c50b895d0a2e37b78c3f94eadcfb11a647a6";
+
+  /** PipelineOptions for the TopWikipediaSessions integration test. */
+  public interface TopWikipediaSessionsITOptions
+      extends TestPipelineOptions, TopWikipediaSessions.Options {}
+
+  @BeforeClass
+  public static void setUp() {
+    PipelineOptionsFactory.register(TestPipelineOptions.class);
+  }
+
+  @Test
+  public void testE2ETopWikiPages() throws Exception {
+    TopWikipediaSessionsITOptions options =
+        TestPipeline.testingPipelineOptions().as(TopWikipediaSessionsITOptions.class);
+
+    options.setWikiInput(DEFAULT_INPUT_10_FILES);
+    options.setOutput(
+        FileSystems.matchNewResource(options.getTempRoot(), true)
+            .resolve(
+                String.format("topwikisessions-it-%tF-%<tH-%<tM-%<tS-%<tL", new Date()),
+                StandardResolveOptions.RESOLVE_DIRECTORY)
+            .resolve("output", StandardResolveOptions.RESOLVE_DIRECTORY)
+            .resolve("results", StandardResolveOptions.RESOLVE_FILE)
+            .toString());
+    options.setOnSuccessMatcher(
+        new FileChecksumMatcher(DEFAULT_OUTPUT_CHECKSUM, options.getOutput() + "*-of-*"));
+
+    TopWikipediaSessions.run(options);
+  }
+}
diff --git a/release/src/main/scripts/build_release_candidate.sh b/release/src/main/scripts/build_release_candidate.sh
index 0c89016..37941d7 100755
--- a/release/src/main/scripts/build_release_candidate.sh
+++ b/release/src/main/scripts/build_release_candidate.sh
@@ -81,6 +81,9 @@
   echo "============Building and Staging Java Artifacts============="
   echo "--------Cloning Beam Repo and Checkout Release Branch-------"
   cd ~
+  if [[ -d ${LOCAL_CLONE_DIR} ]]; then
+    rm -rf ${LOCAL_CLONE_DIR}
+  fi
   mkdir ${LOCAL_CLONE_DIR}
   cd ${LOCAL_CLONE_DIR}
   git clone ${GIT_REPO_URL}
@@ -110,6 +113,9 @@
 if [[ $confirmation = "y" ]]; then
   echo "=========Staging Source Release on dist.apache.org==========="
   cd ~
+  if [[ -d ${LOCAL_JAVA_STAGING_DIR} ]]; then
+    rm -rf ${LOCAL_JAVA_STAGING_DIR}
+  fi
   mkdir ${LOCAL_JAVA_STAGING_DIR}
   cd ${LOCAL_JAVA_STAGING_DIR}
   svn co ${ROOT_SVN_URL}
@@ -135,7 +141,7 @@
     rm -rf ~/${LOCAL_JAVA_STAGING_DIR}
     exit
   fi
-  svn commit
+  svn commit --no-auth-cache
   rm -rf ~/${LOCAL_JAVA_STAGING_DIR}
 fi
 
@@ -145,6 +151,9 @@
 if [[ $confirmation = "y" ]]; then
   echo "============Staging Python Binaries on dist.apache.org========="
   cd ~
+  if [[ -d ${LOCAL_PYTHON_STAGING_DIR} ]]; then
+    rm -rf ${LOCAL_PYTHON_STAGING_DIR}
+  fi
   mkdir ${LOCAL_PYTHON_STAGING_DIR}
   cd ${LOCAL_PYTHON_STAGING_DIR}
 
@@ -179,7 +188,7 @@
     rm -rf ~/${PYTHON_ARTIFACTS_DIR}
     exit
   fi
-  svn commit
+  svn commit --no-auth-cache
   rm -rf ~/${PYTHON_ARTIFACTS_DIR}
 fi
 
@@ -189,6 +198,9 @@
 if [[ $confirmation = "y" ]]; then
   echo "==============Creating PR for Updating Website==============="
   cd ~
+  if [[ -d ${LOCAL_WEBSITE_UPDATE_DIR} ]]; then
+    rm -rf ${LOCAL_WEBSITE_UPDATE_DIR}
+  fi
   mkdir ${LOCAL_WEBSITE_UPDATE_DIR}
   cd ${LOCAL_WEBSITE_UPDATE_DIR}
   mkdir ${LOCAL_PYTHON_DOC}
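
Editor's note: each staging step now recreates its working directory, so reruns no longer fail on a leftover `mkdir`. For readers porting this pattern, a rough Java equivalent of the `rm -rf` + `mkdir` guard might look like the following sketch (the directory name is a hypothetical stand-in):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.stream.Stream;

public class FreshDir {
  /** Recreates {@code dir} empty, mirroring the scripts' rm -rf + mkdir guard. */
  static Path recreate(Path dir) throws IOException {
    if (Files.exists(dir)) {
      try (Stream<Path> walk = Files.walk(dir)) {
        // Delete children before parents, like rm -rf; failures are ignored here.
        walk.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
      }
    }
    return Files.createDirectory(dir);
  }

  public static void main(String[] args) throws IOException {
    recreate(Paths.get(System.getProperty("user.home"), "beam_release_clone"));
  }
}
```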
diff --git a/release/src/main/scripts/cut_release_branch.sh b/release/src/main/scripts/cut_release_branch.sh
index 1477b64..404a6f6 100755
--- a/release/src/main/scripts/cut_release_branch.sh
+++ b/release/src/main/scripts/cut_release_branch.sh
@@ -72,6 +72,9 @@
 echo "==============================================================="
 
 cd ~
+if [[ -d ${LOCAL_CLONE_DIR} ]]; then
+  rm -rf ${LOCAL_CLONE_DIR}
+fi
 mkdir ${LOCAL_CLONE_DIR}
 cd ${LOCAL_CLONE_DIR}
 git clone ${GITHUB_REPO_URL}
diff --git a/release/src/main/scripts/preparation_before_release.sh b/release/src/main/scripts/preparation_before_release.sh
index 4c3d5a9..1d02976 100755
--- a/release/src/main/scripts/preparation_before_release.sh
+++ b/release/src/main/scripts/preparation_before_release.sh
@@ -62,6 +62,9 @@
     echo "Please input your name: "
     read name
     echo "======Starting updating KEYS file in dev repo===="
+    if [[ -d ${LOCAL_SVN_DIR} ]]; then
+      rm -rf ${LOCAL_SVN_DIR}
+    fi
     mkdir ${LOCAL_SVN_DIR}
     cd ${LOCAL_SVN_DIR}
     svn co ${ROOT_SVN_URL}/${DEV_REPO}/${BEAM_REPO}
@@ -71,7 +74,7 @@
     echo "Please review all changes. Do you confirm to commit? [y|N]"
     read commit_confirmation
     if [[ $commit_confirmation = "y" ]]; then
-      svn commit KEYS
+      svn commit --no-auth-cache KEYS
     else
       echo "Not commit new changes into ${ROOT_SVN_URL}/${DEV_REPO}/${BEAM_REPO}${DEV_REPO}/KEYS"
     fi
@@ -85,7 +88,7 @@
     echo "Please review all changes. Do you confirm to commit? [y|N]"
     read commit_confirmation
     if [[ $commit_confirmation = "y" ]]; then
-      svn commit KEYS
+      svn commit --no-auth-cache KEYS
     else
       echo "Not commit new changes into ${ROOT_SVN_URL}/${DEV_REPO}/${BEAM_REPO}${RELEASE_REPO}/KEYS"
     fi
diff --git a/release/src/main/scripts/run_rc_validation.sh b/release/src/main/scripts/run_rc_validation.sh
index 107c37a..4881066 100755
--- a/release/src/main/scripts/run_rc_validation.sh
+++ b/release/src/main/scripts/run_rc_validation.sh
@@ -24,13 +24,13 @@
   echo "Please sign up your name in the tests you have ran."
 
   echo "===========================Final Cleanup==========================="
-  if [[ ! -z `ls -a ~/.m2/settings_backup.xml` ]]; then
+  if [[ -f ~/.m2/settings_backup.xml ]]; then
     rm ~/.m2/settings.xml
     cp ~/.m2/settings_backup.xml ~/.m2/settings.xml
     echo "* Restored ~/.m2/settings.xml"
   fi
 
-  if [[ ! -z `ls -a ~/.bashrc_backup` ]]; then
+  if [[ -f ~/.bashrc_backup ]]; then
     rm ~/.bashrc
     cp ~/.bashrc_backup ~/.bashrc
     echo "* Restored ~/.bashrc"
@@ -72,6 +72,10 @@
 
 echo "====================Cloning Beam Release Branch===================="
 cd ~
+if [[ -d ${LOCAL_CLONE_DIR} ]]; then
+  rm -rf ${LOCAL_CLONE_DIR}
+fi
+
 mkdir ${LOCAL_CLONE_DIR}
 cd ${LOCAL_CLONE_DIR}
 git clone ${GIT_REPO_URL}
@@ -179,7 +183,7 @@
   echo "Please review following GCP sources setup: "
   echo "Using GCP project: ${USER_GCP_PROJECT}"
   echo "Will create BigQuery dataset: ${MOBILE_GAME_DATASET}"
-  echo "Will create Pubsub topic: ${MOBILE_GMAE_PUBSUB_TOPIC}"
+  echo "Will create Pubsub topic: ${MOBILE_GAME_PUBSUB_TOPIC}"
   echo "[Confirmation Required] Do you want to run validations with configurations above? [y|N]"
   read confirmation
   if [[ $confirmation = "y" ]]; then
@@ -223,13 +227,15 @@
 if [[ $confirmation = "y" ]]; then
   echo "[Input Required] Please enter your github repo URL forked from apache/beam:"
   read USER_REMOTE_URL
-  WORKING_BRANCH=python_validatoin_pr
+  echo "[Input Required] Please enter your github username:"
+  read GITHUB_USERNAME
+  WORKING_BRANCH=python_validation_pr
   git checkout -b ${WORKING_BRANCH}
   touch empty_file.txt
   git add empty_file.txt
   git commit -m "Add empty file in order to create PR"
   git push -f ${USER_REMOTE_URL}
-  hub pull-request -b apache:${RELEASE_BRANCH} -h boyuanzz:${WORKING_BRANCH} -F- <<<"[DO NOT MERGE]Run Python RC Validation Tests
+  hub pull-request -b apache:${RELEASE_BRANCH} -h ${GITHUB_USERNAME}:${WORKING_BRANCH} -F- <<<"[DO NOT MERGE]Run Python RC Validation Tests
 
 
   Run Python ReleaseCandidate"
@@ -310,11 +316,11 @@
 
   echo "--------------------------Updating ~/.m2/settings.xml-------------------------"
   cd ~
-  if [[ -z `ls -a ~ | grep ".m2"` ]]; then
+  if [[ ! -d .m2 ]]; then
     mkdir .m2
   fi
   cd .m2
-  if [[ ! -z `ls -a ~/.m2/ | grep "settings.xml"` ]]; then
+  if [[ -f ~/.m2/settings.xml ]]; then
     mv settings.xml settings_backup.xml
   fi
   touch settings.xml
diff --git a/release/src/main/scripts/sign_hash_python_wheels.sh b/release/src/main/scripts/sign_hash_python_wheels.sh
new file mode 100755
index 0000000..5168189
--- /dev/null
+++ b/release/src/main/scripts/sign_hash_python_wheels.sh
@@ -0,0 +1,56 @@
+#!/bin/bash
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+#
+
+# This script will sign and hash python wheels.
+set -e
+
+BEAM_SVN_DIR=https://dist.apache.org/repos/dist/dev/beam
+VERSION=
+PYTHON_ARTIFACTS_DIR=python
+
+echo "===============================Pre-requirements========================"
+echo "Please make sure you have built python wheels."
+echo "Please make sure you have configured and started your gpg by running ./preparation_before_release.sh."
+echo "Do you want to proceed? [y|N]"
+read confirmation
+if [[ $confirmation != "y" ]]; then
+  echo "Please follow the release guide to build python wheels first."
+  exit
+fi
+
+echo "[Input Required] Please enter the release version:"
+read VERSION
+
+cd ~
+if [[ -d ${VERSION} ]]; then
+  rm -rf ${VERSION}
+fi
+
+svn co ${BEAM_SVN_DIR}/${VERSION}
+cd ${VERSION}/${PYTHON_ARTIFACTS_DIR}
+echo "Start signing and hashing python wheels artifacts"
+rm *.whl.asc || true
+rm *.whl.sha512 ||true
+for artifact in *.whl; do
+  gpg --armor --detach-sig $artifact
+  sha512sum $artifact > ${artifact}.sha512
+done
+svn add --force *
+svn commit --no-auth-cache
+
+rm -rf ~/${VERSION}
\ No newline at end of file
diff --git a/release/src/main/scripts/start_snapshot_build.sh b/release/src/main/scripts/start_snapshot_build.sh
index f425f43..d9b35f9 100755
--- a/release/src/main/scripts/start_snapshot_build.sh
+++ b/release/src/main/scripts/start_snapshot_build.sh
@@ -58,6 +58,10 @@
 hub version
 
 echo "===============Starting creating empty PR==============="
+cd ~
+if [[ -d ${LOCAL_BEAM_DIR} ]]; then
+  rm -rf ${LOCAL_BEAM_DIR}
+fi
 mkdir ${LOCAL_BEAM_DIR}
 cd ${LOCAL_BEAM_DIR}
 git clone ${GIT_REPO_URL}
diff --git a/release/src/main/scripts/verify_release_build.sh b/release/src/main/scripts/verify_release_build.sh
index dd0ab74..b284950 100755
--- a/release/src/main/scripts/verify_release_build.sh
+++ b/release/src/main/scripts/verify_release_build.sh
@@ -103,6 +103,9 @@
 fi
 
 echo "======================Starting Clone Repo======================"
+if [[ -d ${LOCAL_CLONE_DIR} ]]; then
+  rm -rf ${LOCAL_CLONE_DIR}
+fi
 mkdir ${LOCAL_CLONE_DIR}
 cd  ${LOCAL_CLONE_DIR}
 git clone ${GIT_REPO_URL}
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java
index b47a40a..6a8988e 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslation.java
@@ -88,4 +88,16 @@
         MAPPER.writeValueAsString(ImmutableMap.of("options", mapWithoutUrns)),
         PipelineOptions.class);
   }
+
+  /** Converts the provided JSON {@link String} into {@link PipelineOptions}. */
+  public static PipelineOptions fromJson(String optionsJson) throws IOException {
+    Struct.Builder builder = Struct.newBuilder();
+    JsonFormat.parser().merge(optionsJson, builder);
+    return fromProto(builder.build());
+  }
+
+  /** Converts the provided {@link PipelineOptions} into a JSON {@link String}. */
+  public static String toJson(PipelineOptions options) throws IOException {
+    return JsonFormat.printer().print(toProto(options));
+  }
 }
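
Editor's note: the new `toJson`/`fromJson` helpers serialize options through the portable proto `Struct` representation rather than Jackson. A minimal round-trip sketch, assuming `runners-core-construction-java` and the core SDK are on the classpath:

```java
import java.io.IOException;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class OptionsJsonRoundTrip {
  public static void main(String[] args) throws IOException {
    PipelineOptions options = PipelineOptionsFactory.create();
    String json = PipelineOptionsTranslation.toJson(options);  // proto Struct as JSON
    PipelineOptions restored = PipelineOptionsTranslation.fromJson(json);
    System.out.println(restored.getJobName());
  }
}
```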
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java
index 4d83239..aef82d2 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineOptionsTranslationTest.java
@@ -82,6 +82,17 @@
       Struct reserializedStruct = PipelineOptionsTranslation.toProto(deserializedStruct);
       assertThat(reserializedStruct.getFieldsMap(), equalTo(originalStruct.getFieldsMap()));
     }
+
+    @Test
+    public void testToFromJson() throws Exception {
+      options.getOptionsId();
+      Struct originalStruct = PipelineOptionsTranslation.toProto(options);
+      String json = PipelineOptionsTranslation.toJson(options);
+
+      PipelineOptions deserializedOptions = PipelineOptionsTranslation.fromJson(json);
+      Struct reserializedStruct = PipelineOptionsTranslation.toProto(deserializedOptions);
+      assertThat(reserializedStruct.getFieldsMap(), equalTo(originalStruct.getFieldsMap()));
+    }
   }
 
   /** Tests that translations contain the correct contents. */
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 2287b1a..54c39f0 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -61,6 +61,7 @@
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory;
@@ -769,7 +770,28 @@
               dataflowOptions.getPathValidator().verifyPath(options.getGcpTempLocation()));
     }
     newJob.getEnvironment().setDataset(options.getTempDatasetId());
-    newJob.getEnvironment().setExperiments(options.getExperiments());
+
+    // Represent the minCpuPlatform pipeline option as an experiment, if not already present.
+    List<String> experiments =
+        firstNonNull(dataflowOptions.getExperiments(), new ArrayList<String>());
+    if (!isNullOrEmpty(dataflowOptions.getMinCpuPlatform())) {
+
+      List<String> minCpuFlags =
+          experiments
+              .stream()
+              .filter(p -> p.startsWith("min_cpu_platform"))
+              .collect(Collectors.toList());
+
+      if (minCpuFlags.isEmpty()) {
+        experiments.add("min_cpu_platform=" + dataflowOptions.getMinCpuPlatform());
+      } else {
+        LOG.warn(
+            "Flag min_cpu_platform is defined in both top level PipelineOption, "
+                + "as well as under experiments. Proceed using {}.",
+            minCpuFlags.get(0));
+      }
+    }
+    newJob.getEnvironment().setExperiments(experiments);
 
     // Set the Docker container image that executes Dataflow worker harness, residing in Google
     // Container Registry. Translator is guaranteed to create a worker pool prior to this point.
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
index 3570eca..3f70ec1 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineWorkerPoolOptions.java
@@ -241,4 +241,17 @@
   Boolean getUsePublicIps();
 
   void setUsePublicIps(@Nullable Boolean value);
+
+  /**
+   * Specifies a minimum CPU platform for VM instances.
+   *
+   * <p>For more details, see <a
+   * href='https://cloud.google.com/compute/docs/instances/specify-min-cpu-platform'>Specifying a
+   * Minimum CPU Platform</a>.
+   */
+  @Description("GCE minimum CPU platform. Default is determined by GCP.")
+  @Nullable
+  String getMinCpuPlatform();
+
+  void setMinCpuPlatform(String minCpuPlatform);
 }
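
Editor's note: a brief sketch of the new option in use; `DataflowRunner` (changed above) turns it into the `min_cpu_platform` experiment unless one is already supplied. The platform string "Intel Skylake" is an example value following the linked GCE documentation:

```java
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class MinCpuPlatformDemo {
  public static void main(String[] args) {
    DataflowPipelineWorkerPoolOptions options =
        PipelineOptionsFactory.fromArgs("--minCpuPlatform=Intel Skylake")
            .as(DataflowPipelineWorkerPoolOptions.class);
    // DataflowRunner now adds the experiment "min_cpu_platform=Intel Skylake"
    // unless one is already present in --experiments.
    System.out.println(options.getMinCpuPlatform());
  }
}
```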
diff --git a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/JobServicePipelineResult.java b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/JobServicePipelineResult.java
index 0318600..155977c 100644
--- a/runners/reference/java/src/main/java/org/apache/beam/runners/reference/JobServicePipelineResult.java
+++ b/runners/reference/java/src/main/java/org/apache/beam/runners/reference/JobServicePipelineResult.java
@@ -35,7 +35,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class JobServicePipelineResult implements PipelineResult {
+class JobServicePipelineResult implements PipelineResult, AutoCloseable {
 
   private static final long POLL_INTERVAL_MS = 10 * 1000;
 
@@ -43,14 +43,19 @@
 
   private final ByteString jobId;
   private final CloseableResource<JobServiceBlockingStub> jobService;
+  @Nullable private State terminationState;
 
   JobServicePipelineResult(ByteString jobId, CloseableResource<JobServiceBlockingStub> jobService) {
     this.jobId = jobId;
     this.jobService = jobService;
+    this.terminationState = null;
   }
 
   @Override
   public State getState() {
+    if (terminationState != null) {
+      return terminationState;
+    }
     JobServiceBlockingStub stub = jobService.get();
     GetJobStateResponse response =
         stub.getState(GetJobStateRequest.newBuilder().setJobIdBytes(jobId).build());
@@ -89,6 +94,9 @@
 
   @Override
   public State waitUntilFinish() {
+    if (terminationState != null) {
+      return terminationState;
+    }
     JobServiceBlockingStub stub = jobService.get();
     GetJobStateRequest request = GetJobStateRequest.newBuilder().setJobIdBytes(jobId).build();
     GetJobStateResponse response = stub.getState(request);
@@ -103,11 +111,8 @@
       response = stub.getState(request);
       lastState = getJavaState(response.getState());
     }
-    try {
-      jobService.close();
-    } catch (Exception e) {
-      LOG.warn("Error cleaning up job service", e);
-    }
+    close();
+    terminationState = lastState;
     return lastState;
   }
 
@@ -116,6 +121,14 @@
     throw new UnsupportedOperationException("Not yet implemented.");
   }
 
+  @Override
+  public void close() {
+    try (CloseableResource<JobServiceBlockingStub> jobService = this.jobService) {
+    } catch (Exception e) {
+      LOG.warn("Error cleaning up job service", e);
+    }
+  }
+
   private static State getJavaState(JobApi.JobState.Enum protoState) {
     switch (protoState) {
       case UNSPECIFIED:
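
Editor's note: caching `terminationState` means `getState()` and `waitUntilFinish()` no longer touch the closed job service after termination. A minimal standalone sketch of that pattern (names here are illustrative, not Beam APIs):

```java
public class CachedTerminalState {
  enum State { RUNNING, DONE }

  private State terminationState; // null until a terminal state is observed

  State waitUntilFinish() {
    if (terminationState != null) {
      return terminationState;   // no RPC once terminal
    }
    State last = pollUntilTerminal();
    close();                     // release the connection eagerly
    terminationState = last;     // remember for later callers
    return last;
  }

  State pollUntilTerminal() { return State.DONE; } // stand-in for the gRPC polling loop
  void close() { /* stand-in for closing the job service stub */ }
}
```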
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 2c65f94..20314fc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -254,10 +254,26 @@
    *
    * <pre>{@code
    * public class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
-   *   public static class Accum {
+   *   public static class Accum implements Serializable {
    *     int sum = 0;
    *     int count = 0;
+   *
+   *    {@literal@}Override
+   *     public boolean equals(Object other) {
+   *       if (other == this) {
+   *         return true;
+   *       }
+   *       if (!(other instanceof Accum)) {
+   *         return false;
+   *       }
+   *       Accum o = (Accum) other;
+   *       return this.sum == o.sum && this.count == o.count;
+   *     }
    *   }
+   *
    *   public Accum createAccumulator() {
    *     return new Accum();
    *   }
@@ -289,6 +305,24 @@
    * arbitrary tree structure. Commutativity is required because any order of the input values is
    * ignored when breaking up input values into groups.
    *
+   * <h3>Note on Data Encoding</h3>
+   *
+   * <p>Some form of data encoding is required when using custom types in a CombineFn that do not
+   * have well-known coders. The sample code above uses a custom accumulator that gets a coder by
+   * implementing {@link java.io.Serializable}. By doing this, we rely on the generic {@link
+   * org.apache.beam.sdk.coders.CoderProvider}, which can provide a coder for any {@link
+   * java.io.Serializable} type. Where {@link java.io.Serializable} is inefficient or
+   * inapplicable, there are two alternatives for encoding:
+   *
+   * <ul>
+   *   <li>Default {@link org.apache.beam.sdk.coders.CoderRegistry}. For example, implement a coder
+   *       class explicitly and use the {@code @DefaultCoder} tag. See the {@link
+   *       org.apache.beam.sdk.coders.CoderRegistry} for the numerous ways in which to bind a type
+   *       to a coder.
+   *   <li>A CombineFn-specific way: when extending CombineFn, override both {@link
+   *       #getAccumulatorCoder} and {@link #getDefaultOutputCoder}, as sketched below.
+   * </ul>
+   *
    * @param <InputT> type of input values
    * @param <AccumT> type of mutable accumulator values
    * @param <OutputT> type of output values
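
Editor's note: to make the second alternative concrete, a hedged sketch of overriding `getAccumulatorCoder` for the `Accum` type from the javadoc above, choosing `SerializableCoder` explicitly (a hand-written coder could be substituted for efficiency):

```java
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

public abstract class AverageFnWithCoder
    extends CombineFn<Integer, AverageFnWithCoder.Accum, Double> {
  public static class Accum implements java.io.Serializable {
    int sum = 0;
    int count = 0;
  }

  @Override
  public Coder<Accum> getAccumulatorCoder(CoderRegistry registry, Coder<Integer> inputCoder)
      throws CannotProvideCoderException {
    // An explicit choice; a custom coder could be returned here instead.
    return SerializableCoder.of(Accum.class);
  }
}
```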
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
index 687d855..61342c4 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
@@ -39,15 +39,12 @@
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironments;
-import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlInputRefExpression;
 import org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.UdafImpl;
 import org.apache.beam.sdk.extensions.sql.impl.transform.agg.CovarianceFn;
 import org.apache.beam.sdk.extensions.sql.impl.transform.agg.VarianceFn;
 import org.apache.beam.sdk.extensions.sql.impl.utils.BigDecimalConverter;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -153,22 +150,10 @@
           int refIndexKey = call.getArgList().get(0);
           int refIndexValue = call.getArgList().get(1);
 
-          FieldType keyDescriptor = sourceSchema.getField(refIndexKey).getType();
-          BeamSqlInputRefExpression sourceExpKey =
-              new BeamSqlInputRefExpression(CalciteUtils.toSqlTypeName(keyDescriptor), refIndexKey);
-
-          FieldType valueDescriptor = sourceSchema.getField(refIndexValue).getType();
-          BeamSqlInputRefExpression sourceExpValue =
-              new BeamSqlInputRefExpression(
-                  CalciteUtils.toSqlTypeName(valueDescriptor), refIndexValue);
-
-          sourceFieldExps.add(KV.of(sourceExpKey, sourceExpValue));
+          sourceFieldExps.add(KV.of(refIndexKey, refIndexValue));
         } else {
-          int refIndex = call.getArgList().size() > 0 ? call.getArgList().get(0) : 0;
-          FieldType fieldType = sourceSchema.getField(refIndex).getType();
-          BeamSqlInputRefExpression sourceExp =
-              new BeamSqlInputRefExpression(CalciteUtils.toSqlTypeName(fieldType), refIndex);
-          sourceFieldExps.add(sourceExp);
+          Integer refIndex = call.getArgList().size() > 0 ? call.getArgList().get(0) : 0;
+          sourceFieldExps.add(refIndex);
         }
 
         Schema.Field field = CalciteUtils.toField(aggName, call.type);
@@ -243,10 +228,8 @@
         CombineFn aggregator = aggregators.get(idx);
         Object element = accumulator.accumulatorElements.get(idx);
 
-        if (sourceFieldExps.get(idx) instanceof BeamSqlInputRefExpression) {
-          BeamSqlInputRefExpression exp = (BeamSqlInputRefExpression) sourceFieldExps.get(idx);
-          Object value =
-              exp.evaluate(input, null, BeamSqlExpressionEnvironments.empty()).getValue();
+        if (sourceFieldExps.get(idx) instanceof Integer) {
+          Object value = input.getValue((Integer) sourceFieldExps.get(idx));
 
           // every aggregator ignores null values, e.g., COUNT(NULL) is always zero
           if (value != null) {
@@ -260,16 +243,11 @@
            * If source expression is type of KV pair, we bundle the value of two expressions into KV
            * pair and pass it to aggregator's addInput method.
            */
-          KV<BeamSqlInputRefExpression, BeamSqlInputRefExpression> exp =
-              (KV<BeamSqlInputRefExpression, BeamSqlInputRefExpression>) sourceFieldExps.get(idx);
+          KV<Integer, Integer> exp = (KV<Integer, Integer>) sourceFieldExps.get(idx);
 
-          Object key =
-              exp.getKey().evaluate(input, null, BeamSqlExpressionEnvironments.empty()).getValue();
+          Object key = input.getValue(exp.getKey());
 
-          Object value =
-              exp.getValue()
-                  .evaluate(input, null, BeamSqlExpressionEnvironments.empty())
-                  .getValue();
+          Object value = input.getValue(exp.getValue());
 
           // ignore aggregator if either key or value is null, e.g., COVAR_SAMP(x, NULL) is null
           if (key != null && value != null) {
@@ -313,20 +291,18 @@
       registry.registerCoderForClass(BigDecimal.class, BigDecimalCoder.of());
       List<Coder> aggAccuCoderList = new ArrayList<>();
       for (int idx = 0; idx < aggregators.size(); ++idx) {
-        if (sourceFieldExps.get(idx) instanceof BeamSqlInputRefExpression) {
-          BeamSqlInputRefExpression exp = (BeamSqlInputRefExpression) sourceFieldExps.get(idx);
-          int srcFieldIndex = exp.getInputRef();
+        if (sourceFieldExps.get(idx) instanceof Integer) {
+          int srcFieldIndex = (Integer) sourceFieldExps.get(idx);
           Coder srcFieldCoder =
               RowCoder.coderForPrimitiveType(
                   sourceSchema.getField(srcFieldIndex).getType().getTypeName());
           aggAccuCoderList.add(aggregators.get(idx).getAccumulatorCoder(registry, srcFieldCoder));
         } else if (sourceFieldExps.get(idx) instanceof KV) {
           // extract coder of two expressions separately.
-          KV<BeamSqlInputRefExpression, BeamSqlInputRefExpression> exp =
-              (KV<BeamSqlInputRefExpression, BeamSqlInputRefExpression>) sourceFieldExps.get(idx);
+          KV<Integer, Integer> exp = (KV<Integer, Integer>) sourceFieldExps.get(idx);
 
-          int srcFieldIndexKey = exp.getKey().getInputRef();
-          int srcFieldIndexValue = exp.getValue().getInputRef();
+          int srcFieldIndexKey = exp.getKey();
+          int srcFieldIndexValue = exp.getValue();
 
           Coder srcFieldCoderKey =
               RowCoder.coderForPrimitiveType(
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
index 6ba55bc..d1eb02d 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnHarness.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.fn.harness;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
 import java.util.EnumMap;
 import java.util.List;
@@ -34,6 +33,7 @@
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse.Builder;
 import org.apache.beam.model.pipeline.v1.Endpoints;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
 import org.apache.beam.sdk.fn.IdGenerator;
 import org.apache.beam.sdk.fn.IdGenerators;
@@ -43,7 +43,6 @@
 import org.apache.beam.sdk.io.FileSystems;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.TextFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -88,11 +87,7 @@
     System.out.format("Pipeline options %s%n", System.getenv(PIPELINE_OPTIONS));
 
     String id = System.getenv(HARNESS_ID);
-    ObjectMapper objectMapper =
-        new ObjectMapper()
-            .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
-    PipelineOptions options =
-        objectMapper.readValue(System.getenv(PIPELINE_OPTIONS), PipelineOptions.class);
+    PipelineOptions options = PipelineOptionsTranslation.fromJson(System.getenv(PIPELINE_OPTIONS));
 
     Endpoints.ApiServiceDescriptor loggingApiServiceDescriptor =
         getApiServiceDescriptor(LOGGING_API_SERVICE_DESCRIPTOR);
diff --git a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
index 6cb5c93..bbb4ded 100644
--- a/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
+++ b/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java
@@ -136,8 +136,8 @@
  *      .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
  *        public void setParameters(KV<Integer, String> element, PreparedStatement query)
  *          throws SQLException {
- *          query.setInt(1, kv.getKey());
- *          query.setString(2, kv.getValue());
+ *          query.setInt(1, element.getKey());
+ *          query.setString(2, element.getValue());
  *        }
  *      })
  *    );
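
Editor's note: for context, the corrected setter in a fuller `JdbcIO.write()` sketch; driver, connection string, credentials, and table are placeholders:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;

public class JdbcWriteDemo {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(Create.of(KV.of(1, "Alice"), KV.of(2, "Bob")))
        .apply(JdbcIO.<KV<Integer, String>>write()
            .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "org.postgresql.Driver", "jdbc:postgresql://localhost:5432/mydb")
                .withUsername("username")
                .withPassword("password"))
            .withStatement("insert into Person values(?, ?)")
            .withPreparedStatementSetter((element, query) -> {
              // The fix above: use the lambda parameter, not an out-of-scope 'kv'.
              query.setInt(1, element.getKey());
              query.setString(2, element.getValue());
            }));
    p.run().waitUntilFinish();
  }
}
```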
diff --git a/sdks/python/apache_beam/__init__.py b/sdks/python/apache_beam/__init__.py
index 569d6c4..ce8bdc2 100644
--- a/sdks/python/apache_beam/__init__.py
+++ b/sdks/python/apache_beam/__init__.py
@@ -79,7 +79,8 @@
 
 
 if not ((sys.version_info[0] == 2 and sys.version_info[1] == 7) or
-        (sys.version_info[0] == 3 and os.environ['BEAM_EXPERIMENTAL_PY3'])):
+        (sys.version_info[0] == 3 and
+         os.environ.get('BEAM_EXPERIMENTAL_PY3', False))):
   raise RuntimeError(
       'The Apache Beam SDK for Python is supported only on Python 2.7. '
       'It is not supported on Python ['+ str(sys.version_info) + '].')
diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py
index eb6f9a1..00349a6 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -439,6 +439,24 @@
     return 8
 
 
+class TimerCoderImpl(StreamCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees."""
+  def __init__(self, payload_coder_impl):
+    self._timestamp_coder_impl = TimestampCoderImpl()
+    self._payload_coder_impl = payload_coder_impl
+
+  def encode_to_stream(self, value, out, nested):
+    self._timestamp_coder_impl.encode_to_stream(value['timestamp'], out, True)
+    self._payload_coder_impl.encode_to_stream(value.get('payload'), out, True)
+
+  def decode_from_stream(self, in_stream, nested):
+    # TODO(robertwb): Consider using a concrete class rather than a dict here.
+    return dict(
+        timestamp=self._timestamp_coder_impl.decode_from_stream(
+            in_stream, True),
+        payload=self._payload_coder_impl.decode_from_stream(in_stream, True))
+
+
 small_ints = [chr(_).encode('latin-1') for _ in range(128)]
 
 
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index f0ed6dc..623ee0e 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -437,6 +437,34 @@
     return hash(type(self))
 
 
+class _TimerCoder(FastCoder):
+  """A coder used for timer values.
+
+  For internal use."""
+  def __init__(self, payload_coder):
+    self._payload_coder = payload_coder
+
+  def _get_component_coders(self):
+    return [self._payload_coder]
+
+  def _create_impl(self):
+    return coder_impl.TimerCoderImpl(self._payload_coder.get_impl())
+
+  def is_deterministic(self):
+    return self._payload_coder.is_deterministic()
+
+  def __eq__(self, other):
+    return (type(self) == type(other)
+            and self._payload_coder == other._payload_coder)
+
+  def __hash__(self):
+    return hash(type(self)) + hash(self._payload_coder)
+
+
+Coder.register_structured_urn(
+    common_urns.coders.TIMER.urn, _TimerCoder)
+
+
 class SingletonCoder(FastCoder):
   """A coder that always encodes exactly one value."""
 
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 969c1de..607d3e2 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -197,6 +197,15 @@
         coders.TupleCoder((coders.TimestampCoder(), coders.BytesCoder())),
         (timestamp.Timestamp.of(27), b'abc'))
 
+  def test_timer_coder(self):
+    self.check_coder(coders._TimerCoder(coders.BytesCoder()),
+                     *[{'timestamp': timestamp.Timestamp(micros=x),
+                        'payload': b'xyz'}
+                       for x in range(-3, 3)])
+    self.check_coder(
+        coders.TupleCoder((coders._TimerCoder(coders.VarIntCoder()),)),
+        ({'timestamp': timestamp.Timestamp.of(37), 'payload': 389},))
+
   def test_tuple_coder(self):
     kv_coder = coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder()))
     # Verify cloud object representation
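
Editor's note: the layout exercised by `test_timer_coder` above is the timestamp as a big-endian int64 of microseconds (per `TimestampCoderImpl`), followed by the payload's nested encoding. A hedged Java sketch of that layout, assuming the payload has already been encoded by its component coder:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TimerWireFormat {
  /** Encodes a timer value: big-endian int64 micros, then the payload bytes. */
  static byte[] encode(long timestampMicros, byte[] encodedPayload) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeLong(timestampMicros); // big-endian, matching TimestampCoderImpl
      out.write(encodedPayload);      // assumed pre-encoded by the payload coder
    }
    return bytes.toByteArray();
  }
}
```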
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index 031406f..cb9d43b 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -63,7 +63,8 @@
       'beam:coder:iterable:v1': lambda t: coders.IterableCoder(t),
       'beam:coder:global_window:v1': coders.GlobalWindowCoder,
       'beam:coder:windowed_value:v1':
-          lambda v, w: coders.WindowedValueCoder(v, w)
+          lambda v, w: coders.WindowedValueCoder(v, w),
+      'beam:coder:timer:v1': coders._TimerCoder,
   }
 
   _urn_to_json_value_parser = {
@@ -81,7 +82,11 @@
       'beam:coder:windowed_value:v1':
           lambda x, value_parser, window_parser: windowed_value.create(
               value_parser(x['value']), x['timestamp'] * 1000,
-              tuple([window_parser(w) for w in x['windows']]))
+              tuple([window_parser(w) for w in x['windows']])),
+      'beam:coder:timer:v1':
+          lambda x, payload_parser: dict(
+              payload=payload_parser(x['payload']),
+              timestamp=Timestamp(micros=x['timestamp'])),
   }
 
   def test_standard_coders(self):
diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py
index ac19c51..4b761b3 100644
--- a/sdks/python/apache_beam/io/filesystem.py
+++ b/sdks/python/apache_beam/io/filesystem.py
@@ -26,7 +26,6 @@
 
 import abc
 import bz2
-import fnmatch
 import io
 import logging
 import os
@@ -531,24 +530,117 @@
     """
     raise NotImplementedError
 
+  @staticmethod
+  def _split_scheme(url_or_path):
+    match = re.match(r'(^[a-z]+)://(.*)', url_or_path)
+    if match is not None:
+      return match.groups()
+    return None, url_or_path
+
+  @staticmethod
+  def _combine_scheme(scheme, path):
+    if scheme is None:
+      return path
+    return '{}://{}'.format(scheme, path)
+
   def _url_dirname(self, url_or_path):
     """Like posixpath.dirname, but preserves scheme:// prefix.
 
     Args:
       url_or_path: A string in the form of scheme://some/path OR /some/path.
     """
-    match = re.match(r'([a-z]+://)(.*)', url_or_path)
-    if match is None:
-      return posixpath.dirname(url_or_path)
-    url_prefix, path = match.groups()
-    return url_prefix + posixpath.dirname(path)
+    scheme, path = self._split_scheme(url_or_path)
+    return self._combine_scheme(scheme, posixpath.dirname(path))
+
+  def match_files(self, file_metas, pattern):
+    """Filter :class:`FileMetadata` objects by *pattern*
+
+    Args:
+      file_metas (list of :class:`FileMetadata`):
+        Files to consider when matching
+      pattern (str): File pattern
+
+    See Also:
+      :meth:`translate_pattern`
+
+    Returns:
+      Generator of matching :class:`FileMetadata`
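+
+    Example::
+
+      # Keep only files directly under 'data/' (illustrative; 'fs' is a
+      # FileSystem subclass instance, 'metas' a list of FileMetadata):
+      matched = list(fs.match_files(metas, 'data/*'))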
+    """
+    re_pattern = re.compile(self.translate_pattern(pattern))
+    match = re_pattern.match
+    for file_metadata in file_metas:
+      if match(file_metadata.path):
+        yield file_metadata
+
+  @staticmethod
+  def translate_pattern(pattern):
+    """
+    Translate a *pattern* to a regular expression.
+    There is no way to quote meta-characters.
+
+    Pattern syntax:
+      The pattern syntax is based on the fnmatch_ syntax, with the following
+      differences:
+
+      -   ``*`` is equivalent to ``[^/\\]*`` rather than ``.*``.
+      -   ``**`` is equivalent to ``.*``.
+
+    See also:
+      :meth:`match`, which uses this method.
+
+    This method is based on `Python 2.7's fnmatch.translate`_.
+    The code in this method is licensed under
+    PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2.
+
+    .. _`fnmatch`: https://docs.python.org/2/library/fnmatch.html
+
+    .. _`Python 2.7's fnmatch.translate`: https://github.com/python/cpython\
+/blob/170ea8ccd4235d28538ab713041502d07ad1cacd/Lib/fnmatch.py#L85-L120
+    """
+    i, n = 0, len(pattern)
+    res = ''
+    while i < n:
+      c = pattern[i]
+      i = i + 1
+      if c == '*':
+        # One char lookahead for "**"
+        if i < n and pattern[i] == "*":
+          res = res + '.*'
+          i = i + 1
+        else:
+          res = res + r'[^/\\]*'
+      elif c == '?':
+        res = res + '.'
+      elif c == '[':
+        j = i
+        if j < n and pattern[j] == '!':
+          j = j + 1
+        if j < n and pattern[j] == ']':
+          j = j + 1
+        while j < n and pattern[j] != ']':
+          j = j + 1
+        if j >= n:
+          res = res + r'\['
+        else:
+          stuff = pattern[i:j].replace('\\', '\\\\')
+          i = j + 1
+          if stuff[0] == '!':
+            stuff = '^' + stuff[1:]
+          elif stuff[0] == '^':
+            stuff = '\\' + stuff
+          res = '%s[%s]' % (res, stuff)
+      else:
+        res = res + re.escape(c)
+
+    logger.debug('translate_pattern: %r -> %r', pattern, res)
+    return res + r'\Z(?ms)'
 
   def match(self, patterns, limits=None):
     """Find all matching paths to the patterns provided.
 
-    Pattern matching is done using fnmatch.fnmatch.
-    For filesystems that have directories, matching is not recursive. Patterns
-    like scheme://path/*/foo will not match anything.
+    See Also:
+      :meth:`translate_pattern`
+
     Patterns ending with '/' will be appended with '*'.
 
     Args:
@@ -583,15 +675,20 @@
           file_metadatas = [FileMetadata(pattern, self.size(pattern))]
       else:
         if self.has_dirs():
-          prefix_or_dir = self._url_dirname(prefix_or_dir)
+          prefix_dirname = self._url_dirname(prefix_or_dir)
+          if prefix_dirname != prefix_or_dir:
+            logger.debug('Changed prefix_or_dir %r -> %r',
+                         prefix_or_dir, prefix_dirname)
+            prefix_or_dir = prefix_dirname
+
+        logger.debug('Listing files in %r', prefix_or_dir)
         file_metadatas = self._list(prefix_or_dir)
 
       metadata_list = []
-      for file_metadata in file_metadatas:
+      for file_metadata in self.match_files(file_metadatas, pattern):
         if limit is not None and len(metadata_list) >= limit:
           break
-        if fnmatch.fnmatch(file_metadata.path, pattern):
-          metadata_list.append(file_metadata)
+        metadata_list.append(file_metadata)
 
       return MatchResult(pattern, metadata_list)
 
diff --git a/sdks/python/apache_beam/io/filesystem_test.py b/sdks/python/apache_beam/io/filesystem_test.py
index 2954626..876ba7a 100644
--- a/sdks/python/apache_beam/io/filesystem_test.py
+++ b/sdks/python/apache_beam/io/filesystem_test.py
@@ -23,13 +23,17 @@
 import bz2
 import gzip
 import logging
+import ntpath
 import os
+import posixpath
 import tempfile
 import unittest
 from builtins import range
 from io import BytesIO
 
 from future.utils import iteritems
+from parameterized import param
+from parameterized import parameterized
 
 from apache_beam.io.filesystem import CompressedFile
 from apache_beam.io.filesystem import CompressionTypes
@@ -109,8 +113,61 @@
             for match_result in match_results
             for file_metadata in match_result.metadata_list]
 
-  def test_match_glob(self):
-    bucket_name = 'gcsio-test'
+  @parameterized.expand([
+      ('gs://gcsio-test/**', all),
+      # Does not match root-level files
+      ('gs://gcsio-test/**/*', lambda n, i: n not in ['cat.png']),
+      # Only matches root-level files
+      ('gs://gcsio-test/*', [
+          ('cat.png', 19)
+      ]),
+      ('gs://gcsio-test/cow/**', [
+          ('cow/cat/fish', 2),
+          ('cow/cat/blubber', 3),
+          ('cow/dog/blubber', 4),
+      ]),
+      ('gs://gcsio-test/cow/ca**', [
+          ('cow/cat/fish', 2),
+          ('cow/cat/blubber', 3),
+      ]),
+      ('gs://gcsio-test/apple/[df]ish/ca*', [
+          ('apple/fish/cat', 10),
+          ('apple/fish/cart', 11),
+          ('apple/fish/carl', 12),
+          ('apple/dish/cat', 14),
+          ('apple/dish/carl', 15),
+      ]),
+      ('gs://gcsio-test/apple/?ish/?a?', [
+          ('apple/fish/cat', 10),
+          ('apple/dish/bat', 13),
+          ('apple/dish/cat', 14),
+      ]),
+      ('gs://gcsio-test/apple/fish/car?', [
+          ('apple/fish/cart', 11),
+          ('apple/fish/carl', 12),
+      ]),
+      ('gs://gcsio-test/apple/fish/b*', [
+          ('apple/fish/blubber', 6),
+          ('apple/fish/blowfish', 7),
+          ('apple/fish/bambi', 8),
+          ('apple/fish/balloon', 9),
+      ]),
+      ('gs://gcsio-test/apple/f*/b*', [
+          ('apple/fish/blubber', 6),
+          ('apple/fish/blowfish', 7),
+          ('apple/fish/bambi', 8),
+          ('apple/fish/balloon', 9),
+      ]),
+      ('gs://gcsio-test/apple/dish/[cb]at', [
+          ('apple/dish/bat', 13),
+          ('apple/dish/cat', 14),
+      ]),
+      ('gs://gcsio-test/banana/cyrano.m?', [
+          ('banana/cyrano.md', 17),
+          ('banana/cyrano.mb', 18),
+      ]),
+  ])
+  def test_match_glob(self, file_pattern, expected_object_names):
     objects = [
         ('cow/cat/fish', 2),
         ('cow/cat/blubber', 3),
@@ -126,66 +183,69 @@
         ('apple/dish/bat', 13),
         ('apple/dish/cat', 14),
         ('apple/dish/carl', 15),
+        ('banana/cat', 16),
+        ('banana/cyrano.md', 17),
+        ('banana/cyrano.mb', 18),
+        ('cat.png', 19)
     ]
-    for (object_name, size) in objects:
+    bucket_name = 'gcsio-test'
+
+    if callable(expected_object_names):
+      # Work around the fact that the parameterized test cases cannot
+      # reference the "objects" list defined above.
+
+      if expected_object_names is all:
+        # It's a placeholder for "all" objects
+        expected_object_names = objects
+      else:
+        # It's a filter function of type (str, int) -> bool
+        # that returns true for expected objects
+        filter_func = expected_object_names
+        expected_object_names = [
+            (short_path, size) for short_path, size in objects
+            if filter_func(short_path, size)
+        ]
+
+    for object_name, size in objects:
       file_name = 'gs://%s/%s' % (bucket_name, object_name)
       self.fs._insert_random_file(file_name, size)
-    test_cases = [
-        ('gs://*', objects),
-        ('gs://gcsio-test/*', objects),
-        ('gs://gcsio-test/cow/*', [
-            ('cow/cat/fish', 2),
-            ('cow/cat/blubber', 3),
-            ('cow/dog/blubber', 4),
-        ]),
-        ('gs://gcsio-test/cow/ca*', [
-            ('cow/cat/fish', 2),
-            ('cow/cat/blubber', 3),
-        ]),
-        ('gs://gcsio-test/apple/[df]ish/ca*', [
-            ('apple/fish/cat', 10),
-            ('apple/fish/cart', 11),
-            ('apple/fish/carl', 12),
-            ('apple/dish/cat', 14),
-            ('apple/dish/carl', 15),
-        ]),
-        ('gs://gcsio-test/apple/fish/car?', [
-            ('apple/fish/cart', 11),
-            ('apple/fish/carl', 12),
-        ]),
-        ('gs://gcsio-test/apple/fish/b*', [
-            ('apple/fish/blubber', 6),
-            ('apple/fish/blowfish', 7),
-            ('apple/fish/bambi', 8),
-            ('apple/fish/balloon', 9),
-        ]),
-        ('gs://gcsio-test/apple/f*/b*', [
-            ('apple/fish/blubber', 6),
-            ('apple/fish/blowfish', 7),
-            ('apple/fish/bambi', 8),
-            ('apple/fish/balloon', 9),
-        ]),
-        ('gs://gcsio-test/apple/dish/[cb]at', [
-            ('apple/dish/bat', 13),
-            ('apple/dish/cat', 14),
-        ]),
+
+    expected_file_names = [('gs://%s/%s' % (bucket_name, object_name), size)
+                           for object_name, size in expected_object_names]
+    actual_file_names = [
+        (file_metadata.path, file_metadata.size_in_bytes)
+        for file_metadata in self._flatten_match(self.fs.match([file_pattern]))
     ]
-    for file_pattern, expected_object_names in test_cases:
-      expected_file_names = [('gs://%s/%s' % (bucket_name, object_name), size)
-                             for (object_name, size) in expected_object_names]
-      self.assertEqual(
-          set([(file_metadata.path, file_metadata.size_in_bytes)
-               for file_metadata in
-               self._flatten_match(self.fs.match([file_pattern]))]),
-          set(expected_file_names))
+
+    self.assertEqual(set(actual_file_names), set(expected_file_names))
 
     # Check if limits are followed correctly
     limit = 3
-    for file_pattern, expected_object_names in test_cases:
-      expected_num_items = min(len(expected_object_names), limit)
-      self.assertEqual(
-          len(self._flatten_match(self.fs.match([file_pattern], [limit]))),
-          expected_num_items)
+    expected_num_items = min(len(expected_object_names), limit)
+    self.assertEqual(
+        len(self._flatten_match(self.fs.match([file_pattern], [limit]))),
+        expected_num_items)
+
+  @parameterized.expand([
+      param(os_path=posixpath, sep_re='\\/'),
+      param(os_path=ntpath, sep_re='\\\\'),
+  ])
+  def test_translate_pattern(self, os_path, sep_re):
+    star = r'[^/\\]*'
+    double_star = r'.*'
+    join = os_path.join
+
+    sep = os_path.sep
+    pattern__expected = [
+        (join('a', '*'), sep_re.join(['a', star])),
+        (join('b', '*') + sep, sep_re.join(['b', star]) + sep_re),
+        (r'*[abc\]', star + r'[abc\\]'),
+        (join('d', '**', '*'), sep_re.join(['d', double_star, star])),
+    ]
+    for pattern, expected in pattern__expected:
+      expected += r'\Z(?ms)'
+      result = self.fs.translate_pattern(pattern)
+      self.assertEqual(result, expected)
 
 
 class TestFileSystemWithDirs(TestFileSystem):
diff --git a/sdks/python/apache_beam/io/filesystems.py b/sdks/python/apache_beam/io/filesystems.py
index 5d9c1fe..d8b3a4a 100644
--- a/sdks/python/apache_beam/io/filesystems.py
+++ b/sdks/python/apache_beam/io/filesystems.py
@@ -151,10 +151,26 @@
   def match(patterns, limits=None):
     """Find all matching paths to the patterns provided.
 
-    Pattern matching is done using fnmatch.fnmatch.
-    For filesystems that have directories, matching is not recursive. Patterns
-    like scheme://path/*/foo will not match anything.
-    Patterns ending with '/' will be appended with '*'.
+    Pattern matching is done using each filesystem's ``match`` method (e.g.
+    :meth:`.filesystem.FileSystem.match`).
+
+    .. note::
+      - Depending on the :class:`.FileSystem` implementation, file listings
+        (the ``.FileSystem._list`` method) may not be recursive.
+      - If the file listing is not recursive, a pattern like
+        ``scheme://path/*/foo`` will not match any files.
+
+    See Also:
+      :meth:`.filesystem.FileSystem.match`
+
+    Pattern syntax:
+      The pattern syntax is based on the fnmatch_ syntax, with the following
+      differences:
+
+      -   ``*`` is equivalent to ``[^/\\]*`` rather than ``.*``.
+      -   ``**`` is equivalent to ``.*``.
+
+    .. _`fnmatch`: https://docs.python.org/2/library/fnmatch.html
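+
+    Example::
+
+      # Recursively match every file under a directory (illustrative path):
+      FileSystems.match(['/tmp/data/**'])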
 
     Args:
       patterns: list of string for the file path pattern to match against
diff --git a/sdks/python/apache_beam/io/localfilesystem.py b/sdks/python/apache_beam/io/localfilesystem.py
index b634d2a..20748a2 100644
--- a/sdks/python/apache_beam/io/localfilesystem.py
+++ b/sdks/python/apache_beam/io/localfilesystem.py
@@ -111,9 +111,13 @@
     if not self.exists(dir_or_prefix):
       return
 
+    def list_files(root):
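+      # Walk *root* recursively; unlike a plain os.listdir, this also
+      # lists files in subdirectories, as '**' patterns require.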
+      for dirpath, _, files in os.walk(root):
+        for filename in files:
+          yield self.join(dirpath, filename)
+
     try:
-      for f in os.listdir(dir_or_prefix):
-        f = self.join(dir_or_prefix, f)
+      for f in list_files(dir_or_prefix):
         try:
           yield FileMetadata(f, os.path.getsize(f))
         except OSError:
diff --git a/sdks/python/apache_beam/io/localfilesystem_test.py b/sdks/python/apache_beam/io/localfilesystem_test.py
index 5187dc0..1f813b4 100644
--- a/sdks/python/apache_beam/io/localfilesystem_test.py
+++ b/sdks/python/apache_beam/io/localfilesystem_test.py
@@ -28,6 +28,8 @@
 import unittest
 
 import mock
+from parameterized import param
+from parameterized import parameterized
 
 from apache_beam.io import localfilesystem
 from apache_beam.io.filesystem import BeamIOError
@@ -150,17 +152,40 @@
       self.fs.match([None])
     self.assertEqual(list(error.exception.exception_details.keys()), [None])
 
-  def test_match_glob(self):
-    path1 = os.path.join(self.tmpdir, 'f1')
-    path2 = os.path.join(self.tmpdir, 'f2')
-    open(path1, 'a').close()
-    open(path2, 'a').close()
+  @parameterized.expand([
+      param('*',
+            files=['a', 'b', 'c/x'],
+            expected=['a', 'b']),
+      param('**',
+            files=['a', 'b/x', 'c/x'],
+            expected=['a', 'b/x', 'c/x']),
+      param('*/*',
+            files=['a', 'b/x', 'c/x', 'd/x/y'],
+            expected=['b/x', 'c/x']),
+      param('**/*',
+            files=['a', 'b/x', 'c/x', 'd/x/y'],
+            expected=['b/x', 'c/x', 'd/x/y']),
+  ])
+  def test_match_glob(self, pattern, files, expected):
+    for filename in files:
+      full_path = os.path.join(self.tmpdir, filename)
+      dirname = os.path.dirname(full_path)
+      if dirname != full_path:
+        # Make sure we don't go outside the tmpdir
+        assert os.path.commonprefix([self.tmpdir, full_path]) == self.tmpdir
+        try:
+          self.fs.mkdirs(dirname)
+        except IOError:
+          # Directory exists
+          pass
+
+      open(full_path, 'a').close()  # create empty file
 
     # Match both the files in the directory
-    path = os.path.join(self.tmpdir, '*')
-    result = self.fs.match([path])[0]
-    files = [f.path for f in result.metadata_list]
-    self.assertItemsEqual(files, [path1, path2])
+    full_pattern = os.path.join(self.tmpdir, pattern)
+    result = self.fs.match([full_pattern])[0]
+    files = [os.path.relpath(f.path, self.tmpdir) for f in result.metadata_list]
+    self.assertItemsEqual(files, expected)
 
   def test_match_directory(self):
     result = self.fs.match([self.tmpdir])[0]
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index aa93b04..692a9c8 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -252,6 +252,7 @@
         self.transforms = transforms
         self.downstream_side_inputs = downstream_side_inputs
         self.must_follow = must_follow
+        self.timer_pcollections = []
 
       def __repr__(self):
         must_follow = ', '.join(prev.name for prev in self.must_follow)
@@ -848,6 +849,88 @@
         stage.deduplicate_read()
       return final_stages
 
+    def inject_timer_pcollections(stages):
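+      """Create a read and a write PCollection for each user timer.
+
+      Timers travel over the data plane as windowed (key, timer) KVs:
+      set timers are written to the write PCollection, and the runner
+      replays pending timers into the read PCollection.
+      """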
+      for stage in stages:
+        for transform in list(stage.transforms):
+          if transform.spec.urn == common_urns.primitives.PAR_DO.urn:
+            payload = proto_utils.parse_Bytes(
+                transform.spec.payload, beam_runner_api_pb2.ParDoPayload)
+            for tag in payload.timer_specs.keys():
+              if len(transform.inputs) > 1:
+                raise NotImplementedError(
+                    'Timers and side inputs are not yet supported together.')
+              input_pcoll = pipeline_components.pcollections[
+                  next(iter(transform.inputs.values()))]
+              # Create the appropriate coder for the timer PCollection.
+              void_coder_id = add_or_get_coder_id(
+                  beam.coders.SingletonCoder(None).to_runner_api(None))
+              timer_coder_id = add_or_get_coder_id(
+                  beam_runner_api_pb2.Coder(
+                      spec=beam_runner_api_pb2.SdkFunctionSpec(
+                          spec=beam_runner_api_pb2.FunctionSpec(
+                              urn=common_urns.coders.TIMER.urn)),
+                      component_coder_ids=[void_coder_id]))
+              key_coder_id = input_pcoll.coder_id
+              if (pipeline_components.coders[key_coder_id].spec.spec.urn
+                  == common_urns.coders.WINDOWED_VALUE.urn):
+                key_coder_id = pipeline_components.coders[
+                    key_coder_id].component_coder_ids[0]
+              if (pipeline_components.coders[key_coder_id].spec.spec.urn
+                  == common_urns.coders.KV.urn):
+                key_coder_id = pipeline_components.coders[
+                    key_coder_id].component_coder_ids[0]
+              key_timer_coder_id = add_or_get_coder_id(
+                  beam_runner_api_pb2.Coder(
+                      spec=beam_runner_api_pb2.SdkFunctionSpec(
+                          spec=beam_runner_api_pb2.FunctionSpec(
+                              urn=common_urns.coders.KV.urn)),
+                      component_coder_ids=[key_coder_id, timer_coder_id]))
+              timer_pcoll_coder_id = windowed_coder_id(
+                  key_timer_coder_id,
+                  pipeline_components.windowing_strategies[
+                      input_pcoll.windowing_strategy_id].window_coder_id)
+              # Inject the read and write pcollections.
+              timer_read_pcoll = unique_name(
+                  pipeline_components.pcollections,
+                  '%s_timers_to_read_%s' % (transform.unique_name, tag))
+              timer_write_pcoll = unique_name(
+                  pipeline_components.pcollections,
+                  '%s_timers_to_write_%s' % (transform.unique_name, tag))
+              pipeline_components.pcollections[timer_read_pcoll].CopyFrom(
+                  beam_runner_api_pb2.PCollection(
+                      unique_name=timer_read_pcoll,
+                      coder_id=timer_pcoll_coder_id,
+                      windowing_strategy_id=input_pcoll.windowing_strategy_id,
+                      is_bounded=input_pcoll.is_bounded))
+              pipeline_components.pcollections[timer_write_pcoll].CopyFrom(
+                  beam_runner_api_pb2.PCollection(
+                      unique_name=timer_write_pcoll,
+                      coder_id=timer_pcoll_coder_id,
+                      windowing_strategy_id=input_pcoll.windowing_strategy_id,
+                      is_bounded=input_pcoll.is_bounded))
+              stage.transforms.append(
+                  beam_runner_api_pb2.PTransform(
+                      unique_name=timer_read_pcoll + '/Read',
+                      outputs={'out': timer_read_pcoll},
+                      spec=beam_runner_api_pb2.FunctionSpec(
+                          urn=bundle_processor.DATA_INPUT_URN,
+                          payload=('timers:%s' % timer_read_pcoll).encode(
+                              'utf-8'))))
+              stage.transforms.append(
+                  beam_runner_api_pb2.PTransform(
+                      unique_name=timer_write_pcoll + '/Write',
+                      inputs={'in': timer_write_pcoll},
+                      spec=beam_runner_api_pb2.FunctionSpec(
+                          urn=bundle_processor.DATA_OUTPUT_URN,
+                          payload=('timers:%s' % timer_write_pcoll).encode(
+                              'utf-8'))))
+              assert tag not in transform.inputs
+              transform.inputs[tag] = timer_read_pcoll
+              assert tag not in transform.outputs
+              transform.outputs[tag] = timer_write_pcoll
+              stage.timer_pcollections.append(
+                  (timer_read_pcoll + '/Read', timer_write_pcoll))
+        yield stage
+
     def sort_stages(stages):
       """Order stages suitable for sequential execution.
       """
@@ -908,7 +991,7 @@
     for phase in [
         annotate_downstream_side_inputs, fix_side_input_pcoll_coders,
         lift_combiners, expand_gbk, sink_flattens, greedily_fuse,
-        impulse_to_input, sort_stages]:
+        impulse_to_input, inject_timer_pcollections, sort_stages]:
       logging.info('%s %s %s', '=' * 20, phase, '=' * 20)
       stages = list(phase(stages))
       logging.debug('Stages: %s', [str(s) for s in stages])
@@ -1016,7 +1099,8 @@
         controller.state_handler.blocking_append(state_key, elements_data)
 
     def get_buffer(pcoll_id):
-      if pcoll_id.startswith(b'materialize:'):
+      if (pcoll_id.startswith(b'materialize:')
+          or pcoll_id.startswith(b'timers:')):
         if pcoll_id not in pcoll_buffers:
           # Just store the data chunks for replay.
           pcoll_buffers[pcoll_id] = list()
@@ -1043,10 +1127,51 @@
         raise NotImplementedError(pcoll_id)
       return pcoll_buffers[pcoll_id]
 
-    return BundleManager(
+    result = BundleManager(
         controller, get_buffer, process_bundle_descriptor,
         self._progress_frequency).process_bundle(data_input, data_output)
 
+    while True:
+      timer_inputs = {}
+      for transform_id, timer_writes in stage.timer_pcollections:
+        windowed_timer_coder_impl = context.coders[
+            pipeline_components.pcollections[timer_writes].coder_id].get_impl()
+        written_timers = get_buffer(b'timers:' + timer_writes.encode('utf-8'))
+        if written_timers:
+          # Keep only the "last" timer set per key and window.
+          timers_by_key_and_window = {}
+          for elements_data in written_timers:
+            input_stream = create_InputStream(elements_data)
+            while input_stream.size() > 0:
+              windowed_key_timer = windowed_timer_coder_impl.decode_from_stream(
+                  input_stream, True)
+              key, _ = windowed_key_timer.value
+              # TODO: Explode and merge windows.
+              assert len(windowed_key_timer.windows) == 1
+              timers_by_key_and_window[
+                  key, windowed_key_timer.windows[0]] = windowed_key_timer
+          out = create_OutputStream()
+          for windowed_key_timer in timers_by_key_and_window.values():
+            windowed_timer_coder_impl.encode_to_stream(
+                windowed_key_timer, out, True)
+          timer_inputs[transform_id, 'out'] = [out.get()]
+          written_timers[:] = []
+      if timer_inputs:
+        # The worker will be waiting on these inputs as well.
+        for other_input in data_input:
+          if other_input not in timer_inputs:
+            timer_inputs[other_input] = []
+        # TODO(robertwb): merge results
+        BundleManager(
+            controller,
+            get_buffer,
+            process_bundle_descriptor,
+            self._progress_frequency).process_bundle(timer_inputs, data_output)
+      else:
+        break
+
+    return result
+
   # These classes are used to interact with the worker.
 
   class StateServicer(beam_fn_api_pb2_grpc.BeamFnStateServicer):
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index a1a4c65..49d4783 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -264,6 +264,29 @@
       assert_that(p | beam.Create(inputs) | beam.ParDo(AddIndex()),
                   equal_to(expected))
 
+  def test_pardo_timers(self):
+    timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
+
+    class TimerDoFn(beam.DoFn):
+      def process(self, element, timer=beam.DoFn.TimerParam(timer_spec)):
+        unused_key, ts = element
+        timer.set(ts)
+        timer.set(2 * ts)
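+        # Only the most recent set() per key and window takes effect,
+        # so each key fires a single timer at 2 * ts.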
+
+      @userstate.on_timer(timer_spec)
+      def process_timer(self):
+        yield 'fired'
+
+    with self.create_pipeline() as p:
+      actual = (
+          p
+          | beam.Create([('k1', 10), ('k2', 100)])
+          | beam.ParDo(TimerDoFn())
+          | beam.Map(lambda x, ts=beam.DoFn.TimestampParam: (x, ts)))
+
+      expected = [('fired', ts) for ts in (20, 200)]
+      assert_that(actual, equal_to(expected))
+
   def test_group_by_key(self):
     with self.create_pipeline() as p:
       res = (p
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index 485038f..756e01a 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -205,7 +205,8 @@
     self._messages = []
 
   def cancel(self):
-    self._job_service.Cancel()
+    self._job_service.Cancel(beam_job_api_pb2.CancelJobRequest(
+        job_id=self._job_id))
 
   @property
   def state(self):
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 6b63357..7096820 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -50,6 +50,7 @@
 from apache_beam.transforms import userstate
 from apache_beam.utils import counters
 from apache_beam.utils import proto_utils
+from apache_beam.utils import timestamp
 
 # This module is experimental. No backwards-compatibility guarantees.
 
@@ -255,15 +256,39 @@
     self._state_handler.blocking_clear(self._state_key)
 
 
+class OutputTimer(object):
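+  """Sets or clears a user timer by emitting it to the timer receiver."""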
+  def __init__(self, key, receiver):
+    self._key = key
+    self._receiver = receiver
+
+  def set(self, ts):
+    from apache_beam.transforms.window import GlobalWindows
+    self._receiver.receive(
+        GlobalWindows.windowed_value(
+            (self._key,
+             dict(timestamp=timestamp.Timestamp.of(ts)))))
+
+  def clear(self, ts):
+    # Clearing is signaled with a clear=True payload instead of a timestamp.
+    self._receiver.receive((self._key, dict(clear=True)))
+
+
 class FnApiUserStateContext(userstate.UserStateContext):
-  def __init__(self, state_handler, transform_id, key_coder, window_coder):
+  def __init__(
+      self, state_handler, transform_id, key_coder, window_coder, timer_specs):
     self._state_handler = state_handler
     self._transform_id = transform_id
     self._key_coder = key_coder
     self._window_coder = window_coder
+    self._timer_specs = timer_specs
+    self._timer_receivers = None
+
+  def update_timer_receivers(self, receivers):
+    self._timer_receivers = {}
+    for tag in self._timer_specs:
+      self._timer_receivers[tag] = receivers.pop(tag)
 
   def get_timer(self, timer_spec, key, window):
-    raise NotImplementedError
+    return OutputTimer(key, self._timer_receivers[timer_spec.name])
 
   def get_state(self, state_spec, key, window):
     if isinstance(state_spec,
@@ -360,7 +385,6 @@
             descriptor.transforms, key=topological_height, reverse=True)])
 
   def process_bundle(self, instruction_id):
-
     expected_inputs = []
     for op in self.ops.values():
       if isinstance(op, DataOutputOperation):
@@ -380,11 +404,19 @@
         op.start()
 
       # Inject inputs from data plane.
+      data_channels = collections.defaultdict(list)
+      input_op_by_target = {}
       for input_op in expected_inputs:
-        for data in input_op.data_channel.input_elements(
-            instruction_id, [input_op.target]):
-          # ignores input name
-          input_op.process_encoded(data.data)
+        data_channels[input_op.data_channel].append(input_op.target)
+        # Keyed by transform id; the input (target) name is ignored.
+        input_op_by_target[
+            input_op.target.primitive_transform_reference] = input_op
+      for data_channel, expected_targets in data_channels.items():
+        for data in data_channel.input_elements(
+            instruction_id, expected_targets):
+          input_op_by_target[
+              data.target.primitive_transform_reference
+          ].process_encoded(data.data)
 
       # Finish all operations.
       for op in self.ops.values():
@@ -499,9 +531,31 @@
     return op
 
 
+class TimerConsumer(operations.Operation):
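+  """Routes elements of a timer PCollection to DoOperation.process_timer."""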
+  def __init__(self, timer_tag, do_op):
+    self._timer_tag = timer_tag
+    self._do_op = do_op
+
+  def process(self, windowed_value):
+    self._do_op.process_timer(self._timer_tag, windowed_value)
+
+
 @BeamTransformFactory.register_urn(
     DATA_INPUT_URN, beam_fn_api_pb2.RemoteGrpcPort)
 def create(factory, transform_id, transform_proto, grpc_port, consumers):
+  # Timers are the one special case where we don't want to call the
+  # (unlabeled) operation.process() method, which we detect here.
+  # TODO(robertwb): Consider generalizing if there are any more cases.
+  output_pcoll = only_element(transform_proto.outputs.values())
+  output_consumers = only_element(consumers.values())
+  if (len(output_consumers) == 1
+      and isinstance(only_element(output_consumers), operations.DoOperation)):
+    do_op = only_element(output_consumers)
+    for tag, pcoll_id in do_op.timer_inputs.items():
+      if pcoll_id == output_pcoll:
+        output_consumers[:] = [TimerConsumer(tag, do_op)]
+        break
+
   target = beam_fn_api_pb2.Target(
       primitive_transform_reference=transform_id,
       name=only_element(list(transform_proto.outputs.keys())))
@@ -597,18 +651,18 @@
   serialized_fn = parameter.do_fn.spec.payload
   return _create_pardo_operation(
       factory, transform_id, transform_proto, consumers,
-      serialized_fn, parameter.side_inputs)
+      serialized_fn, parameter)
 
 
 def _create_pardo_operation(
     factory, transform_id, transform_proto, consumers,
-    serialized_fn, side_inputs_proto=None):
+    serialized_fn, pardo_proto=None):
 
-  if side_inputs_proto:
+  if pardo_proto and pardo_proto.side_inputs:
     input_tags_to_coders = factory.get_input_coders(transform_proto)
     tagged_side_inputs = [
         (tag, beam.pvalue.SideInputData.from_runner_api(si, factory.context))
-        for tag, si in side_inputs_proto.items()]
+        for tag, si in pardo_proto.side_inputs.items()]
     tagged_side_inputs.sort(
         key=lambda tag_si: int(re.match('side([0-9]+)(-.*)?$',
                                         tag_si[0]).group(1)))
@@ -638,22 +692,40 @@
   dofn_data = pickler.loads(serialized_fn)
   if not dofn_data[-1]:
     # Windowing not set.
-    side_input_tags = side_inputs_proto or ()
+    if pardo_proto:
+      other_input_tags = set.union(
+          set(pardo_proto.side_inputs), set(pardo_proto.timer_specs))
+    else:
+      other_input_tags = ()
     pcoll_id, = [pcoll for tag, pcoll in transform_proto.inputs.items()
-                 if tag not in side_input_tags]
+                 if tag not in other_input_tags]
     windowing = factory.context.windowing_strategies.get_by_id(
         factory.descriptor.pcollections[pcoll_id].windowing_strategy_id)
     serialized_fn = pickler.dumps(dofn_data[:-1] + (windowing,))
 
-  if userstate.is_stateful_dofn(dofn_data[0]):
-    input_coder = factory.get_only_input_coder(transform_proto)
+  if pardo_proto and (pardo_proto.timer_specs or pardo_proto.state_specs):
+    main_input_coder = None
+    timer_inputs = {}
+    for tag, pcoll_id in transform_proto.inputs.items():
+      if tag in pardo_proto.timer_specs:
+        timer_inputs[tag] = pcoll_id
+      elif tag in pardo_proto.side_inputs:
+        pass
+      else:
+        # Must be the main input
+        assert main_input_coder is None
+        main_input_coder = factory.get_windowed_coder(pcoll_id)
+    assert main_input_coder is not None
+
     user_state_context = FnApiUserStateContext(
         factory.state_handler,
         transform_id,
-        input_coder.key_coder(),
-        input_coder.window_coder)
+        main_input_coder.key_coder(),
+        main_input_coder.window_coder,
+        timer_specs=pardo_proto.timer_specs)
   else:
     user_state_context = None
+    timer_inputs = None
 
   output_coders = factory.get_output_coders(transform_proto)
   spec = operation_specs.WorkerDoFn(
@@ -670,7 +742,8 @@
           factory.counter_factory,
           factory.state_sampler,
           side_input_maps,
-          user_state_context),
+          user_state_context,
+          timer_inputs=timer_inputs),
       transform_proto.unique_name,
       consumers,
       output_tags)
diff --git a/sdks/python/apache_beam/runners/worker/data_plane.py b/sdks/python/apache_beam/runners/worker/data_plane.py
index dcd9d3a..b737e5d 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane.py
@@ -139,7 +139,8 @@
   def input_elements(self, instruction_id, unused_expected_targets=None):
     for data in self._inputs:
       if data.instruction_reference == instruction_id:
-        yield data
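+        # The data plane signals end-of-stream with an empty payload;
+        # skip those so consumers only receive element data.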
+        if data.data:
+          yield data
 
   def output_stream(self, instruction_id, target):
     def add_to_inverse_output(data):
diff --git a/sdks/python/apache_beam/runners/worker/operations.pxd b/sdks/python/apache_beam/runners/worker/operations.pxd
index ad82aae..9cde9da 100644
--- a/sdks/python/apache_beam/runners/worker/operations.pxd
+++ b/sdks/python/apache_beam/runners/worker/operations.pxd
@@ -82,6 +82,8 @@
   cdef object tagged_receivers
   cdef object side_input_maps
   cdef object user_state_context
+  cdef public dict timer_inputs
+  cdef dict timer_specs
 
 
 cdef class CombineOperation(Operation):
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 41792cb..b16a2c9 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -42,6 +42,7 @@
 from apache_beam.transforms import sideinputs as apache_sideinputs
 from apache_beam.transforms import combiners
 from apache_beam.transforms import core
+from apache_beam.transforms import userstate
 from apache_beam.transforms.combiners import PhasedCombineFnExecutor
 from apache_beam.transforms.combiners import curry_combine_fn
 from apache_beam.transforms.window import GlobalWindows
@@ -296,11 +297,13 @@
 
   def __init__(
       self, name, spec, counter_factory, sampler, side_input_maps=None,
-      user_state_context=None):
+      user_state_context=None, timer_inputs=None):
     super(DoOperation, self).__init__(name, spec, counter_factory, sampler)
     self.side_input_maps = side_input_maps
     self.user_state_context = user_state_context
     self.tagged_receivers = None
+    # A mapping of timer tags to the input "PCollections" they come in on.
+    self.timer_inputs = timer_inputs or {}
 
   def _read_side_inputs(self, tags_and_types):
     """Generator reading side inputs in the order prescribed by tags_and_types.
@@ -389,6 +392,13 @@
           raise ValueError('Unexpected output name for operation: %s' % tag)
         self.tagged_receivers[original_tag] = self.receivers[index]
 
+      if self.user_state_context:
+        self.user_state_context.update_timer_receivers(self.tagged_receivers)
+        self.timer_specs = {
+            spec.name: spec
+            for spec in userstate.get_dofn_specs(fn)[1]
+        }
+
       if self.side_input_maps is None:
         if tags_and_types:
           self.side_input_maps = list(self._read_side_inputs(tags_and_types))
@@ -413,6 +423,12 @@
     with self.scoped_process_state:
       self.dofn_receiver.receive(o)
 
+  def process_timer(self, tag, windowed_timer):
+    key, timer_data = windowed_timer.value
+    timer_spec = self.timer_specs[tag]
+    self.dofn_receiver.process_user_timer(
+        timer_spec, key, windowed_timer.windows[0], timer_data['timestamp'])
+
   def finish(self):
     with self.scoped_finish_state:
       self.dofn_runner.finish()
diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py
index 2abde69..907ce98 100644
--- a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py
+++ b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py
@@ -19,6 +19,7 @@
 
 from __future__ import absolute_import
 
+import sys
 import typing
 import unittest
 
@@ -36,7 +37,8 @@
 
 
 class NativeTypeCompatibilityTest(unittest.TestCase):
-
+  @unittest.skipIf(sys.version_info[0] == 3, 'This test still needs to be '
+                                             'fixed on Python 3')
   def test_convert_to_beam_type(self):
     test_cases = [
         ('raw bytes', bytes, bytes),
@@ -78,6 +80,8 @@
           native_type_compatibility.convert_to_beam_type(typing_type),
           beam_type, description)
 
+  @unittest.skipIf(sys.version_info[0] == 3, 'This test still needs to be '
+                                             'fixed on Python 3')
   def test_convert_to_beam_types(self):
     typing_types = [bytes, typing.List[bytes],
                     typing.List[typing.Tuple[bytes, int]],
diff --git a/sdks/python/apache_beam/typehints/trivial_inference_test.py b/sdks/python/apache_beam/typehints/trivial_inference_test.py
index 9ce78fe..1c6ebd6 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference_test.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference_test.py
@@ -19,6 +19,7 @@
 
 from __future__ import absolute_import
 
+import sys
 import unittest
 
 from apache_beam.typehints import trivial_inference
@@ -106,6 +107,8 @@
         lambda xs: [x for x in xs],
         [typehints.Tuple[int, ...]])
 
+  @unittest.skipIf(sys.version_info[0] == 3, 'This test still needs to be '
+                                             'fixed on Python 3')
   def testTupleListComprehension(self):
     self.assertReturnType(
         typehints.List[int],
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index feee486..fa98075 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -20,6 +20,7 @@
 from __future__ import absolute_import
 
 import inspect
+import sys
 import typing
 import unittest
 
@@ -46,6 +47,8 @@
     with self.assertRaises(typehints.TypeCheckError):
       [1, 2, 3] | beam.Map(repeat, 3)
 
+  @unittest.skipIf(sys.version_info[0] == 3, 'This test still needs to be '
+                                             'fixed on Python 3')
   def test_non_function(self):
     result = ['a', 'bb', 'c'] | beam.Map(str.upper)
     self.assertEqual(['A', 'BB', 'C'], sorted(result))
@@ -102,6 +105,8 @@
       [1, 2, 3] | (beam.ParDo(my_do_fn) | 'again' >> beam.ParDo(my_do_fn))
 
 
+@unittest.skipIf(sys.version_info[0] == 3, 'This test still needs to be '
+                                           'fixed on Python 3')
 class NativeTypesTest(unittest.TestCase):
 
   def test_good_main_input(self):
@@ -196,6 +201,8 @@
   # with self.assertRaises(typehints.TypeCheckError):
   #   ['a', 'bb', 'c'] | beam.Map(repeat, 'z')
 
+  @unittest.skipIf(sys.version_info[0] == 3, 'This test still needs to be '
+                                             'fixed on Python 3')
   def test_deferred_side_inputs(self):
     @typehints.with_input_types(str, int)
     def repeat(s, times):
@@ -210,6 +217,8 @@
     with self.assertRaises(typehints.TypeCheckError):
       main_input | 'bis' >> beam.Map(repeat, pvalue.AsSingleton(bad_side_input))
 
+  @unittest.skipIf(sys.version_info[0] == 3, 'This test still needs to be '
+                                             'fixed on Python 3')
   def test_deferred_side_input_iterable(self):
     @typehints.with_input_types(str, typehints.Iterable[str])
     def concat(glue, items):
diff --git a/sdks/python/apache_beam/typehints/typehints_test.py b/sdks/python/apache_beam/typehints/typehints_test.py
index 53cdece..e1106ce 100644
--- a/sdks/python/apache_beam/typehints/typehints_test.py
+++ b/sdks/python/apache_beam/typehints/typehints_test.py
@@ -476,8 +476,8 @@
       typehints.KV[int, str, bool]
 
     self.assertEqual("Length of parameters to a KV type-hint must be "
-                     "exactly 2. Passed parameters: (<type 'int'>, <type "
-                     "'str'>, <type 'bool'>), have a length of 3.",
+                     "exactly 2. Passed parameters: ({}, {}, {}), have a "
+                     "length of 3.".format(int, str, bool),
                      e.exception.args[0])
 
   def test_getitem_proxy_to_tuple(self):
@@ -505,8 +505,8 @@
       typehints.Dict[float, int, bool]
 
     self.assertEqual("Length of parameters to a Dict type-hint must be "
-                     "exactly 2. Passed parameters: (<type 'float'>, <type "
-                     "'int'>, <type 'bool'>), have a length of 3.",
+                     "exactly 2. Passed parameters: ({}, {}, {}), have a "
+                     "length of 3.".format(float, int, bool),
                      e.exception.args[0])
 
   def test_key_type_must_be_valid_composite_param(self):
@@ -591,8 +591,8 @@
     with self.assertRaises(TypeError) as e:
       typehints.Set[list]
     self.assertEqual("Parameter to a Set hint must be a non-sequence, a "
-                     "type, or a TypeConstraint. <type 'list'> is an "
-                     "instance of type.",
+                     "type, or a TypeConstraint. {} is an instance of "
+                     "type.".format(list),
                      e.exception.args[0])
 
   def test_compatibility(self):
@@ -810,8 +810,8 @@
       m = 'a'
       foo(m)
     self.assertEqual("Type-hint for argument: 'a' violated. Expected an "
-                     "instance of <type 'int'>, instead found an "
-                     "instance of <type 'str'>.",
+                     "instance of {}, instead found an instance of "
+                     "{}.".format(int, type(m)),
                      e.exception.args[0])
 
   def test_composite_type_assertion(self):
@@ -861,11 +861,12 @@
       return a - b
 
     with self.assertRaises(TypeCheckError) as e:
-      sub(1, 'two')
+      m = 'two'
+      sub(1, m)
 
     self.assertEqual("Type-hint for argument: 'b' violated. Expected an "
-                     "instance of <type 'int'>, instead found an instance "
-                     "of <type 'str'>.",
+                     "instance of {}, instead found an instance of "
+                     "{}.".format(int, type(m)),
                      e.exception.args[0])
 
   def test_valid_only_positional_arguments(self):
@@ -907,11 +908,12 @@
     def foo(a):
       return 'test'
     with self.assertRaises(TypeCheckError) as e:
-      foo(4)
+      m = 4
+      foo(m)
 
     self.assertEqual("Type-hint for return type violated. Expected an "
-                     "instance of <type 'int'>, instead found an instance "
-                     "of <type 'str'>.",
+                     "instance of {}, instead found an instance of "
+                     "{}.".format(int, type('test')),
                      e.exception.args[0])
 
   def test_type_check_simple_type(self):
diff --git a/sdks/python/container/Dockerfile b/sdks/python/container/Dockerfile
index cc560bf..7464592 100644
--- a/sdks/python/container/Dockerfile
+++ b/sdks/python/container/Dockerfile
@@ -29,67 +29,25 @@
        && \
     rm -rf /var/lib/apt/lists/*
 
-# Install packages required by the Python SDK.
-#
-# These packages should be kept in sync with the dependencies at
-# sdks/python/setup.py.  If not installed, sdk harness will install these at
-# runtime, but we would like to avoid doing this so that we do not depend on
-# PyPI at runtime whenever possible.
-#
-# Also install cython, numpy, pandas and scipy as well as their dependencies.
-# These are standard Python packages, likely to be used by python SDK customers,
-# and their dependencies.
-#
+# Install packages required by the Python SDK and common dependencies of
+# user code.
+
+# SDK dependencies not listed in base_image_requirements.txt will be
+# installed when we install the SDK in the next RUN statement.
+
+COPY base_image_requirements.txt /tmp/base_image_requirements.txt
 RUN \
-    # These are packages needed by the Python SDK.
-    # TODO: This make more sense as a requirements.txt file (BEAM-5076)
-    pip install "avro == 1.8.2" && \
-    pip install "crcmod == 1.7" && \
-    pip install "dill == 0.2.6" && \
-    pip install "grpcio == 1.3.0" && \
-    pip install "hdfs == 2.1.0" && \
-    pip install "httplib2 == 0.9.2" && \
-    pip install "mock == 2.0.0" && \
-    pip install "oauth2client == 3.0.0" && \
-    pip install "protobuf == 3.3.0" && \
-    pip install "pytz == 2018.4" && \
-    pip install "pyyaml == 3.12" && \
-    pip install "pyvcf == 0.6.8" && \
-    pip install "typing == 3.6.1" && \
-    pip install "futures == 3.1.1" && \
-    pip install "future == 0.16.0" && \
-    # Setup packages
-    pip install "nose == 1.3.7" && \
-    # GCP extra features
-    pip install "google-apitools == 0.5.11" && \
-    pip install "proto-google-cloud-datastore-v1 == 0.90.4" && \
-    pip install "googledatastore == 7.0.1" && \
-    pip install "google-cloud-pubsub == 0.26.0" && \
-    pip install "proto-google-cloud-pubsub-v1 == 0.15.4" && \
-    pip install "google-cloud-bigquery == 0.25.0" && \
-    # Optional packages
-    pip install "cython == 0.28.1" && \
-    pip install "guppy == 0.1.10" && \
-    pip install "python-snappy == 0.5.3" && \
-    # These are additional packages likely to be used by customers.
-    pip install "numpy == 1.15" && \
-    pip install "pandas == 0.18.1" && \
-    pip install "scipy == 1.0.0" && \
-    pip install "protobuf == 3.3.0" && \
-    pip install "tensorflow == 1.9" && \
-    pip install "protorpc == 0.11.1" && \
-    pip install "python-gflags == 3.0.6" && \
+    pip install -r /tmp/base_image_requirements.txt && \
     # Check that the fast implementation of protobuf is used.
-    python -c "from google.protobuf.internal import api_implementation; assert api_implementation._default_implementation_type == 'cpp'; print 'Verified fast protobuf used.'" && \
+    python -c "from google.protobuf.internal import api_implementation; assert api_implementation._default_implementation_type == 'cpp'; print ('Verified fast protobuf used.')" && \
     # Remove pip cache.
     rm -rf /root/.cache/pip
 
 
-COPY target/apache-beam.tar.gz /opt/apache/beam/tars/
+COPY apache-beam.tar.gz /opt/apache/beam/tars/
 RUN pip install /opt/apache/beam/tars/apache-beam.tar.gz[gcp] && \
     # Remove pip cache.
     rm -rf /root/.cache/pip
 
-ADD target/linux_amd64/boot /opt/apache/beam/
+ADD linux_amd64/boot /opt/apache/beam/
 
 ENTRYPOINT ["/opt/apache/beam/boot"]
diff --git a/sdks/python/container/base_image_requirements.txt b/sdks/python/container/base_image_requirements.txt
new file mode 100644
index 0000000..13a93b5
--- /dev/null
+++ b/sdks/python/container/base_image_requirements.txt
@@ -0,0 +1,64 @@
+###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+###############################################################################
+
+# These are packages needed by the Apache Beam Python SDK. Their versions need
+# to be compatible with the requirements in sdks/python/setup.py.
+# Specifying the versions manually helps to resolve dependency conflicts
+# with other packages installed in the container.
+# Any SDK dependencies not listed here will be installed when SDK is installed
+# into the container.
+
+fastavro==0.21.4
+crcmod==1.7
+dill==0.2.8.2
+future==0.16.0
+futures==3.1.1
+grpcio==1.10.0
+hdfs==2.1.0
+httplib2==0.9.2
+mock==2.0.0
+oauth2client==3.0.0
+protobuf==3.6.1
+pytz==2018.4
+pyvcf==0.6.8
+pyyaml==3.12
+typing==3.6.1
+
+# Setup packages
+nose==1.3.7
+
+# GCP extra features
+google-apitools==0.5.20
+googledatastore==7.0.1
+google-cloud-pubsub==0.26.0
+google-cloud-bigquery==0.25.0
+proto-google-cloud-datastore-v1==0.90.4
+proto-google-cloud-pubsub-v1==0.15.4
+
+# Optional packages
+cython==0.28.1
+python-snappy==0.5.3
+
+# These are additional packages likely to be used by customers.
+numpy==1.14.5
+scipy==1.1.0
+pandas==0.22.0
+protorpc==0.11.1
+python-gflags==3.0.6
+setuptools<=39.1.0 # requirement for Tensorflow.
+tensorflow==1.10.0
diff --git a/sdks/python/container/build.gradle b/sdks/python/container/build.gradle
index f6f1c30..8f4256d 100644
--- a/sdks/python/container/build.gradle
+++ b/sdks/python/container/build.gradle
@@ -28,7 +28,8 @@
 resolveBuildDependencies.dependsOn ":beam-sdks-go:build"
 
 configurations {
-  dockerDependency
+  sdkSourceTarball
+  sdkHarnessLauncher
 }
 
 dependencies {
@@ -38,14 +39,15 @@
     build name: './github.com/apache/beam/sdks/go', dir: project(':beam-sdks-go').projectDir
     test name: './github.com/apache/beam/sdks/go', dir: project(':beam-sdks-go').projectDir
   }
-
-  dockerDependency project(path: ":beam-sdks-python", configuration: "distConfig")
+  sdkSourceTarball project(path: ":beam-sdks-python", configuration: "distConfig")
 }
 
-task copyDockerfileDependencies(type: Copy) {
-  from configurations.dockerDependency
+task copyDockerfileDependencies(type: Copy, dependsOn: build) {
+  from configurations.sdkSourceTarball
+  from file("build/launcher")
+  from file("./base_image_requirements.txt")
   into "build/target"
-  configurations.dockerDependency.stopExecutionIfEmpty()
+  configurations.sdkSourceTarball.stopExecutionIfEmpty()
 }
 
 golang {
@@ -53,13 +55,17 @@
   build {
     // TODO(herohde): build local platform + linux-amd64, if possible.
     targetPlatform = ['linux-amd64']
-    outputLocation = './build/target/${GOOS}_${GOARCH}/boot'
+    outputLocation = './build/launcher/${GOOS}_${GOARCH}/boot'
   }
 }
 
 docker {
   name containerImageName(name: "python")
-  files "./build/"
+  files "./build/target"
+}
+
+artifacts {
+  sdkHarnessLauncher file: file('./build/launcher'), builtBy: build
 }
 
 // Ensure that making the docker image builds any required artifacts
diff --git a/sdks/python/container/py3/Dockerfile b/sdks/python/container/py3/Dockerfile
new file mode 100644
index 0000000..66e17e0
--- /dev/null
+++ b/sdks/python/container/py3/Dockerfile
@@ -0,0 +1,54 @@
+###############################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+###############################################################################
+
+FROM python:3.5-stretch
+MAINTAINER "Apache Beam <dev@beam.apache.org>"
+
+# Install native bindings required for dependencies.
+RUN apt-get update && \
+    apt-get install -y \
+       # These packages are needed for "pip install python-snappy" below.
+       libsnappy-dev \
+       # This package is needed for "pip install pyyaml" below to have c bindings.
+       libyaml-dev \
+       && \
+    rm -rf /var/lib/apt/lists/*
+
+# Install packages required by the Python SDK and common dependencies of
+# user code.
+
+# SDK dependencies not listed in base_image_requirements.txt will be
+# installed when we install the SDK in the next RUN statement.
+
+COPY base_image_requirements.txt /tmp/base_image_requirements.txt
+RUN \
+    pip install -r /tmp/base_image_requirements.txt && \
+    # Check that the fast implementation of protobuf is used.
+    python -c "from google.protobuf.internal import api_implementation; assert api_implementation._default_implementation_type == 'cpp'; print ('Verified fast protobuf used.')" && \
+    # Remove pip cache.
+    rm -rf /root/.cache/pip
+
+
+ENV BEAM_EXPERIMENTAL_PY3=1
+COPY apache-beam.tar.gz /opt/apache/beam/tars/
+RUN pip install /opt/apache/beam/tars/apache-beam.tar.gz[gcp] && \
+    # Remove pip cache.
+    rm -rf /root/.cache/pip
+
+ADD linux_amd64/boot /opt/apache/beam/
+
+ENTRYPOINT ["/opt/apache/beam/boot"]
diff --git a/sdks/python/container/py3/build.gradle b/sdks/python/container/py3/build.gradle
new file mode 100644
index 0000000..5fef2be
--- /dev/null
+++ b/sdks/python/container/py3/build.gradle
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+apply plugin: org.apache.beam.gradle.BeamModulePlugin
+applyDockerNature()
+
+description = "Apache Beam :: SDKs :: Python :: Container :: Python 3 Container"
+
+configurations {
+  sdkSourceTarball
+  sdkHarnessLauncher
+}
+
+dependencies {
+  sdkSourceTarball project(path: ":beam-sdks-python", configuration: "distConfig")
+  sdkHarnessLauncher project(path: ":beam-sdks-python-container", configuration: "sdkHarnessLauncher")
+}
+
+task copyDockerfileDependencies(type: Copy) {
+  from configurations.sdkSourceTarball
+  from configurations.sdkHarnessLauncher
+  from file("../base_image_requirements.txt")
+  into "build/target"
+  configurations.sdkSourceTarball.stopExecutionIfEmpty()
+  configurations.sdkHarnessLauncher.stopExecutionIfEmpty()
+}
+
+docker {
+  name containerImageName(name: "python3")
+  files "./build/target"
+}
+
+dockerPrepare.dependsOn copyDockerfileDependencies
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 22e1ade..254135b 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -131,6 +131,7 @@
 
 REQUIRED_TEST_PACKAGES = [
     'nose>=1.3.7',
+    'parameterized>=0.6.0,<0.7.0',
     'numpy>=1.14.3,<2',
     'pyhamcrest>=1.9,<2.0',
     ]
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 46d2373..bf9175f 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -57,7 +57,7 @@
 setenv =
   BEAM_EXPERIMENTAL_PY3=1
 modules =
-  apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners
+  apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners
 commands =
   python --version
   pip --version
diff --git a/settings.gradle b/settings.gradle
index 3600e41..80d6250 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -170,6 +170,8 @@
 project(":beam-sdks-python").dir = file("sdks/python")
 include "beam-sdks-python-container"
 project(":beam-sdks-python-container").dir = file("sdks/python/container")
+include "beam-sdks-python-container-py3"
+project(":beam-sdks-python-container-py3").dir = file("sdks/python/container/py3")
 include "beam-vendor-sdks-java-extensions-protobuf"
 project(":beam-vendor-sdks-java-extensions-protobuf").dir = file("vendor/sdks-java-extensions-protobuf")
 include "beam-website"
diff --git a/website/Rakefile b/website/Rakefile
index 5160bad..e814956 100644
--- a/website/Rakefile
+++ b/website/Rakefile
@@ -18,7 +18,8 @@
         /jstorm.io/,
         /datatorrent.com/,
         /ai.google/, # https://issues.apache.org/jira/browse/INFRA-16527
-        /globenewswire.com/ # https://issues.apache.org/jira/browse/BEAM-5518
+        /globenewswire.com/, # https://issues.apache.org/jira/browse/BEAM-5518
+        /www.se-radio.net/ # BEAM-5611: Can fail with rate limit HTTP 508 error
     ],
     :parallel => { :in_processes => Etc.nprocessors },
     }).run
diff --git a/website/build.gradle b/website/build.gradle
index e59160a..eb52de7 100644
--- a/website/build.gradle
+++ b/website/build.gradle
@@ -144,9 +144,8 @@
   // get the latest commit on master
   def latestCommit = grgit.log(maxCommits: 1)[0].abbreviatedId
 
-  shell "git fetch ${gitboxUrl} asf-site"
+  shell "git fetch --force origin +asf-site:asf-site"
   git.checkout(branch: 'asf-site')
-  shell "git reset --hard origin/asf-site"
 
   // Delete the previous content.
   git.remove(patterns: [ 'website/generated-content' ])