Merge pull request #15117 from ajamato/bq_java_read_metrics

[BEAM-11994] Update BigQueryStorageStreamSource and BigQueryServicesImpl to capture API_REQUEST_COUNT metrics/errors for storage API reads
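
As background for reviewers, here is a minimal sketch of the kind of per-request counting this change introduces, assuming only Beam's public `Metrics.counter` API; the class, method, and metric names below are illustrative and are not the exact identifiers used in `BigQueryServicesImpl` or `BigQueryStorageStreamSource` (the real change reports API_REQUEST_COUNT monitoring infos with per-status labels):

```java
import java.util.concurrent.Callable;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;

/** Illustrative only: counts Storage API read requests and errors by outcome. */
class StorageReadMetricsSketch {
  // Hypothetical metric names; the actual implementation uses Beam's API_REQUEST_COUNT metric.
  private static final Counter READ_OK =
      Metrics.counter("BigQuerySource", "storage_api_read_ok");
  private static final Counter READ_ERROR =
      Metrics.counter("BigQuerySource", "storage_api_read_error");

  <T> T callAndCount(Callable<T> readRowsCall) throws Exception {
    try {
      // e.g. a ReadRows request against the BigQuery Storage API.
      T response = readRowsCall.call();
      READ_OK.inc();
      return response;
    } catch (Exception e) {
      // The real implementation also records the canonical error code before rethrowing.
      READ_ERROR.inc();
      throw e;
    }
  }
}
```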
diff --git a/.test-infra/jenkins/job_PostCommit_Python.groovy b/.test-infra/jenkins/job_PostCommit_Python.groovy
index f0e4e58..2f3b1d8 100644
--- a/.test-infra/jenkins/job_PostCommit_Python.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python.groovy
@@ -33,7 +33,7 @@
         commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 120)
 
         publishers {
-          archiveJunit('**/nosetests*.xml')
+          archiveJunit('**/pytest*.xml')
         }
 
         // Execute shell command to test Python SDK.
diff --git a/LICENSE b/LICENSE
index 44c35bb..3b335e3 100644
--- a/LICENSE
+++ b/LICENSE
@@ -403,261 +403,5 @@
     OF CONTRACT, TORT OR OTHERWISE,  ARISING FROM, OUT OF OR IN CONNECTION
     WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE
 
-################################################################################
-CPython LICENSE. Source:
-https://github.com/python/cpython/blob/81574b80e92554adf75c13fa42415beb8be383cb/LICENSE
-
-A. HISTORY OF THE SOFTWARE
-==========================
-
-Python was created in the early 1990s by Guido van Rossum at Stichting
-Mathematisch Centrum (CWI, see http://www.cwi.nl) in the Netherlands
-as a successor of a language called ABC.  Guido remains Python's
-principal author, although it includes many contributions from others.
-
-In 1995, Guido continued his work on Python at the Corporation for
-National Research Initiatives (CNRI, see http://www.cnri.reston.va.us)
-in Reston, Virginia where he released several versions of the
-software.
-
-In May 2000, Guido and the Python core development team moved to
-BeOpen.com to form the BeOpen PythonLabs team.  In October of the same
-year, the PythonLabs team moved to Digital Creations, which became
-Zope Corporation.  In 2001, the Python Software Foundation (PSF, see
-https://www.python.org/psf/) was formed, a non-profit organization
-created specifically to own Python-related Intellectual Property.
-Zope Corporation was a sponsoring member of the PSF.
-
-All Python releases are Open Source (see http://www.opensource.org for
-the Open Source Definition).  Historically, most, but not all, Python
-releases have also been GPL-compatible; the table below summarizes
-the various releases.
-
-    Release         Derived     Year        Owner       GPL-
-                    from                                compatible? (1)
-
-    0.9.0 thru 1.2              1991-1995   CWI         yes
-    1.3 thru 1.5.2  1.2         1995-1999   CNRI        yes
-    1.6             1.5.2       2000        CNRI        no
-    2.0             1.6         2000        BeOpen.com  no
-    1.6.1           1.6         2001        CNRI        yes (2)
-    2.1             2.0+1.6.1   2001        PSF         no
-    2.0.1           2.0+1.6.1   2001        PSF         yes
-    2.1.1           2.1+2.0.1   2001        PSF         yes
-    2.1.2           2.1.1       2002        PSF         yes
-    2.1.3           2.1.2       2002        PSF         yes
-    2.2 and above   2.1.1       2001-now    PSF         yes
-
-Footnotes:
-
-(1) GPL-compatible doesn't mean that we're distributing Python under
-    the GPL.  All Python licenses, unlike the GPL, let you distribute
-    a modified version without making your changes open source.  The
-    GPL-compatible licenses make it possible to combine Python with
-    other software that is released under the GPL; the others don't.
-
-(2) According to Richard Stallman, 1.6.1 is not GPL-compatible,
-    because its license has a choice of law clause.  According to
-    CNRI, however, Stallman's lawyer has told CNRI's lawyer that 1.6.1
-    is "not incompatible" with the GPL.
-
-Thanks to the many outside volunteers who have worked under Guido's
-direction to make these releases possible.
-
-
-B. TERMS AND CONDITIONS FOR ACCESSING OR OTHERWISE USING PYTHON
-===============================================================
-
-PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2
---------------------------------------------
-
-1. This LICENSE AGREEMENT is between the Python Software Foundation
-("PSF"), and the Individual or Organization ("Licensee") accessing and
-otherwise using this software ("Python") in source or binary form and
-its associated documentation.
-
-2. Subject to the terms and conditions of this License Agreement, PSF hereby
-grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce,
-analyze, test, perform and/or display publicly, prepare derivative works,
-distribute, and otherwise use Python alone or in any derivative version,
-provided, however, that PSF's License Agreement and PSF's notice of copyright,
-i.e., "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
-2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018 Python Software Foundation; All
-Rights Reserved" are retained in Python alone or in any derivative version
-prepared by Licensee.
-
-3. In the event Licensee prepares a derivative work that is based on
-or incorporates Python or any part thereof, and wants to make
-the derivative work available to others as provided herein, then
-Licensee hereby agrees to include in any such work a brief summary of
-the changes made to Python.
-
-4. PSF is making Python available to Licensee on an "AS IS"
-basis.  PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
-IMPLIED.  BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND
-DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
-FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL NOT
-INFRINGE ANY THIRD PARTY RIGHTS.
-
-5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
-FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS
-A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON,
-OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
-
-6. This License Agreement will automatically terminate upon a material
-breach of its terms and conditions.
-
-7. Nothing in this License Agreement shall be deemed to create any
-relationship of agency, partnership, or joint venture between PSF and
-Licensee.  This License Agreement does not grant permission to use PSF
-trademarks or trade name in a trademark sense to endorse or promote
-products or services of Licensee, or any third party.
-
-8. By copying, installing or otherwise using Python, Licensee
-agrees to be bound by the terms and conditions of this License
-Agreement.
-
-
-BEOPEN.COM LICENSE AGREEMENT FOR PYTHON 2.0
--------------------------------------------
-
-BEOPEN PYTHON OPEN SOURCE LICENSE AGREEMENT VERSION 1
-
-1. This LICENSE AGREEMENT is between BeOpen.com ("BeOpen"), having an
-office at 160 Saratoga Avenue, Santa Clara, CA 95051, and the
-Individual or Organization ("Licensee") accessing and otherwise using
-this software in source or binary form and its associated
-documentation ("the Software").
-
-2. Subject to the terms and conditions of this BeOpen Python License
-Agreement, BeOpen hereby grants Licensee a non-exclusive,
-royalty-free, world-wide license to reproduce, analyze, test, perform
-and/or display publicly, prepare derivative works, distribute, and
-otherwise use the Software alone or in any derivative version,
-provided, however, that the BeOpen Python License is retained in the
-Software, alone or in any derivative version prepared by Licensee.
-
-3. BeOpen is making the Software available to Licensee on an "AS IS"
-basis.  BEOPEN MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
-IMPLIED.  BY WAY OF EXAMPLE, BUT NOT LIMITATION, BEOPEN MAKES NO AND
-DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
-FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF THE SOFTWARE WILL NOT
-INFRINGE ANY THIRD PARTY RIGHTS.
-
-4. BEOPEN SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF THE
-SOFTWARE FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS
-AS A RESULT OF USING, MODIFYING OR DISTRIBUTING THE SOFTWARE, OR ANY
-DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
-
-5. This License Agreement will automatically terminate upon a material
-breach of its terms and conditions.
-
-6. This License Agreement shall be governed by and interpreted in all
-respects by the law of the State of California, excluding conflict of
-law provisions.  Nothing in this License Agreement shall be deemed to
-create any relationship of agency, partnership, or joint venture
-between BeOpen and Licensee.  This License Agreement does not grant
-permission to use BeOpen trademarks or trade names in a trademark
-sense to endorse or promote products or services of Licensee, or any
-third party.  As an exception, the "BeOpen Python" logos available at
-http://www.pythonlabs.com/logos.html may be used according to the
-permissions granted on that web page.
-
-7. By copying, installing or otherwise using the software, Licensee
-agrees to be bound by the terms and conditions of this License
-Agreement.
-
-
-CNRI LICENSE AGREEMENT FOR PYTHON 1.6.1
----------------------------------------
-
-1. This LICENSE AGREEMENT is between the Corporation for National
-Research Initiatives, having an office at 1895 Preston White Drive,
-Reston, VA 20191 ("CNRI"), and the Individual or Organization
-("Licensee") accessing and otherwise using Python 1.6.1 software in
-source or binary form and its associated documentation.
-
-2. Subject to the terms and conditions of this License Agreement, CNRI
-hereby grants Licensee a nonexclusive, royalty-free, world-wide
-license to reproduce, analyze, test, perform and/or display publicly,
-prepare derivative works, distribute, and otherwise use Python 1.6.1
-alone or in any derivative version, provided, however, that CNRI's
-License Agreement and CNRI's notice of copyright, i.e., "Copyright (c)
-1995-2001 Corporation for National Research Initiatives; All Rights
-Reserved" are retained in Python 1.6.1 alone or in any derivative
-version prepared by Licensee.  Alternately, in lieu of CNRI's License
-Agreement, Licensee may substitute the following text (omitting the
-quotes): "Python 1.6.1 is made available subject to the terms and
-conditions in CNRI's License Agreement.  This Agreement together with
-Python 1.6.1 may be located on the Internet using the following
-unique, persistent identifier (known as a handle): 1895.22/1013.  This
-Agreement may also be obtained from a proxy server on the Internet
-using the following URL: http://hdl.handle.net/1895.22/1013".
-
-3. In the event Licensee prepares a derivative work that is based on
-or incorporates Python 1.6.1 or any part thereof, and wants to make
-the derivative work available to others as provided herein, then
-Licensee hereby agrees to include in any such work a brief summary of
-the changes made to Python 1.6.1.
-
-4. CNRI is making Python 1.6.1 available to Licensee on an "AS IS"
-basis.  CNRI MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
-IMPLIED.  BY WAY OF EXAMPLE, BUT NOT LIMITATION, CNRI MAKES NO AND
-DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
-FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON 1.6.1 WILL NOT
-INFRINGE ANY THIRD PARTY RIGHTS.
-
-5. CNRI SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
-1.6.1 FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS
-A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON 1.6.1,
-OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
-
-6. This License Agreement will automatically terminate upon a material
-breach of its terms and conditions.
-
-7. This License Agreement shall be governed by the federal
-intellectual property law of the United States, including without
-limitation the federal copyright law, and, to the extent such
-U.S. federal law does not apply, by the law of the Commonwealth of
-Virginia, excluding Virginia's conflict of law provisions.
-Notwithstanding the foregoing, with regard to derivative works based
-on Python 1.6.1 that incorporate non-separable material that was
-previously distributed under the GNU General Public License (GPL), the
-law of the Commonwealth of Virginia shall govern this License
-Agreement only as to issues arising under or with respect to
-Paragraphs 4, 5, and 7 of this License Agreement.  Nothing in this
-License Agreement shall be deemed to create any relationship of
-agency, partnership, or joint venture between CNRI and Licensee.  This
-License Agreement does not grant permission to use CNRI trademarks or
-trade name in a trademark sense to endorse or promote products or
-services of Licensee, or any third party.
-
-8. By clicking on the "ACCEPT" button where indicated, or by copying,
-installing or otherwise using Python 1.6.1, Licensee agrees to be
-bound by the terms and conditions of this License Agreement.
-
-        ACCEPT
-
-
-CWI LICENSE AGREEMENT FOR PYTHON 0.9.0 THROUGH 1.2
---------------------------------------------------
-
-Copyright (c) 1991 - 1995, Stichting Mathematisch Centrum Amsterdam,
-The Netherlands.  All rights reserved.
-
-Permission to use, copy, modify, and distribute this software and its
-documentation for any purpose and without fee is hereby granted,
-provided that the above copyright notice appear in all copies and that
-both that copyright notice and this permission notice appear in
-supporting documentation, and that the name of Stichting Mathematisch
-Centrum or CWI not be used in advertising or publicity pertaining to
-distribution of the software without specific, written prior
-permission.
-
-STICHTING MATHEMATISCH CENTRUM DISCLAIMS ALL WARRANTIES WITH REGARD TO
-THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
-FITNESS, IN NO EVENT SHALL STICHTING MATHEMATISCH CENTRUM BE LIABLE
-FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
-WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
-ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
-OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+See the adjacent LICENSE.python file, if present, for additional licenses that
+apply to parts of Apache Beam Python.
\ No newline at end of file
diff --git a/LICENSE.python b/LICENSE.python
new file mode 100644
index 0000000..f2f028b
--- /dev/null
+++ b/LICENSE.python
@@ -0,0 +1,258 @@
+
+################################################################################
+CPython LICENSE. Source:
+https://github.com/python/cpython/blob/81574b80e92554adf75c13fa42415beb8be383cb/LICENSE
+
+A. HISTORY OF THE SOFTWARE
+==========================
+
+Python was created in the early 1990s by Guido van Rossum at Stichting
+Mathematisch Centrum (CWI, see http://www.cwi.nl) in the Netherlands
+as a successor of a language called ABC.  Guido remains Python's
+principal author, although it includes many contributions from others.
+
+In 1995, Guido continued his work on Python at the Corporation for
+National Research Initiatives (CNRI, see http://www.cnri.reston.va.us)
+in Reston, Virginia where he released several versions of the
+software.
+
+In May 2000, Guido and the Python core development team moved to
+BeOpen.com to form the BeOpen PythonLabs team.  In October of the same
+year, the PythonLabs team moved to Digital Creations, which became
+Zope Corporation.  In 2001, the Python Software Foundation (PSF, see
+https://www.python.org/psf/) was formed, a non-profit organization
+created specifically to own Python-related Intellectual Property.
+Zope Corporation was a sponsoring member of the PSF.
+
+All Python releases are Open Source (see http://www.opensource.org for
+the Open Source Definition).  Historically, most, but not all, Python
+releases have also been GPL-compatible; the table below summarizes
+the various releases.
+
+    Release         Derived     Year        Owner       GPL-
+                    from                                compatible? (1)
+
+    0.9.0 thru 1.2              1991-1995   CWI         yes
+    1.3 thru 1.5.2  1.2         1995-1999   CNRI        yes
+    1.6             1.5.2       2000        CNRI        no
+    2.0             1.6         2000        BeOpen.com  no
+    1.6.1           1.6         2001        CNRI        yes (2)
+    2.1             2.0+1.6.1   2001        PSF         no
+    2.0.1           2.0+1.6.1   2001        PSF         yes
+    2.1.1           2.1+2.0.1   2001        PSF         yes
+    2.1.2           2.1.1       2002        PSF         yes
+    2.1.3           2.1.2       2002        PSF         yes
+    2.2 and above   2.1.1       2001-now    PSF         yes
+
+Footnotes:
+
+(1) GPL-compatible doesn't mean that we're distributing Python under
+    the GPL.  All Python licenses, unlike the GPL, let you distribute
+    a modified version without making your changes open source.  The
+    GPL-compatible licenses make it possible to combine Python with
+    other software that is released under the GPL; the others don't.
+
+(2) According to Richard Stallman, 1.6.1 is not GPL-compatible,
+    because its license has a choice of law clause.  According to
+    CNRI, however, Stallman's lawyer has told CNRI's lawyer that 1.6.1
+    is "not incompatible" with the GPL.
+
+Thanks to the many outside volunteers who have worked under Guido's
+direction to make these releases possible.
+
+B. TERMS AND CONDITIONS FOR ACCESSING OR OTHERWISE USING PYTHON
+===============================================================
+
+PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2
+--------------------------------------------
+
+1. This LICENSE AGREEMENT is between the Python Software Foundation
+("PSF"), and the Individual or Organization ("Licensee") accessing and
+otherwise using this software ("Python") in source or binary form and
+its associated documentation.
+
+2. Subject to the terms and conditions of this License Agreement, PSF hereby
+grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce,
+analyze, test, perform and/or display publicly, prepare derivative works,
+distribute, and otherwise use Python alone or in any derivative version,
+provided, however, that PSF's License Agreement and PSF's notice of copyright,
+i.e., "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
+2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018 Python Software Foundation; All
+Rights Reserved" are retained in Python alone or in any derivative version
+prepared by Licensee.
+
+3. In the event Licensee prepares a derivative work that is based on
+or incorporates Python or any part thereof, and wants to make
+the derivative work available to others as provided herein, then
+Licensee hereby agrees to include in any such work a brief summary of
+the changes made to Python.
+
+4. PSF is making Python available to Licensee on an "AS IS"
+basis.  PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
+IMPLIED.  BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND
+DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
+FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL NOT
+INFRINGE ANY THIRD PARTY RIGHTS.
+
+5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
+FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS
+A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON,
+OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
+
+6. This License Agreement will automatically terminate upon a material
+breach of its terms and conditions.
+
+7. Nothing in this License Agreement shall be deemed to create any
+relationship of agency, partnership, or joint venture between PSF and
+Licensee.  This License Agreement does not grant permission to use PSF
+trademarks or trade name in a trademark sense to endorse or promote
+products or services of Licensee, or any third party.
+
+8. By copying, installing or otherwise using Python, Licensee
+agrees to be bound by the terms and conditions of this License
+Agreement.
+
+BEOPEN.COM LICENSE AGREEMENT FOR PYTHON 2.0
+-------------------------------------------
+
+BEOPEN PYTHON OPEN SOURCE LICENSE AGREEMENT VERSION 1
+
+1. This LICENSE AGREEMENT is between BeOpen.com ("BeOpen"), having an
+office at 160 Saratoga Avenue, Santa Clara, CA 95051, and the
+Individual or Organization ("Licensee") accessing and otherwise using
+this software in source or binary form and its associated
+documentation ("the Software").
+
+2. Subject to the terms and conditions of this BeOpen Python License
+Agreement, BeOpen hereby grants Licensee a non-exclusive,
+royalty-free, world-wide license to reproduce, analyze, test, perform
+and/or display publicly, prepare derivative works, distribute, and
+otherwise use the Software alone or in any derivative version,
+provided, however, that the BeOpen Python License is retained in the
+Software, alone or in any derivative version prepared by Licensee.
+
+3. BeOpen is making the Software available to Licensee on an "AS IS"
+basis.  BEOPEN MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
+IMPLIED.  BY WAY OF EXAMPLE, BUT NOT LIMITATION, BEOPEN MAKES NO AND
+DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
+FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF THE SOFTWARE WILL NOT
+INFRINGE ANY THIRD PARTY RIGHTS.
+
+4. BEOPEN SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF THE
+SOFTWARE FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS
+AS A RESULT OF USING, MODIFYING OR DISTRIBUTING THE SOFTWARE, OR ANY
+DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
+
+5. This License Agreement will automatically terminate upon a material
+breach of its terms and conditions.
+
+6. This License Agreement shall be governed by and interpreted in all
+respects by the law of the State of California, excluding conflict of
+law provisions.  Nothing in this License Agreement shall be deemed to
+create any relationship of agency, partnership, or joint venture
+between BeOpen and Licensee.  This License Agreement does not grant
+permission to use BeOpen trademarks or trade names in a trademark
+sense to endorse or promote products or services of Licensee, or any
+third party.  As an exception, the "BeOpen Python" logos available at
+http://www.pythonlabs.com/logos.html may be used according to the
+permissions granted on that web page.
+
+7. By copying, installing or otherwise using the software, Licensee
+agrees to be bound by the terms and conditions of this License
+Agreement.
+
+
+CNRI LICENSE AGREEMENT FOR PYTHON 1.6.1
+---------------------------------------
+
+1. This LICENSE AGREEMENT is between the Corporation for National
+Research Initiatives, having an office at 1895 Preston White Drive,
+Reston, VA 20191 ("CNRI"), and the Individual or Organization
+("Licensee") accessing and otherwise using Python 1.6.1 software in
+source or binary form and its associated documentation.
+
+2. Subject to the terms and conditions of this License Agreement, CNRI
+hereby grants Licensee a nonexclusive, royalty-free, world-wide
+license to reproduce, analyze, test, perform and/or display publicly,
+prepare derivative works, distribute, and otherwise use Python 1.6.1
+alone or in any derivative version, provided, however, that CNRI's
+License Agreement and CNRI's notice of copyright, i.e., "Copyright (c)
+1995-2001 Corporation for National Research Initiatives; All Rights
+Reserved" are retained in Python 1.6.1 alone or in any derivative
+version prepared by Licensee.  Alternately, in lieu of CNRI's License
+Agreement, Licensee may substitute the following text (omitting the
+quotes): "Python 1.6.1 is made available subject to the terms and
+conditions in CNRI's License Agreement.  This Agreement together with
+Python 1.6.1 may be located on the Internet using the following
+unique, persistent identifier (known as a handle): 1895.22/1013.  This
+Agreement may also be obtained from a proxy server on the Internet
+using the following URL: http://hdl.handle.net/1895.22/1013".
+
+3. In the event Licensee prepares a derivative work that is based on
+or incorporates Python 1.6.1 or any part thereof, and wants to make
+the derivative work available to others as provided herein, then
+Licensee hereby agrees to include in any such work a brief summary of
+the changes made to Python 1.6.1.
+
+4. CNRI is making Python 1.6.1 available to Licensee on an "AS IS"
+basis.  CNRI MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
+IMPLIED.  BY WAY OF EXAMPLE, BUT NOT LIMITATION, CNRI MAKES NO AND
+DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
+FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON 1.6.1 WILL NOT
+INFRINGE ANY THIRD PARTY RIGHTS.
+
+5. CNRI SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
+1.6.1 FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS
+A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON 1.6.1,
+OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
+
+6. This License Agreement will automatically terminate upon a material
+breach of its terms and conditions.
+
+7. This License Agreement shall be governed by the federal
+intellectual property law of the United States, including without
+limitation the federal copyright law, and, to the extent such
+U.S. federal law does not apply, by the law of the Commonwealth of
+Virginia, excluding Virginia's conflict of law provisions.
+Notwithstanding the foregoing, with regard to derivative works based
+on Python 1.6.1 that incorporate non-separable material that was
+previously distributed under the GNU General Public License (GPL), the
+law of the Commonwealth of Virginia shall govern this License
+Agreement only as to issues arising under or with respect to
+Paragraphs 4, 5, and 7 of this License Agreement.  Nothing in this
+License Agreement shall be deemed to create any relationship of
+agency, partnership, or joint venture between CNRI and Licensee.  This
+License Agreement does not grant permission to use CNRI trademarks or
+trade name in a trademark sense to endorse or promote products or
+services of Licensee, or any third party.
+
+8. By clicking on the "ACCEPT" button where indicated, or by copying,
+installing or otherwise using Python 1.6.1, Licensee agrees to be
+bound by the terms and conditions of this License Agreement.
+
+        ACCEPT
+
+
+CWI LICENSE AGREEMENT FOR PYTHON 0.9.0 THROUGH 1.2
+--------------------------------------------------
+
+Copyright (c) 1991 - 1995, Stichting Mathematisch Centrum Amsterdam,
+The Netherlands.  All rights reserved.
+
+Permission to use, copy, modify, and distribute this software and its
+documentation for any purpose and without fee is hereby granted,
+provided that the above copyright notice appear in all copies and that
+both that copyright notice and this permission notice appear in
+supporting documentation, and that the name of Stichting Mathematisch
+Centrum or CWI not be used in advertising or publicity pertaining to
+distribution of the software without specific, written prior
+permission.
+
+STICHTING MATHEMATISCH CENTRUM DISCLAIMS ALL WARRANTIES WITH REGARD TO
+THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
+FITNESS, IN NO EVENT SHALL STICHTING MATHEMATISCH CENTRUM BE LIABLE
+FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
diff --git a/build.gradle.kts b/build.gradle.kts
index 849f1f6..2d37828 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -91,6 +91,9 @@
     "ownership/**/*",
     "**/OWNERS",
 
+    // Ignore CPython LICENSE file
+    "LICENSE.python",
+
     // Json doesn't support comments.
     "**/*.json",
 
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index a078fe9..2404691 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -130,6 +130,17 @@
     boolean validateShadowJar = true
 
     /**
+     * Controls whether the 'jmh' source set is enabled for JMH benchmarks.
+     *
+     * Additional dependencies can be added to the jmhCompile and jmhRuntime
+     * configurations.
+     *
+     * Note that the JMH annotation processor is enabled by default and that
+     * a 'jmh' task is created which executes the benchmarks.
+     */
+    boolean enableJmh = false
+
+    /**
      * The set of excludes that should be used during validation of the shadow jar. Projects should override
      * the default with the most specific set of excludes that is valid for the contents of its shaded jar.
      *
@@ -462,6 +473,7 @@
     def spotbugs_version = "4.0.6"
     def testcontainers_version = "1.15.1"
     def arrow_version = "4.0.0"
+    def jmh_version = "1.32"
 
     // A map of maps containing common libraries used per language. To use:
     // dependencies {
@@ -628,7 +640,7 @@
         proto_google_cloud_firestore_v1             : "com.google.api.grpc:proto-google-cloud-firestore-v1", // google_cloud_platform_libraries_bom sets version
         proto_google_cloud_pubsub_v1                : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
         proto_google_cloud_pubsublite_v1            : "com.google.api.grpc:proto-google-cloud-pubsublite-v1:$google_cloud_pubsublite_version",
-        proto_google_cloud_spanner_v1: "com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version
+        proto_google_cloud_spanner_v1               : "com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version
         proto_google_cloud_spanner_admin_database_v1: "com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // google_cloud_platform_libraries_bom sets version
         proto_google_common_protos                  : "com.google.api.grpc:proto-google-common-protos", // google_cloud_platform_libraries_bom sets version
         slf4j_api                                   : "org.slf4j:slf4j-api:$slf4j_version",
@@ -848,6 +860,7 @@
       List<String> skipDefRegexes = []
       skipDefRegexes << "AutoValue_.*"
       skipDefRegexes << "AutoOneOf_.*"
+      skipDefRegexes << ".*\\.jmh_generated\\..*"
       skipDefRegexes += configuration.generatedClassPatterns
       skipDefRegexes += configuration.classesTriggerCheckerBugs.keySet()
       String skipDefCombinedRegex = skipDefRegexes.collect({ regex -> "(${regex})"}).join("|")
@@ -1240,6 +1253,45 @@
         project.artifacts.testRuntime project.testJar
       }
 
+      if (configuration.enableJmh) {
+        // We specifically use a separate source set for JMH to ensure that it does not
+        // become a required artifact
+        project.sourceSets {
+          jmh {
+            java {
+              srcDir "src/jmh/java"
+            }
+            resources {
+              srcDir "src/jmh/resources"
+            }
+          }
+        }
+
+        project.dependencies {
+          jmhAnnotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:$jmh_version"
+          jmhCompile "org.openjdk.jmh:jmh-core:$jmh_version"
+        }
+
+        project.task("jmh", type: JavaExec, dependsOn: project.jmhClasses, {
+          main = "org.openjdk.jmh.Main"
+          classpath = project.sourceSets.jmh.compileClasspath + project.sourceSets.jmh.runtimeClasspath
+          // For a list of arguments, see
+          // https://github.com/guozheng/jmh-tutorial/blob/master/README.md
+          //
+          // Filter for a specific benchmark to run (uncomment below)
+          // Note that multiple regexes are supported, each as a separate argument.
+          // args 'BeamFnLoggingClientBenchmark.testLoggingWithAllOptionalParameters'
+          // args 'additional regexp...'
+          //
+          // Enumerate available benchmarks and exit (uncomment below)
+          // args '-l'
+          //
+          // Enable connecting a debugger by disabling forking (uncomment below)
+          // Useful for debugging via an IDE such as Intellij
+          // args '-f0'
+        })
+      }
+
       project.ext.includeInJavaBom = configuration.publish
       project.ext.exportJavadoc = configuration.exportJavadoc
 
@@ -1738,6 +1790,7 @@
       project.docker { noCache true }
       project.tasks.create(name: "copyLicenses", type: Copy) {
         from "${project.rootProject.projectDir}/LICENSE"
+        from "${project.rootProject.projectDir}/LICENSE.python"
         from "${project.rootProject.projectDir}/NOTICE"
         into "build/target"
       }
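
For modules that opt in with `enableJmh`, benchmarks are expected under `src/jmh/java` and run via the generated `jmh` task. A minimal sketch of such a benchmark, assuming only the standard JMH annotations (package, class, and method names are illustrative):

```java
package org.apache.beam.fn.harness.jmh;

import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

/** Illustrative benchmark; the JMH annotation processor generates the runner glue for the jmh task. */
@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
public class ExampleFormattingBenchmark {
  private String prefix;

  @Setup
  public void setUp() {
    prefix = "level=INFO message=";
  }

  @Benchmark
  public String formatMessage() {
    // The measured operation: return a value so JMH prevents dead-code elimination.
    return prefix + System.nanoTime();
  }
}
```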
diff --git a/release/go-licenses/Dockerfile b/release/go-licenses/Dockerfile
index 43e7870..6004f45 100644
--- a/release/go-licenses/Dockerfile
+++ b/release/go-licenses/Dockerfile
@@ -16,7 +16,7 @@
 # limitations under the License.
 ###############################################################################
 
-FROM golang:1.15.0-buster
+FROM golang:1.16.0-buster
 RUN go get github.com/google/go-licenses
 COPY get-licenses.sh /opt/apache/beam/
 ARG sdk_location
diff --git a/release/go-licenses/get-licenses.sh b/release/go-licenses/get-licenses.sh
index 727fb9b..be1e01d 100755
--- a/release/go-licenses/get-licenses.sh
+++ b/release/go-licenses/get-licenses.sh
@@ -19,6 +19,7 @@
 set -ex
 rm -rf /output/*
 
+export GO111MODULE=off
 go get $sdk_location
 
 go-licenses save $sdk_location --save_path=/output/licenses
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index e01e441..4d79285 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -167,6 +167,7 @@
   'org.apache.beam.sdk.testing.UsesParDoLifecycle',
   'org.apache.beam.sdk.testing.UsesMetricsPusher',
   'org.apache.beam.sdk.testing.UsesBundleFinalizer',
+  'org.apache.beam.sdk.testing.UsesStrictTimerOrdering',
 ]
 
 def commonRunnerV2ExcludeCategories = [
@@ -178,6 +179,7 @@
         'org.apache.beam.sdk.testing.UsesTestStream',
         'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime',
         'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput',
+        'org.apache.beam.sdk.testing.UsesStrictTimerOrdering',
 ]
 
 // For the following test tasks using legacy worker, set workerHarnessContainerImage to empty to
@@ -277,7 +279,10 @@
       commandLine "docker", "rmi", "--force", "${dockerImageName}"
     }
     exec {
-      commandLine "gcloud", "--quiet", "container", "images", "delete", "--force-delete-tags", "${dockerImageName}"
+      commandLine "gcloud", "--quiet", "container", "images", "untag", "${dockerImageName}"
+    }
+    exec {
+      commandLine "./scripts/cleanup_untagged_gcr_images.sh", "${dockerImageContainer}"
     }
   }
 }
@@ -319,7 +324,6 @@
       'org.apache.beam.sdk.testing.UsesMapState',
       'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput',
       'org.apache.beam.sdk.testing.UsesSetState',
-      'org.apache.beam.sdk.testing.UsesStrictTimerOrdering',
     ],
   ))
 }
diff --git a/runners/google-cloud-dataflow-java/scripts/cleanup_untagged_gcr_images.sh b/runners/google-cloud-dataflow-java/scripts/cleanup_untagged_gcr_images.sh
new file mode 100755
index 0000000..5bf8197
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/scripts/cleanup_untagged_gcr_images.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+#
+#    Licensed to the Apache Software Foundation (ASF) under one or more
+#    contributor license agreements.  See the NOTICE file distributed with
+#    this work for additional information regarding copyright ownership.
+#    The ASF licenses this file to You under the Apache License, Version 2.0
+#    (the "License"); you may not use this file except in compliance with
+#    the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+set -e
+
+IMAGE_NAME=$1
+
+# Find all untagged images
+DIGESTS=$(gcloud container images list-tags "${IMAGE_NAME}"  --filter='-tags:*' --format="get(digest)")
+
+# Delete each untagged image
+echo "${DIGESTS}" | while read -r digest; do
+  if [[ ! -z "${digest}" ]]; then
+    img="${IMAGE_NAME}@${digest}"
+    echo "Removing untagged image ${img}"
+    gcloud container images delete --quiet "${img}"
+  fi
+done
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index 81ae092..3c06ee9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -1525,8 +1525,8 @@
 
                 try {
                   blockedStartMs.set(Instant.now().getMillis());
-                  current = queue.take();
-                  if (current != POISON_PILL) {
+                  current = queue.poll(180, TimeUnit.SECONDS);
+                  if (current != null && current != POISON_PILL) {
                     return true;
                   }
                   if (cancelled.get()) {
@@ -1535,7 +1535,8 @@
                   if (complete.get()) {
                     return false;
                   }
-                  throw new IllegalStateException("Got poison pill but stream is not done.");
+                  throw new IllegalStateException(
+                      "Got poison pill or timeout but stream is not done.");
                 } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   throw new CancellationException();
diff --git a/sdks/go/BUILD.md b/sdks/go/BUILD.md
index 8d99b93..d4c02d7 100644
--- a/sdks/go/BUILD.md
+++ b/sdks/go/BUILD.md
@@ -64,3 +64,13 @@
 
 If you make changes to .proto files, you will need to rebuild the generated code.
 Consult `pkg/beam/model/PROTOBUF.md`.
+
+If you make changes to .tmpl files, you will need the specialize tool on your path to regenerate code.
+You can install specialize using:
+```
+go get github.com/apache/beam/sdks/go/cmd/specialize
+```
+Add it to your path:
+```
+export PATH=$PATH:$GOROOT/bin:$GOPATH/bin
+```
\ No newline at end of file
diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine.go b/sdks/go/pkg/beam/core/runtime/exec/combine.go
index c7576fe..299edb2 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine.go
@@ -286,8 +286,17 @@
 
 func (n *Combine) fail(err error) error {
 	n.status = Broken
-	n.err.TrySetError(err)
-	return err
+	if err2, ok := err.(*doFnError); ok {
+		return err2
+	}
+	combineError := &doFnError{
+		doFn: n.Fn.Name(),
+		err:  err,
+		uid:  n.UID,
+		pid:  n.PID,
+	}
+	n.err.TrySetError(combineError)
+	return combineError
 }
 
 func (n *Combine) String() string {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index 099763f..f7a9f32 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -332,8 +332,18 @@
 
 func (n *ParDo) fail(err error) error {
 	n.status = Broken
-	n.err.TrySetError(err)
-	return err
+	if err2, ok := err.(*doFnError); ok {
+		return err2
+	}
+
+	parDoError := &doFnError{
+		doFn: n.Fn.Name(),
+		err:  err,
+		uid:  n.UID,
+		pid:  n.PID,
+	}
+	n.err.TrySetError(parDoError)
+	return parDoError
 }
 
 func (n *ParDo) String() string {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/util.go b/sdks/go/pkg/beam/core/runtime/exec/util.go
index dc0608b..273c5c8 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/util.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/util.go
@@ -17,6 +17,7 @@
 
 import (
 	"context"
+	"fmt"
 	"runtime/debug"
 
 	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
@@ -33,13 +34,29 @@
 	return UnitID(g.last)
 }
 
+type doFnError struct {
+	doFn string
+	err  error
+	uid  UnitID
+	pid  string
+}
+
+func (e *doFnError) Error() string {
+	return fmt.Sprintf("DoFn[UID:%v, PID:%v, Name: %v] failed:\n%v", e.uid, e.pid, e.doFn, e.err)
+}
+
 // callNoPanic calls the given function and catches any panic.
 func callNoPanic(ctx context.Context, fn func(context.Context) error) (err error) {
 	defer func() {
 		if r := recover(); r != nil {
-			// Top level error is the panic itself, but also include the stack trace as the original error.
-			// Higher levels can then add appropriate context without getting pushed down by the stack trace.
-			err = errors.SetTopLevelMsgf(errors.Errorf("panic: %v %s", r, debug.Stack()), "panic: %v", r)
+			// Check if the panic value is from a failed DoFn, and return it without a panic trace.
+			if e, ok := r.(*doFnError); ok {
+				err = e
+			} else {
+				// Top level error is the panic itself, but also include the stack trace as the original error.
+				// Higher levels can then add appropriate context without getting pushed down by the stack trace.
+				err = errors.SetTopLevelMsgf(errors.Errorf("panic: %v %s", r, debug.Stack()), "panic: %v", r)
+			}
 		}
 	}()
 	return fn(ctx)
diff --git a/sdks/go/pkg/beam/core/runtime/exec/util_test.go b/sdks/go/pkg/beam/core/runtime/exec/util_test.go
new file mode 100644
index 0000000..0009c40
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/exec/util_test.go
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+	"context"
+	"strings"
+	"testing"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+	"github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
+)
+
+// TestCallNoPanic_simple tests the simple case where the function returns a plain error without panicking.
+func TestCallNoPanic_simple(t *testing.T) {
+	ctx := context.Background()
+	want := errors.New("Simple error.")
+	got := callNoPanic(ctx, func(c context.Context) error { return errors.New("Simple error.") })
+
+	if got.Error() != want.Error() {
+		t.Errorf("callNoPanic(<func that returns error>) = %v, want %v", got, want)
+	}
+}
+
+// TestCallNoPanic_panic tests the case in which a normal error value is passed to panic, resulting in a panic trace.
+func TestCallNoPanic_panic(t *testing.T) {
+	ctx := context.Background()
+	got := callNoPanic(ctx, func(c context.Context) error { panic("Panic error") })
+	if !strings.Contains(got.Error(), "panic:") {
+		t.Errorf("callNoPanic(<func that panics with a string>) didn't panic, got = %v", got)
+	}
+}
+
+// TestCallNoPanic_wrappedPanic tests the case in which a doFnError is passed to panic from
+// a DoFn, resulting in a formatted DoFn error message without a panic trace.
+func TestCallNoPanic_wrappedPanic(t *testing.T) {
+	ctx := context.Background()
+	errs := errors.New("SumFn error")
+	parDoError := &doFnError{
+		doFn: "sumFn",
+		err:  errs,
+		uid:  1,
+		pid:  "Plan ID",
+	}
+	want := "DoFn[UID:1, PID:Plan ID, Name: sumFn] failed:\nSumFn error"
+	var err errorx.GuardedError
+	err.TrySetError(parDoError)
+
+	got := callNoPanic(ctx, func(c context.Context) error { panic(parDoError) })
+
+	if strings.Contains(got.Error(), "panic:") {
+		t.Errorf("callNoPanic(<func that panics with a wrapped known error>) did not filter panic, want %v, got %v", want, got)
+	}
+}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
index 2e3ea3f..d8ed074 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -106,7 +106,14 @@
 }
 
 // reconcileRegistrations actually finishes the registration process.
-func (r *Registry) reconcileRegistrations() error {
+func (r *Registry) reconcileRegistrations() (deferedErr error) {
+	var ut reflect.Type
+	defer func() {
+		if r := recover(); r != nil {
+			deferedErr = errors.Errorf("panicked: %v", r)
+			deferedErr = errors.WithContextf(deferedErr, "reconciling schema registration for type %v", ut)
+		}
+	}()
 	for _, ut := range r.toReconcile {
 		check := func(ut reflect.Type) bool {
 			return coder.LookupCustomCoder(ut) != nil
diff --git a/sdks/go/pkg/beam/core/util/reflectx/call.go b/sdks/go/pkg/beam/core/util/reflectx/call.go
index 44f0602..dde9a8a 100644
--- a/sdks/go/pkg/beam/core/util/reflectx/call.go
+++ b/sdks/go/pkg/beam/core/util/reflectx/call.go
@@ -19,8 +19,9 @@
 	"reflect"
 	"sync"
 
-	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 	"runtime/debug"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
 )
 
 //go:generate specialize --input=calls.tmpl
diff --git a/sdks/go/pkg/beam/core/util/reflectx/calls.go b/sdks/go/pkg/beam/core/util/reflectx/calls.go
index a1bfe6b..3297a49 100644
--- a/sdks/go/pkg/beam/core/util/reflectx/calls.go
+++ b/sdks/go/pkg/beam/core/util/reflectx/calls.go
@@ -17,7 +17,10 @@
 
 package reflectx
 
-import "reflect"
+import (
+	"fmt"
+	"reflect"
+)
 
 // Generated arity-specialized Func implementations to avoid runtime temporary
 // slices. Code that knows the arity can potentially avoid that overhead in
@@ -57,7 +60,7 @@
 
 func ToFunc0x0(c Func) Func0x0 {
 	if c.Type().NumIn() != 0 || c.Type().NumOut() != 0 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 0 inputs and 0 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func0x0); ok {
 		return sc
@@ -98,7 +101,7 @@
 
 func ToFunc0x1(c Func) Func0x1 {
 	if c.Type().NumIn() != 0 || c.Type().NumOut() != 1 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 0 inputs and 1 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func0x1); ok {
 		return sc
@@ -139,7 +142,7 @@
 
 func ToFunc0x2(c Func) Func0x2 {
 	if c.Type().NumIn() != 0 || c.Type().NumOut() != 2 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 0 inputs and 2 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func0x2); ok {
 		return sc
@@ -180,7 +183,7 @@
 
 func ToFunc0x3(c Func) Func0x3 {
 	if c.Type().NumIn() != 0 || c.Type().NumOut() != 3 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 0 inputs and 3 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func0x3); ok {
 		return sc
@@ -221,7 +224,7 @@
 
 func ToFunc1x0(c Func) Func1x0 {
 	if c.Type().NumIn() != 1 || c.Type().NumOut() != 0 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 1 inputs and 0 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func1x0); ok {
 		return sc
@@ -262,7 +265,7 @@
 
 func ToFunc1x1(c Func) Func1x1 {
 	if c.Type().NumIn() != 1 || c.Type().NumOut() != 1 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 1 inputs and 1 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func1x1); ok {
 		return sc
@@ -303,7 +306,7 @@
 
 func ToFunc1x2(c Func) Func1x2 {
 	if c.Type().NumIn() != 1 || c.Type().NumOut() != 2 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 1 inputs and 2 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func1x2); ok {
 		return sc
@@ -344,7 +347,7 @@
 
 func ToFunc1x3(c Func) Func1x3 {
 	if c.Type().NumIn() != 1 || c.Type().NumOut() != 3 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 1 inputs and 3 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func1x3); ok {
 		return sc
@@ -385,7 +388,7 @@
 
 func ToFunc2x0(c Func) Func2x0 {
 	if c.Type().NumIn() != 2 || c.Type().NumOut() != 0 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 2 inputs and 0 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func2x0); ok {
 		return sc
@@ -426,7 +429,7 @@
 
 func ToFunc2x1(c Func) Func2x1 {
 	if c.Type().NumIn() != 2 || c.Type().NumOut() != 1 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 2 inputs and 1 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func2x1); ok {
 		return sc
@@ -467,7 +470,7 @@
 
 func ToFunc2x2(c Func) Func2x2 {
 	if c.Type().NumIn() != 2 || c.Type().NumOut() != 2 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 2 inputs and 2 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func2x2); ok {
 		return sc
@@ -508,7 +511,7 @@
 
 func ToFunc2x3(c Func) Func2x3 {
 	if c.Type().NumIn() != 2 || c.Type().NumOut() != 3 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 2 inputs and 3 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func2x3); ok {
 		return sc
@@ -549,7 +552,7 @@
 
 func ToFunc3x0(c Func) Func3x0 {
 	if c.Type().NumIn() != 3 || c.Type().NumOut() != 0 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 3 inputs and 0 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func3x0); ok {
 		return sc
@@ -590,7 +593,7 @@
 
 func ToFunc3x1(c Func) Func3x1 {
 	if c.Type().NumIn() != 3 || c.Type().NumOut() != 1 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 3 inputs and 1 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func3x1); ok {
 		return sc
@@ -631,7 +634,7 @@
 
 func ToFunc3x2(c Func) Func3x2 {
 	if c.Type().NumIn() != 3 || c.Type().NumOut() != 2 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 3 inputs and 2 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func3x2); ok {
 		return sc
@@ -672,7 +675,7 @@
 
 func ToFunc3x3(c Func) Func3x3 {
 	if c.Type().NumIn() != 3 || c.Type().NumOut() != 3 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 3 inputs and 3 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func3x3); ok {
 		return sc
@@ -713,7 +716,7 @@
 
 func ToFunc4x0(c Func) Func4x0 {
 	if c.Type().NumIn() != 4 || c.Type().NumOut() != 0 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 4 inputs and 0 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func4x0); ok {
 		return sc
@@ -754,7 +757,7 @@
 
 func ToFunc4x1(c Func) Func4x1 {
 	if c.Type().NumIn() != 4 || c.Type().NumOut() != 1 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 4 inputs and 1 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func4x1); ok {
 		return sc
@@ -795,7 +798,7 @@
 
 func ToFunc4x2(c Func) Func4x2 {
 	if c.Type().NumIn() != 4 || c.Type().NumOut() != 2 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 4 inputs and 2 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func4x2); ok {
 		return sc
@@ -836,7 +839,7 @@
 
 func ToFunc4x3(c Func) Func4x3 {
 	if c.Type().NumIn() != 4 || c.Type().NumOut() != 3 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 4 inputs and 3 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func4x3); ok {
 		return sc
@@ -877,7 +880,7 @@
 
 func ToFunc5x0(c Func) Func5x0 {
 	if c.Type().NumIn() != 5 || c.Type().NumOut() != 0 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 5 inputs and 0 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func5x0); ok {
 		return sc
@@ -918,7 +921,7 @@
 
 func ToFunc5x1(c Func) Func5x1 {
 	if c.Type().NumIn() != 5 || c.Type().NumOut() != 1 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 5 inputs and 1 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func5x1); ok {
 		return sc
@@ -959,7 +962,7 @@
 
 func ToFunc5x2(c Func) Func5x2 {
 	if c.Type().NumIn() != 5 || c.Type().NumOut() != 2 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 5 inputs and 2 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func5x2); ok {
 		return sc
@@ -1000,7 +1003,7 @@
 
 func ToFunc5x3(c Func) Func5x3 {
 	if c.Type().NumIn() != 5 || c.Type().NumOut() != 3 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 5 inputs and 3 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func5x3); ok {
 		return sc
@@ -1041,7 +1044,7 @@
 
 func ToFunc6x0(c Func) Func6x0 {
 	if c.Type().NumIn() != 6 || c.Type().NumOut() != 0 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 6 inputs and 0 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func6x0); ok {
 		return sc
@@ -1082,7 +1085,7 @@
 
 func ToFunc6x1(c Func) Func6x1 {
 	if c.Type().NumIn() != 6 || c.Type().NumOut() != 1 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 6 inputs and 1 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func6x1); ok {
 		return sc
@@ -1123,7 +1126,7 @@
 
 func ToFunc6x2(c Func) Func6x2 {
 	if c.Type().NumIn() != 6 || c.Type().NumOut() != 2 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 6 inputs and 2 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func6x2); ok {
 		return sc
@@ -1164,7 +1167,7 @@
 
 func ToFunc6x3(c Func) Func6x3 {
 	if c.Type().NumIn() != 6 || c.Type().NumOut() != 3 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 6 inputs and 3 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func6x3); ok {
 		return sc
@@ -1205,7 +1208,7 @@
 
 func ToFunc7x0(c Func) Func7x0 {
 	if c.Type().NumIn() != 7 || c.Type().NumOut() != 0 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 7 inputs and 0 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func7x0); ok {
 		return sc
@@ -1246,7 +1249,7 @@
 
 func ToFunc7x1(c Func) Func7x1 {
 	if c.Type().NumIn() != 7 || c.Type().NumOut() != 1 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 7 inputs and 1 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func7x1); ok {
 		return sc
@@ -1287,7 +1290,7 @@
 
 func ToFunc7x2(c Func) Func7x2 {
 	if c.Type().NumIn() != 7 || c.Type().NumOut() != 2 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 7 inputs and 2 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func7x2); ok {
 		return sc
@@ -1328,7 +1331,7 @@
 
 func ToFunc7x3(c Func) Func7x3 {
 	if c.Type().NumIn() != 7 || c.Type().NumOut() != 3 {
-		panic("incompatible func type")
+		panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 7 inputs and 3 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
 	}
 	if sc, ok := c.(Func7x3); ok {
 		return sc
diff --git a/sdks/go/pkg/beam/core/util/reflectx/calls.tmpl b/sdks/go/pkg/beam/core/util/reflectx/calls.tmpl
index 60b6107..75b80e2 100644
--- a/sdks/go/pkg/beam/core/util/reflectx/calls.tmpl
+++ b/sdks/go/pkg/beam/core/util/reflectx/calls.tmpl
@@ -15,7 +15,10 @@
 
 package reflectx
 
-import "reflect"
+import (
+    "reflect"
+    "fmt"    
+)
 
 // Generated arity-specialized Func implementations to avoid runtime temporary
 // slices. Code that knows the arity can potentially avoid that overhead in
@@ -57,7 +60,7 @@
 
 func ToFunc{{$in}}x{{$out}}(c Func) Func{{$in}}x{{$out}} {
     if c.Type().NumIn() != {{$in}} || c.Type().NumOut() != {{$out}} {
-        panic("incompatible func type")
+        panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want {{$in}} inputs and {{$out}} outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
     }
     if sc, ok := c.(Func{{$in}}x{{$out}}); ok {
         return sc
diff --git a/sdks/go/pkg/beam/testing/passert/equals_test.go b/sdks/go/pkg/beam/testing/passert/equals_test.go
index 0af7d43..3138705 100644
--- a/sdks/go/pkg/beam/testing/passert/equals_test.go
+++ b/sdks/go/pkg/beam/testing/passert/equals_test.go
@@ -185,6 +185,7 @@
 	fmt.Println(err)
 
 	// Output:
+	// DoFn[UID:1, PID:passert.failIfBadEntries, Name: github.com/apache/beam/sdks/go/pkg/beam/testing/passert.failIfBadEntries] failed:
 	// actual PCollection does not match expected values
 	// =========
 	// 2 correct entries (present in both)
diff --git a/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml
index 71f2172..b28371c 100644
--- a/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml
@@ -64,6 +64,9 @@
     <Class name="~.*AutoValue_.*"/>
   </Match>
   <Match>
+    <Package name="~.*jmh_generated.*"/>
+  </Match>
+  <Match>
     <Package name="org.apache.beam.sdk.extensions.sql.impl.parser.impl"/>
   </Match>
   <Match>
diff --git a/sdks/java/harness/build.gradle b/sdks/java/harness/build.gradle
index 50ea8c1..3c859ae 100644
--- a/sdks/java/harness/build.gradle
+++ b/sdks/java/harness/build.gradle
@@ -33,6 +33,7 @@
   ],
   automaticModuleName: 'org.apache.beam.fn.harness',
   validateShadowJar: false,
+  enableJmh: true,
   testShadowJar: true,
   shadowClosure:
   // Create an uber jar without repackaging for the SDK harness
@@ -72,4 +73,6 @@
   testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
   testCompile project(":runners:core-construction-java")
   shadowTestRuntimeClasspath library.java.slf4j_jdk14
+  jmhCompile project(path: ":sdks:java:harness", configuration: "shadowTest")
+  jmhRuntime library.java.slf4j_jdk14
 }
diff --git a/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientBenchmark.java b/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientBenchmark.java
new file mode 100644
index 0000000..9e009c3
--- /dev/null
+++ b/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientBenchmark.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.logging;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.SimpleExecutionState;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.inprocess.InProcessServerBuilder;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Benchmarks for {@link BeamFnLoggingClient}. */
+public class BeamFnLoggingClientBenchmark {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamFnLoggingClientBenchmark.class);
+
+  /** A logging service which counts the number of calls it received. */
+  public static class CallCountLoggingService extends BeamFnLoggingGrpc.BeamFnLoggingImplBase {
+    private AtomicInteger callCount = new AtomicInteger();
+
+    @Override
+    public StreamObserver<BeamFnApi.LogEntry.List> logging(
+        StreamObserver<BeamFnApi.LogControl> outboundObserver) {
+      return new StreamObserver<BeamFnApi.LogEntry.List>() {
+
+        @Override
+        public void onNext(BeamFnApi.LogEntry.List list) {
+          callCount.incrementAndGet();
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+          outboundObserver.onError(throwable);
+        }
+
+        @Override
+        public void onCompleted() {
+          outboundObserver.onCompleted();
+        }
+      };
+    }
+  }
+
+  /** Setup a simple logging service and configure the {@link BeamFnLoggingClient}. */
+  @State(Scope.Benchmark)
+  public static class ManageLoggingClientAndService {
+    public final BeamFnLoggingClient loggingClient;
+    public final CallCountLoggingService loggingService;
+    public final Server server;
+
+    public ManageLoggingClientAndService() {
+      try {
+        ApiServiceDescriptor apiServiceDescriptor =
+            ApiServiceDescriptor.newBuilder()
+                .setUrl(BeamFnLoggingClientBenchmark.class.getName() + "#" + UUID.randomUUID())
+                .build();
+        ManagedChannelFactory managedChannelFactory = InProcessManagedChannelFactory.create();
+        loggingService = new CallCountLoggingService();
+        server =
+            InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+                .addService(loggingService)
+                .build();
+        server.start();
+        loggingClient =
+            new BeamFnLoggingClient(
+                PipelineOptionsFactory.create(),
+                apiServiceDescriptor,
+                managedChannelFactory::forDescriptor);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @TearDown(Level.Trial)
+    public void tearDown() throws Exception {
+      loggingClient.close();
+      server.shutdown();
+      // Force an immediate shutdown if the server does not terminate within the timeout.
+      if (!server.awaitTermination(30, TimeUnit.SECONDS)) {
+        server.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * A {@link ManageLoggingClientAndService} which validates that more than zero calls made it to
+   * the service.
+   */
+  @State(Scope.Benchmark)
+  public static class ManyExpectedCallsLoggingClientAndService
+      extends ManageLoggingClientAndService {
+    @Override
+    @TearDown
+    public void tearDown() throws Exception {
+      super.tearDown();
+      if (loggingService.callCount.get() <= 0) {
+        throw new IllegalStateException(
+            "Server expected greater then zero calls. Benchmark misconfigured?");
+      }
+    }
+  }
+
+  /**
+   * A {@link ManageLoggingClientAndService} which validates that exactly zero calls made it to the
+   * service.
+   */
+  @State(Scope.Benchmark)
+  public static class ZeroExpectedCallsLoggingClientAndService
+      extends ManageLoggingClientAndService {
+    @Override
+    @TearDown
+    public void tearDown() throws Exception {
+      super.tearDown();
+      if (loggingService.callCount.get() != 0) {
+        throw new IllegalStateException("Server expected zero calls. Benchmark misconfigured?");
+      }
+    }
+  }
+
+  /** Sets up the {@link ExecutionStateTracker} and an execution state. */
+  @State(Scope.Benchmark)
+  public static class ManageExecutionState {
+    private final ExecutionStateTracker executionStateTracker;
+    private final SimpleExecutionState simpleExecutionState;
+
+    public ManageExecutionState() {
+      executionStateTracker = ExecutionStateTracker.newForTest();
+      HashMap<String, String> labelsMetadata = new HashMap<>();
+      labelsMetadata.put(MonitoringInfoConstants.Labels.PTRANSFORM, "ptransformId");
+      simpleExecutionState =
+          new SimpleExecutionState(
+              ExecutionStateTracker.PROCESS_STATE_NAME,
+              MonitoringInfoConstants.Urns.PROCESS_BUNDLE_MSECS,
+              labelsMetadata);
+    }
+
+    @TearDown
+    public void tearDown() throws Exception {
+      executionStateTracker.reset();
+    }
+  }
+
+  @Benchmark
+  @Threads(16) // Use several threads since we expect contention during logging
+  public void testLogging(ManyExpectedCallsLoggingClientAndService client) {
+    LOG.warn("log me");
+  }
+
+  @Benchmark
+  @Threads(16) // Use several threads since we expect contention during logging
+  public void testLoggingWithAllOptionalParameters(
+      ManyExpectedCallsLoggingClientAndService client, ManageExecutionState executionState)
+      throws Exception {
+    BeamFnLoggingMDC.setInstructionId("instruction id");
+    try (Closeable state =
+        executionState.executionStateTracker.enterState(executionState.simpleExecutionState)) {
+      LOG.warn("log me");
+    }
+    BeamFnLoggingMDC.setInstructionId(null);
+  }
+
+  @Benchmark
+  @Threads(16) // Use several threads since we expect contention during logging
+  public void testSkippedLogging(ZeroExpectedCallsLoggingClientAndService client) {
+    LOG.trace("no log");
+  }
+}
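
The @State classes in the benchmark above validate their expected side effects in @TearDown, so a misconfigured benchmark fails the trial instead of silently reporting numbers. A minimal, self-contained sketch of that pattern (the class, method, and message below are illustrative and not part of this change):

import java.util.concurrent.atomic.AtomicLong;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;

public class TearDownValidationSketch {
  @State(Scope.Benchmark)
  public static class Counter {
    final AtomicLong calls = new AtomicLong();

    @TearDown(Level.Trial)
    public void verify() {
      // Fail the trial if the benchmark never exercised the code path under test.
      if (calls.get() == 0) {
        throw new IllegalStateException("expected at least one call; benchmark misconfigured?");
      }
    }
  }

  @Benchmark
  public long count(Counter counter) {
    return counter.calls.incrementAndGet();
  }
}
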
diff --git a/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/package-info.java b/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/package-info.java
new file mode 100644
index 0000000..304238e
--- /dev/null
+++ b/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Benchmarks for logging. */
+package org.apache.beam.fn.harness.logging;
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index 6cd9116..fe42f1a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -57,7 +57,6 @@
 import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ClientCallStreamObserver;
 import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ClientResponseObserver;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -230,8 +229,8 @@
         String transformId =
             ((SimpleExecutionState) state)
                 .getLabels()
-                .getOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, "");
-        if (!Strings.isNullOrEmpty(transformId)) {
+                .get(MonitoringInfoConstants.Labels.PTRANSFORM);
+        if (transformId != null) {
           builder.setTransformId(transformId);
         }
       }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 15ea5c0..16b96bf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -30,10 +30,8 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.ShardedKeyCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
@@ -147,8 +145,8 @@
   private ValueProvider<String> loadJobProjectId;
   private final Coder<ElementT> elementCoder;
   private final RowWriterFactory<ElementT, DestinationT> rowWriterFactory;
-  private String kmsKey;
-  private boolean clusteringEnabled;
+  private final String kmsKey;
+  private final boolean clusteringEnabled;
 
   // The maximum number of times to retry failed load or copy jobs.
   private int maxRetryJobs = DEFAULT_MAX_RETRY_JOBS;
@@ -274,6 +272,8 @@
   private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input) {
     Pipeline p = input.getPipeline();
     final PCollectionView<String> loadJobIdPrefixView = createJobIdPrefixView(p, JobType.LOAD);
+    final PCollectionView<String> tempLoadJobIdPrefixView =
+        createJobIdPrefixView(p, JobType.TEMP_TABLE_LOAD);
     final PCollectionView<String> copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY);
     final PCollectionView<String> tempFilePrefixView =
         createTempFilePrefixView(p, loadJobIdPrefixView);
@@ -321,9 +321,9 @@
                             .plusDelayOf(triggeringFrequency)))
                 .discardingFiredPanes());
 
-    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
+    TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> multiPartitionsTag =
         new TupleTag<>("multiPartitionsTag");
-    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
+    TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> singlePartitionTag =
         new TupleTag<>("singlePartitionTag");
 
     // If we have non-default triggered output, we can't use the side-input technique used in
@@ -331,10 +331,10 @@
     // determinism.
     PCollectionTuple partitions =
         results
-            .apply("AttachSingletonKey", WithKeys.of((Void) null))
+            .apply("AttachDestinationKey", WithKeys.of(result -> result.destination))
             .setCoder(
-                KvCoder.of(VoidCoder.of(), WriteBundlesToFiles.ResultCoder.of(destinationCoder)))
-            .apply("GroupOntoSingleton", GroupByKey.create())
+                KvCoder.of(destinationCoder, WriteBundlesToFiles.ResultCoder.of(destinationCoder)))
+            .apply("GroupFilesByDestination", GroupByKey.create())
             .apply("ExtractResultValues", Values.create())
             .apply(
                 "WritePartitionTriggered",
@@ -350,14 +350,14 @@
                             rowWriterFactory))
                     .withSideInputs(tempFilePrefixView)
                     .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
-    PCollection<KV<TableDestination, String>> tempTables =
-        writeTempTables(partitions.get(multiPartitionsTag), loadJobIdPrefixView);
+    PCollection<KV<TableDestination, WriteTables.Result>> tempTables =
+        writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView);
 
     tempTables
         // Now that the load job has happened, we want the rename to happen immediately.
         .apply(
             "Window Into Global Windows",
-            Window.<KV<TableDestination, String>>into(new GlobalWindows())
+            Window.<KV<TableDestination, WriteTables.Result>>into(new GlobalWindows())
                 .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
         .apply("Add Void Key", WithKeys.of((Void) null))
         .setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder()))
@@ -382,6 +382,9 @@
   public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> input) {
     Pipeline p = input.getPipeline();
     final PCollectionView<String> loadJobIdPrefixView = createJobIdPrefixView(p, JobType.LOAD);
+    final PCollectionView<String> tempLoadJobIdPrefixView =
+        createJobIdPrefixView(p, JobType.TEMP_TABLE_LOAD);
+    final PCollectionView<String> copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY);
     final PCollectionView<String> tempFilePrefixView =
         createTempFilePrefixView(p, loadJobIdPrefixView);
     PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
@@ -395,10 +398,10 @@
             ? writeDynamicallyShardedFilesUntriggered(inputInGlobalWindow, tempFilePrefixView)
             : writeStaticallyShardedFiles(inputInGlobalWindow, tempFilePrefixView);
 
-    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
-        new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {};
-    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
-        new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag") {};
+    TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> multiPartitionsTag =
+        new TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>>("multiPartitionsTag") {};
+    TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> singlePartitionTag =
+        new TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>>("singlePartitionTag") {};
 
     // This transform will look at the set of files written for each table, and if any table has
     // too many files or bytes, will partition that table's files into multiple partitions for
@@ -421,8 +424,8 @@
                             rowWriterFactory))
                     .withSideInputs(tempFilePrefixView)
                     .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
-    PCollection<KV<TableDestination, String>> tempTables =
-        writeTempTables(partitions.get(multiPartitionsTag), loadJobIdPrefixView);
+    PCollection<KV<TableDestination, WriteTables.Result>> tempTables =
+        writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView);
 
     tempTables
         .apply("ReifyRenameInput", new ReifyAsIterable<>())
@@ -431,7 +434,7 @@
             ParDo.of(
                     new WriteRename(
                         bigQueryServices,
-                        loadJobIdPrefixView,
+                        copyJobIdPrefixView,
                         writeDisposition,
                         createDisposition,
                         maxRetryJobs,
@@ -637,23 +640,22 @@
         .apply(
             "WriteGroupedRecords",
             ParDo.of(
-                    new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
-                        tempFilePrefix, maxFileSize, rowWriterFactory))
+                    new WriteGroupedRecordsToFiles<>(tempFilePrefix, maxFileSize, rowWriterFactory))
                 .withSideInputs(tempFilePrefix))
         .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
   }
 
   // Take in a list of files and write them to temporary tables.
-  private PCollection<KV<TableDestination, String>> writeTempTables(
-      PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
+  private PCollection<KV<TableDestination, WriteTables.Result>> writeTempTables(
+      PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>> input,
       PCollectionView<String> jobIdTokenView) {
     List<PCollectionView<?>> sideInputs = Lists.newArrayList(jobIdTokenView);
     sideInputs.addAll(dynamicDestinations.getSideInputs());
 
-    Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
+    Coder<KV<ShardedKey<DestinationT>, WritePartition.Result>> partitionsCoder =
         KvCoder.of(
             ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
-            ListCoder.of(StringUtf8Coder.of()));
+            WritePartition.ResultCoder.INSTANCE);
 
     // If the final destination table exists already (and we're appending to it), then the temp
     // tables must exactly match schema, partitioning, etc. Wrap the DynamicDestinations object
@@ -695,20 +697,24 @@
                 rowWriterFactory.getSourceFormat(),
                 useAvroLogicalTypes,
                 schemaUpdateOptions))
-        .setCoder(KvCoder.of(tableDestinationCoder, StringUtf8Coder.of()));
+        .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE));
   }
 
   // In the case where the files fit into a single load job, there's no need to write temporary
   // tables and rename. We can load these files directly into the target BigQuery table.
   void writeSinglePartition(
-      PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
+      PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>> input,
       PCollectionView<String> loadJobIdPrefixView) {
     List<PCollectionView<?>> sideInputs = Lists.newArrayList(loadJobIdPrefixView);
     sideInputs.addAll(dynamicDestinations.getSideInputs());
-    Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
+
+    Coder<TableDestination> tableDestinationCoder =
+        clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of();
+
+    Coder<KV<ShardedKey<DestinationT>, WritePartition.Result>> partitionsCoder =
         KvCoder.of(
             ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
-            ListCoder.of(StringUtf8Coder.of()));
+            WritePartition.ResultCoder.INSTANCE);
     // Write single partition to final table
     input
         .setCoder(partitionsCoder)
@@ -731,7 +737,8 @@
                 kmsKey,
                 rowWriterFactory.getSourceFormat(),
                 useAvroLogicalTypes,
-                schemaUpdateOptions));
+                schemaUpdateOptions))
+        .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE));
   }
 
   private WriteResult writeResult(Pipeline p) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java
index 7e800fd0..7eae6fe 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java
@@ -69,6 +69,7 @@
 
   public enum JobType {
     LOAD,
+    TEMP_TABLE_LOAD,
     COPY,
     EXPORT,
     QUERY,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index cd4f163..e1e0566 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -17,8 +17,17 @@
  */
 package org.apache.beam.sdk.io.gcp.bigquery;
 
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.values.KV;
@@ -39,7 +48,32 @@
 class WritePartition<DestinationT>
     extends DoFn<
         Iterable<WriteBundlesToFiles.Result<DestinationT>>,
-        KV<ShardedKey<DestinationT>, List<String>>> {
+        KV<ShardedKey<DestinationT>, WritePartition.Result>> {
+  @AutoValue
+  abstract static class Result {
+    public abstract List<String> getFilenames();
+
+    abstract Boolean isFirstPane();
+  }
+
+  static class ResultCoder extends AtomicCoder<Result> {
+    private static final Coder<List<String>> FILENAMES_CODER = ListCoder.of(StringUtf8Coder.of());
+    private static final Coder<Boolean> FIRST_PANE_CODER = BooleanCoder.of();
+    static final ResultCoder INSTANCE = new ResultCoder();
+
+    @Override
+    public void encode(Result value, OutputStream outStream) throws IOException {
+      FILENAMES_CODER.encode(value.getFilenames(), outStream);
+      FIRST_PANE_CODER.encode(value.isFirstPane(), outStream);
+    }
+
+    @Override
+    public Result decode(InputStream inStream) throws IOException {
+      return new AutoValue_WritePartition_Result(
+          FILENAMES_CODER.decode(inStream), FIRST_PANE_CODER.decode(inStream));
+    }
+  }
+
   private final boolean singletonTable;
   private final DynamicDestinations<?, DestinationT> dynamicDestinations;
   private final PCollectionView<String> tempFilePrefix;
@@ -47,8 +81,9 @@
   private final long maxSizeBytes;
   private final RowWriterFactory<?, DestinationT> rowWriterFactory;
 
-  private @Nullable TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag;
-  private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag;
+  private @Nullable TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>>
+      multiPartitionsTag;
+  private TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> singlePartitionTag;
 
   private static class PartitionData {
     private int numFiles = 0;
@@ -131,8 +166,8 @@
       PCollectionView<String> tempFilePrefix,
       int maxNumFiles,
       long maxSizeBytes,
-      TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag,
-      TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag,
+      TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> multiPartitionsTag,
+      TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> singlePartitionTag,
       RowWriterFactory<?, DestinationT> rowWriterFactory) {
     this.singletonTable = singletonTable;
     this.dynamicDestinations = dynamicDestinations;
@@ -147,7 +182,6 @@
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
     List<WriteBundlesToFiles.Result<DestinationT>> results = Lists.newArrayList(c.element());
-
     // If there are no elements to write _and_ the user specified a constant output table, then
     // generate an empty table of that name.
     if (results.isEmpty() && singletonTable) {
@@ -161,7 +195,8 @@
       BigQueryRowWriter.Result writerResult = writer.getResult();
 
       results.add(
-          new Result<>(writerResult.resourceId.toString(), writerResult.byteSize, destination));
+          new WriteBundlesToFiles.Result<>(
+              writerResult.resourceId.toString(), writerResult.byteSize, destination));
     }
 
     Map<DestinationT, DestinationData> currentResults = Maps.newHashMap();
@@ -190,11 +225,16 @@
       // In the fast-path case where we only output one table, the transform loads it directly
       // to the final table. In this case, we output on a special TupleTag so the enclosing
       // transform knows to skip the rename step.
-      TupleTag<KV<ShardedKey<DestinationT>, List<String>>> outputTag =
+      TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> outputTag =
           (destinationData.getPartitions().size() == 1) ? singlePartitionTag : multiPartitionsTag;
       for (int i = 0; i < destinationData.getPartitions().size(); ++i) {
         PartitionData partitionData = destinationData.getPartitions().get(i);
-        c.output(outputTag, KV.of(ShardedKey.of(destination, i + 1), partitionData.getFilenames()));
+        c.output(
+            outputTag,
+            KV.of(
+                ShardedKey.of(destination, i + 1),
+                new AutoValue_WritePartition_Result(
+                    partitionData.getFilenames(), c.pane().isFirst())));
       }
     }
   }
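
WritePartition.Result now pairs a partition's filenames with a first-pane flag, and ResultCoder writes those two fields in a fixed order. A hedged sketch of that wire format using Beam's stock coders directly, to show why encode and decode must agree on ordering (the class name and sample values are illustrative, not part of the diff):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;

public class ResultCoderSketch {
  public static void main(String[] args) throws Exception {
    Coder<List<String>> filenamesCoder = ListCoder.of(StringUtf8Coder.of());
    List<String> filenames = Arrays.asList("gs://bucket/tmp/file-0", "gs://bucket/tmp/file-1");
    boolean isFirstPane = true;

    // Encode in the same order ResultCoder uses: filenames first, then the pane flag.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    filenamesCoder.encode(filenames, out);
    BooleanCoder.of().encode(isFirstPane, out);

    // Decode in that same order; swapping the two reads would corrupt both fields.
    ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
    List<String> decodedFilenames = filenamesCoder.decode(in);
    boolean decodedFlag = BooleanCoder.of().decode(in);
    System.out.println(decodedFilenames + " firstPane=" + decodedFlag);
  }
}
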
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index a45f6f8..80201ff 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -39,6 +39,7 @@
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
 import org.slf4j.Logger;
@@ -51,7 +52,7 @@
 @SuppressWarnings({
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
-class WriteRename extends DoFn<Iterable<KV<TableDestination, String>>, Void> {
+class WriteRename extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>>, Void> {
   private static final Logger LOG = LoggerFactory.getLogger(WriteRename.class);
 
   private final BigQueryServices bqServices;
@@ -116,12 +117,15 @@
   }
 
   @ProcessElement
-  public void processElement(ProcessContext c) throws Exception {
-    Multimap<TableDestination, String> tempTables = ArrayListMultimap.create();
-    for (KV<TableDestination, String> entry : c.element()) {
+  public void processElement(
+      @Element Iterable<KV<TableDestination, WriteTables.Result>> element, ProcessContext c)
+      throws Exception {
+    Multimap<TableDestination, WriteTables.Result> tempTables = ArrayListMultimap.create();
+    for (KV<TableDestination, WriteTables.Result> entry : element) {
       tempTables.put(entry.getKey(), entry.getValue());
     }
-    for (Map.Entry<TableDestination, Collection<String>> entry : tempTables.asMap().entrySet()) {
+    for (Map.Entry<TableDestination, Collection<WriteTables.Result>> entry :
+        tempTables.asMap().entrySet()) {
       // Process each destination table.
       // Do not copy if no temp tables are provided.
       if (!entry.getValue().isEmpty()) {
@@ -165,17 +169,27 @@
   }
 
   private PendingJobData startWriteRename(
-      TableDestination finalTableDestination, Iterable<String> tempTableNames, ProcessContext c)
+      TableDestination finalTableDestination,
+      Iterable<WriteTables.Result> tempTableNames,
+      ProcessContext c)
       throws Exception {
+    // The pane may have advanced either here due to triggering or due to an upstream trigger.
+    // We check the upstream trigger to handle the case where an earlier pane triggered the
+    // single-partition path. If this happened, then the table will already exist, so we want
+    // to append to the table.
+    boolean isFirstPane =
+        Iterables.getFirst(tempTableNames, null).isFirstPane() && c.pane().isFirst();
     WriteDisposition writeDisposition =
-        (c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
+        isFirstPane ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
     CreateDisposition createDisposition =
-        (c.pane().getIndex() == 0) ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
+        isFirstPane ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
     List<TableReference> tempTables =
         StreamSupport.stream(tempTableNames.spliterator(), false)
-            .map(table -> BigQueryHelpers.fromJsonString(table, TableReference.class))
+            .map(
+                result ->
+                    BigQueryHelpers.fromJsonString(result.getTableName(), TableReference.class))
             .collect(Collectors.toList());
-    ;
 
     // Make sure each destination table gets a unique job id.
     String jobIdPrefix =
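
The disposition choice above is the crux of the change: a destination may only be created or truncated on the genuinely first pane, which now means the first pane observed upstream (carried in WriteTables.Result) and the first pane observed in WriteRename itself. A small illustrative helper showing how the combined flag maps to dispositions (the class and method names are assumptions, not part of the diff):

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;

final class FirstPaneDispositions {
  // Only the genuinely first pane may use the user-requested dispositions; every later
  // pane must append to the table the first pane already created.
  static WriteDisposition writeDisposition(
      boolean isFirstPane, WriteDisposition firstPaneWriteDisposition) {
    return isFirstPane ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
  }

  static CreateDisposition createDisposition(
      boolean isFirstPane, CreateDisposition firstPaneCreateDisposition) {
    return isFirstPane ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
  }
}

With the flag threaded through, retried or later panes in the triggered path append rather than re-truncating a table that an earlier single-partition pane already wrote.
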
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 465c924..32ed1fe 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -26,11 +26,17 @@
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.api.services.bigquery.model.TimePartitioning;
+import com.google.auto.value.AutoValue;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
@@ -68,7 +74,10 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,8 +98,35 @@
 })
 class WriteTables<DestinationT>
     extends PTransform<
-        PCollection<KV<ShardedKey<DestinationT>, List<String>>>,
-        PCollection<KV<TableDestination, String>>> {
+        PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>>,
+        PCollection<KV<TableDestination, WriteTables.Result>>> {
+  @AutoValue
+  abstract static class Result {
+    abstract String getTableName();
+
+    abstract Boolean isFirstPane();
+  }
+
+  static class ResultCoder extends AtomicCoder<WriteTables.Result> {
+    static final ResultCoder INSTANCE = new ResultCoder();
+
+    @Override
+    public void encode(Result value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream)
+        throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull
+            @Initialized IOException {
+      StringUtf8Coder.of().encode(value.getTableName(), outStream);
+      BooleanCoder.of().encode(value.isFirstPane(), outStream);
+    }
+
+    @Override
+    public Result decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream)
+        throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull
+            @Initialized IOException {
+      return new AutoValue_WriteTables_Result(
+          StringUtf8Coder.of().decode(inStream), BooleanCoder.of().decode(inStream));
+    }
+  }
+
   private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class);
 
   private final boolean tempTable;
@@ -101,7 +137,7 @@
   private final Set<SchemaUpdateOption> schemaUpdateOptions;
   private final DynamicDestinations<?, DestinationT> dynamicDestinations;
   private final List<PCollectionView<?>> sideInputs;
-  private final TupleTag<KV<TableDestination, String>> mainOutputTag;
+  private final TupleTag<KV<TableDestination, WriteTables.Result>> mainOutputTag;
   private final TupleTag<String> temporaryFilesTag;
   private final ValueProvider<String> loadJobProjectId;
   private final int maxRetryJobs;
@@ -113,7 +149,9 @@
   private @Nullable JobService jobService;
 
   private class WriteTablesDoFn
-      extends DoFn<KV<ShardedKey<DestinationT>, List<String>>, KV<TableDestination, String>> {
+      extends DoFn<
+          KV<ShardedKey<DestinationT>, WritePartition.Result>, KV<TableDestination, Result>> {
+
     private Map<DestinationT, String> jsonSchemas = Maps.newHashMap();
 
     // Represents a pending BigQuery load job.
@@ -123,18 +161,21 @@
       final List<String> partitionFiles;
       final TableDestination tableDestination;
       final TableReference tableReference;
+      final boolean isFirstPane;
 
       public PendingJobData(
           BoundedWindow window,
           BigQueryHelpers.PendingJob retryJob,
           List<String> partitionFiles,
           TableDestination tableDestination,
-          TableReference tableReference) {
+          TableReference tableReference,
+          boolean isFirstPane) {
         this.window = window;
         this.retryJob = retryJob;
         this.partitionFiles = partitionFiles;
         this.tableDestination = tableDestination;
         this.tableReference = tableReference;
+        this.isFirstPane = isFirstPane;
       }
     }
     // All pending load jobs.
@@ -149,7 +190,11 @@
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+    public void processElement(
+        @Element KV<ShardedKey<DestinationT>, WritePartition.Result> element,
+        ProcessContext c,
+        BoundedWindow window)
+        throws Exception {
       dynamicDestinations.setSideInputAccessorFromProcessContext(c);
       DestinationT destination = c.element().getKey().getKey();
       TableSchema tableSchema;
@@ -199,8 +244,8 @@
         tableDestination = tableDestination.withTableReference(tableReference);
       }
 
-      Integer partition = c.element().getKey().getShardNumber();
-      List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
+      Integer partition = element.getKey().getShardNumber();
+      List<String> partitionFiles = Lists.newArrayList(element.getValue().getFilenames());
       String jobIdPrefix =
           BigQueryResourceNaming.createJobIdWithDestination(
               c.sideInput(loadJobIdPrefixView), tableDestination, partition, c.pane().getIndex());
@@ -212,7 +257,7 @@
 
       WriteDisposition writeDisposition = firstPaneWriteDisposition;
       CreateDisposition createDisposition = firstPaneCreateDisposition;
-      if (c.pane().getIndex() > 0 && !tempTable) {
+      if (!element.getValue().isFirstPane() && !tempTable) {
         // If writing directly to the destination, then the table is created on the first write
         // and we should change the disposition for subsequent writes.
         writeDisposition = WriteDisposition.WRITE_APPEND;
@@ -238,7 +283,13 @@
               createDisposition,
               schemaUpdateOptions);
       pendingJobs.add(
-          new PendingJobData(window, retryJob, partitionFiles, tableDestination, tableReference));
+          new PendingJobData(
+              window,
+              retryJob,
+              partitionFiles,
+              tableDestination,
+              tableReference,
+              element.getValue().isFirstPane()));
     }
 
     @Teardown
@@ -284,7 +335,7 @@
           bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
 
       PendingJobManager jobManager = new PendingJobManager();
-      for (PendingJobData pendingJob : pendingJobs) {
+      for (final PendingJobData pendingJob : pendingJobs) {
         jobManager =
             jobManager.addPendingJob(
                 pendingJob.retryJob,
@@ -299,11 +350,14 @@
                                   BigQueryHelpers.stripPartitionDecorator(ref.getTableId())),
                           pendingJob.tableDestination.getTableDescription());
                     }
+
+                    Result result =
+                        new AutoValue_WriteTables_Result(
+                            BigQueryHelpers.toJsonString(pendingJob.tableReference),
+                            pendingJob.isFirstPane);
                     c.output(
                         mainOutputTag,
-                        KV.of(
-                            pendingJob.tableDestination,
-                            BigQueryHelpers.toJsonString(pendingJob.tableReference)),
+                        KV.of(pendingJob.tableDestination, result),
                         pendingJob.window.maxTimestamp(),
                         pendingJob.window);
                     for (String file : pendingJob.partitionFiles) {
@@ -365,8 +419,8 @@
   }
 
   @Override
-  public PCollection<KV<TableDestination, String>> expand(
-      PCollection<KV<ShardedKey<DestinationT>, List<String>>> input) {
+  public PCollection<KV<TableDestination, Result>> expand(
+      PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>> input) {
     PCollectionTuple writeTablesOutputs =
         input.apply(
             ParDo.of(new WriteTablesDoFn())
@@ -391,7 +445,6 @@
         .apply(GroupByKey.create())
         .apply(Values.create())
         .apply(ParDo.of(new GarbageCollectTemporaryFiles()));
-
     return writeTablesOutputs.get(mainOutputTag);
   }
 
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index bde803d..6799c67 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -65,6 +65,7 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.generic.GenericRecord;
@@ -72,13 +73,17 @@
 import org.apache.avro.io.Encoder;
 import org.apache.beam.sdk.coders.AtomicCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.ShardedKeyCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
+import org.apache.beam.sdk.io.gcp.bigquery.WritePartition.ResultCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteTables.Result;
 import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
 import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
 import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
@@ -1758,10 +1763,12 @@
       }
     }
 
-    TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag =
-        new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("multiPartitionsTag") {};
-    TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag =
-        new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {};
+    TupleTag<KV<ShardedKey<TableDestination>, WritePartition.Result>> multiPartitionsTag =
+        new TupleTag<KV<ShardedKey<TableDestination>, WritePartition.Result>>(
+            "multiPartitionsTag") {};
+    TupleTag<KV<ShardedKey<TableDestination>, WritePartition.Result>> singlePartitionTag =
+        new TupleTag<KV<ShardedKey<TableDestination>, WritePartition.Result>>(
+            "singlePartitionTag") {};
 
     String tempFilePrefix = testFolder.newFolder("BigQueryIOTest").getAbsolutePath();
     PCollectionView<String> tempFilePrefixView =
@@ -1781,12 +1788,12 @@
 
     DoFnTester<
             Iterable<WriteBundlesToFiles.Result<TableDestination>>,
-            KV<ShardedKey<TableDestination>, List<String>>>
+            KV<ShardedKey<TableDestination>, WritePartition.Result>>
         tester = DoFnTester.of(writePartition);
     tester.setSideInput(tempFilePrefixView, GlobalWindow.INSTANCE, tempFilePrefix);
     tester.processElement(files);
 
-    List<KV<ShardedKey<TableDestination>, List<String>>> partitions;
+    List<KV<ShardedKey<TableDestination>, WritePartition.Result>> partitions;
     if (expectedNumPartitionsPerTable > 1) {
       partitions = tester.takeOutputElements(multiPartitionsTag);
     } else {
@@ -1795,12 +1802,12 @@
 
     List<ShardedKey<TableDestination>> partitionsResult = Lists.newArrayList();
     Map<String, List<String>> filesPerTableResult = Maps.newHashMap();
-    for (KV<ShardedKey<TableDestination>, List<String>> partition : partitions) {
+    for (KV<ShardedKey<TableDestination>, WritePartition.Result> partition : partitions) {
       String table = partition.getKey().getKey().getTableSpec();
       partitionsResult.add(partition.getKey());
       List<String> tableFilesResult =
           filesPerTableResult.computeIfAbsent(table, k -> Lists.newArrayList());
-      tableFilesResult.addAll(partition.getValue());
+      tableFilesResult.addAll(partition.getValue().getFilenames());
     }
 
     assertThat(
@@ -1847,7 +1854,7 @@
     String jobIdToken = "jobId";
     final Multimap<TableDestination, String> expectedTempTables = ArrayListMultimap.create();
 
-    List<KV<ShardedKey<String>, List<String>>> partitions = Lists.newArrayList();
+    List<KV<ShardedKey<String>, WritePartition.Result>> partitions = Lists.newArrayList();
     for (int i = 0; i < numTables; ++i) {
       String tableName = String.format("project-id:dataset-id.table%05d", i);
       TableDestination tableDestination = new TableDestination(tableName, tableName);
@@ -1869,7 +1876,10 @@
           }
           filesPerPartition.add(writer.getResult().resourceId.toString());
         }
-        partitions.add(KV.of(ShardedKey.of(tableDestination.getTableSpec(), j), filesPerPartition));
+        partitions.add(
+            KV.of(
+                ShardedKey.of(tableDestination.getTableSpec(), j),
+                new AutoValue_WritePartition_Result(filesPerPartition, true)));
 
         String json =
             String.format(
@@ -1879,8 +1889,11 @@
       }
     }
 
-    PCollection<KV<ShardedKey<String>, List<String>>> writeTablesInput =
-        p.apply(Create.of(partitions));
+    PCollection<KV<ShardedKey<String>, WritePartition.Result>> writeTablesInput =
+        p.apply(
+            Create.of(partitions)
+                .withCoder(
+                    KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), ResultCoder.INSTANCE)));
     PCollectionView<String> jobIdTokenView =
         p.apply("CreateJobId", Create.of("jobId")).apply(View.asSingleton());
     List<PCollectionView<?>> sideInputs = ImmutableList.of(jobIdTokenView);
@@ -1903,18 +1916,25 @@
             false,
             Collections.emptySet());
 
-    PCollection<KV<TableDestination, String>> writeTablesOutput =
-        writeTablesInput.apply(writeTables);
+    PCollection<KV<TableDestination, WriteTables.Result>> writeTablesOutput =
+        writeTablesInput
+            .apply(writeTables)
+            .setCoder(KvCoder.of(TableDestinationCoderV3.of(), WriteTables.ResultCoder.INSTANCE));
 
     PAssert.thatMultimap(writeTablesOutput)
         .satisfies(
             input -> {
               assertEquals(input.keySet(), expectedTempTables.keySet());
-              for (Map.Entry<TableDestination, Iterable<String>> entry : input.entrySet()) {
+              for (Map.Entry<TableDestination, Iterable<WriteTables.Result>> entry :
+                  input.entrySet()) {
+                Iterable<String> tableNames =
+                    StreamSupport.stream(entry.getValue().spliterator(), false)
+                        .map(Result::getTableName)
+                        .collect(Collectors.toList());
                 @SuppressWarnings("unchecked")
                 String[] expectedValues =
                     Iterables.toArray(expectedTempTables.get(entry.getKey()), String.class);
-                assertThat(entry.getValue(), containsInAnyOrder(expectedValues));
+                assertThat(tableNames, containsInAnyOrder(expectedValues));
               }
               return null;
             });
@@ -1951,7 +1971,7 @@
     Multimap<TableDestination, TableRow> expectedRowsPerTable = ArrayListMultimap.create();
     String jobIdToken = "jobIdToken";
     Multimap<TableDestination, String> tempTables = ArrayListMultimap.create();
-    List<KV<TableDestination, String>> tempTablesElement = Lists.newArrayList();
+    List<KV<TableDestination, WriteTables.Result>> tempTablesElement = Lists.newArrayList();
     for (int i = 0; i < numFinalTables; ++i) {
       String tableName = "project-id:dataset-id.table_" + i;
       TableDestination tableDestination = new TableDestination(tableName, "table_" + i + "_desc");
@@ -1971,7 +1991,8 @@
         expectedRowsPerTable.putAll(tableDestination, rows);
         String tableJson = toJsonString(tempTable);
         tempTables.put(tableDestination, tableJson);
-        tempTablesElement.add(KV.of(tableDestination, tableJson));
+        tempTablesElement.add(
+            KV.of(tableDestination, new AutoValue_WriteTables_Result(tableJson, true)));
       }
     }
 
@@ -1987,7 +2008,8 @@
             3,
             "kms_key");
 
-    DoFnTester<Iterable<KV<TableDestination, String>>, Void> tester = DoFnTester.of(writeRename);
+    DoFnTester<Iterable<KV<TableDestination, WriteTables.Result>>, Void> tester =
+        DoFnTester.of(writeRename);
     tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
     tester.processElement(tempTablesElement);
     tester.finishBundle();
diff --git a/sdks/python/MANIFEST.in b/sdks/python/MANIFEST.in
index c97e57a..60b0989 100644
--- a/sdks/python/MANIFEST.in
+++ b/sdks/python/MANIFEST.in
@@ -19,3 +19,4 @@
 include README.md
 include NOTICE
 include LICENSE
+include LICENSE.python
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 4d1d892..2048742 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -21,7 +21,7 @@
 
 import unittest
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.examples.complete import autocomplete
@@ -55,7 +55,7 @@
               ('that', ((1, 'that'), )),
           ]))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_autocomplete_it(self):
     with TestPipeline(is_integration_test=True) as p:
       words = p | beam.io.ReadFromText(self.KINGLEAR_INPUT)
diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
index 91388a4..ea7ec28 100644
--- a/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
@@ -20,7 +20,7 @@
 Code: beam/sdks/python/apache_beam/examples/complete/game/game_stats.py
 Usage:
 
-  python setup.py nosetests --test-pipeline-options=" \
+  pytest --test-pipeline-options=" \
       --runner=TestDataflowRunner \
       --project=... \
       --region=... \
@@ -38,8 +38,8 @@
 import unittest
 import uuid
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.examples.complete.game import game_stats
 from apache_beam.io.gcp.tests import utils
@@ -103,7 +103,7 @@
     test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub])
     test_utils.cleanup_topics(self.pub_client, [self.input_topic])
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_game_stats_it(self):
     state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
 
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py
index f21abac..a9729bc 100644
--- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py
@@ -20,7 +20,7 @@
 Code: beam/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
 Usage:
 
-  python setup.py nosetests --test-pipeline-options=" \
+  pytest --test-pipeline-options=" \
       --runner=TestDataflowRunner \
       --project=... \
       --region=... \
@@ -36,8 +36,8 @@
 import logging
 import unittest
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.examples.complete.game import hourly_team_score
 from apache_beam.io.gcp.tests import utils
@@ -63,7 +63,7 @@
     self.dataset_ref = utils.create_bq_dataset(
         self.project, self.OUTPUT_DATASET)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_hourly_team_score_it(self):
     state_verifier = PipelineStateMatcher(PipelineState.DONE)
     query = (
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
index 8f5f91c..8b82c64 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
@@ -20,7 +20,7 @@
 Code: beam/sdks/python/apache_beam/examples/complete/game/leader_board.py
 Usage:
 
-  python setup.py nosetests --test-pipeline-options=" \
+  pytest --test-pipeline-options=" \
       --runner=TestDataflowRunner \
       --project=... \
       --region=... \
@@ -38,8 +38,8 @@
 import unittest
 import uuid
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.examples.complete.game import leader_board
 from apache_beam.io.gcp.tests import utils
@@ -104,7 +104,7 @@
     test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub])
     test_utils.cleanup_topics(self.pub_client, [self.input_topic])
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_leader_board_it(self):
     state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
 
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score_it_test.py b/sdks/python/apache_beam/examples/complete/game/user_score_it_test.py
index a2b3a17..d26565e 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score_it_test.py
@@ -20,7 +20,7 @@
 Code: beam/sdks/python/apache_beam/examples/complete/game/user_score.py
 Usage:
 
-  python setup.py nosetests --test-pipeline-options=" \
+  pytest --test-pipeline-options=" \
       --runner=TestDataflowRunner \
       --project=... \
       --region=... \
@@ -37,8 +37,8 @@
 import unittest
 import uuid
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.examples.complete.game import user_score
 from apache_beam.runners.runner import PipelineState
@@ -60,7 +60,7 @@
     self.output = '/'.join(
         [self.test_pipeline.get_option('output'), self.uuid, 'results'])
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_user_score_it(self):
 
     state_verifier = PipelineStateMatcher(PipelineState.DONE)
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test_it.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test_it.py
index 9b6097b..a2a3262 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test_it.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test_it.py
@@ -24,8 +24,8 @@
 import unittest
 import uuid
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.examples.complete.juliaset.juliaset import juliaset
 from apache_beam.io.filesystems import FileSystems
@@ -34,7 +34,7 @@
 from apache_beam.testing.test_pipeline import TestPipeline
 
 
-@attr('IT')
+@pytest.mark.it_postcommit
 class JuliaSetTestIT(unittest.TestCase):
   GRID_SIZE = 1000
 
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
index cfec86b..fa5f12c 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
@@ -23,8 +23,8 @@
 import time
 import unittest
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.examples.cookbook import bigquery_tornadoes
 from apache_beam.io.gcp.tests import utils
@@ -42,7 +42,7 @@
   # from expected Bigquery table.
   DEFAULT_CHECKSUM = 'd860e636050c559a16a791aff40d6ad809d4daf0'
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_bigquery_tornadoes_it(self):
     test_pipeline = TestPipeline(is_integration_test=True)
 
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount_it_test.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount_it_test.py
index 36f81bd..388caf6 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount_it_test.py
@@ -23,8 +23,8 @@
 import time
 import unittest
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher
 from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
@@ -42,7 +42,7 @@
   DATASTORE_WORDCOUNT_KIND = "DatastoreWordCount"
   EXPECTED_CHECKSUM = '826f69ed0275858c2e098f1e8407d4e3ba5a4b3f'
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_datastore_wordcount_it(self):
     test_pipeline = TestPipeline(is_integration_test=True)
     kind = self.DATASTORE_WORDCOUNT_KIND
diff --git a/sdks/python/apache_beam/examples/dataframe/flight_delays_it_test.py b/sdks/python/apache_beam/examples/dataframe/flight_delays_it_test.py
index e1a0ac4..6ed376e 100644
--- a/sdks/python/apache_beam/examples/dataframe/flight_delays_it_test.py
+++ b/sdks/python/apache_beam/examples/dataframe/flight_delays_it_test.py
@@ -28,7 +28,7 @@
 import uuid
 
 import pandas as pd
-from nose.plugins.attrib import attr
+import pytest
 
 from apache_beam.examples.dataframe import flight_delays
 from apache_beam.io.filesystems import FileSystems
@@ -100,7 +100,7 @@
   def tearDown(self):
     FileSystems.delete([self.outdir + '/'])
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_flight_delays(self):
     flight_delays.run_flight_delay_pipeline(
         self.test_pipeline,
diff --git a/sdks/python/apache_beam/examples/dataframe/taxiride_it_test.py b/sdks/python/apache_beam/examples/dataframe/taxiride_it_test.py
index 4fedfa8..f81b7d8 100644
--- a/sdks/python/apache_beam/examples/dataframe/taxiride_it_test.py
+++ b/sdks/python/apache_beam/examples/dataframe/taxiride_it_test.py
@@ -25,7 +25,7 @@
 import uuid
 
 import pandas as pd
-from nose.plugins.attrib import attr
+import pytest
 
 from apache_beam.examples.dataframe import taxiride
 from apache_beam.io.filesystems import FileSystems
@@ -44,7 +44,7 @@
   def tearDown(self):
     FileSystems.delete([self.outdir + '/'])
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_aggregation(self):
     taxiride.run_aggregation_pipeline(
         self.test_pipeline,
@@ -71,7 +71,7 @@
 
     pd.testing.assert_frame_equal(expected, result)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_enrich(self):
     # Standard workers OOM with the enrich pipeline
     self.test_pipeline.get_pipeline_options().view_as(
diff --git a/sdks/python/apache_beam/examples/fastavro_it_test.py b/sdks/python/apache_beam/examples/fastavro_it_test.py
index 9ac8051..c9bb988 100644
--- a/sdks/python/apache_beam/examples/fastavro_it_test.py
+++ b/sdks/python/apache_beam/examples/fastavro_it_test.py
@@ -24,7 +24,7 @@
 Usage:
 
   DataFlowRunner:
-    python setup.py nosetests --tests apache_beam.examples.fastavro_it_test \
+    pytest apache_beam/examples/fastavro_it_test.py \
         --test-pipeline-options="
           --runner=TestDataflowRunner
           --project=...
@@ -36,7 +36,7 @@
         "
 
   DirectRunner:
-    python setup.py nosetests --tests apache_beam.examples.fastavro_it_test \
+    pytest apache_beam/examples/fastavro_it_test.py \
       --test-pipeline-options="
         --output=/tmp
         --records=5000
@@ -50,9 +50,9 @@
 import unittest
 import uuid
 
+import pytest
 from avro.schema import Parse
 from fastavro import parse_schema
-from nose.plugins.attrib import attr
 
 from apache_beam.io.avroio import ReadAllFromAvro
 from apache_beam.io.avroio import WriteToAvro
@@ -98,7 +98,7 @@
     self.uuid = str(uuid.uuid4())
     self.output = '/'.join([self.test_pipeline.get_option('output'), self.uuid])
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_avro_it(self):
     num_records = self.test_pipeline.get_option('records')
     num_records = int(num_records) if num_records else 1000000
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_debugging_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_debugging_it_test.py
index 367fc82..9101ff8 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_debugging_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_debugging_it_test.py
@@ -23,8 +23,8 @@
 import unittest
 import uuid
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.examples import streaming_wordcount_debugging
 from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
@@ -94,7 +94,7 @@
     test_utils.cleanup_topics(
         self.pub_client, [self.input_topic, self.output_topic])
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   @unittest.skip(
       "Skipped due to [BEAM-3377]: assert_that not working for streaming")
   def test_streaming_wordcount_debugging_it(self):
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
index 7812283..8beae5e 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -23,8 +23,8 @@
 import unittest
 import uuid
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.examples import streaming_wordcount
 from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
@@ -77,7 +77,7 @@
     test_utils.cleanup_topics(
         self.pub_client, [self.input_topic, self.output_topic])
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_streaming_wordcount_it(self):
     # Build expected dataset.
     expected_msg = [('%d: 1' % num).encode('utf-8')
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index 242dcff..8ee49c7 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -26,7 +26,6 @@
 
 import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.examples import wordcount
 from apache_beam.testing.load_tests.load_test_metrics_utils import InfluxDBMetricsPublisherOptions
@@ -39,19 +38,16 @@
 
 class WordCountIT(unittest.TestCase):
 
-  # Enable nose tests running in parallel
-  _multiprocess_can_split_ = True
-
   # The default checksum is a SHA-1 hash generated from a sorted list of
   # lines read from expected output. This value corresponds to the default
   # input of WordCount example.
   DEFAULT_CHECKSUM = '33535a832b7db6d78389759577d4ff495980b9c0'
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_wordcount_it(self):
     self._run_wordcount_it(wordcount.run)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   @pytest.mark.it_validatescontainer
   def test_wordcount_fnapi_it(self):
     self._run_wordcount_it(wordcount.run, experiment='beam_fn_api')
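Markers can also be stacked, as on test_wordcount_fnapi_it above, so a single test can belong to both the post-commit and the validates-container suites. A hypothetical invocation that selects only tests carrying both markers:

    pytest -m "it_postcommit and it_validatescontainer" apache_beam/examples/wordcount_it_test.py \
        --test-pipeline-options="--runner=TestDataflowRunner --project=... --region=... --temp_location=gs://..."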
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index a39584a..90086c9 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -28,8 +28,8 @@
 import uuid
 import warnings
 
+import pytest
 from hamcrest.library.text import stringmatches
-from nose.plugins.attrib import attr
 
 import apache_beam as beam
 from apache_beam.io import fileio
@@ -291,7 +291,7 @@
   def setUp(self):
     self.test_pipeline = TestPipeline(is_integration_test=True)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_transform_on_gcs(self):
     args = self.test_pipeline.get_full_options_as_args()
 
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index 0a30462..699dfa4 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -28,8 +28,8 @@
 import time
 import unittest
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.io.gcp import big_query_query_to_table_pipeline
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
@@ -154,7 +154,7 @@
         self.project, self.dataset_id, NEW_TYPES_INPUT_TABLE, table_data)
     self.assertTrue(passed, 'Error in BQ setup: %s' % errors)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_legacy_sql(self):
     verify_query = DIALECT_OUTPUT_VERIFY_QUERY % self.output_table
     expected_checksum = test_utils.compute_hash(DIALECT_OUTPUT_EXPECTED)
@@ -177,7 +177,7 @@
     options = self.test_pipeline.get_full_options_as_args(**extra_opts)
     big_query_query_to_table_pipeline.run_bq_pipeline(options)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_standard_sql(self):
     verify_query = DIALECT_OUTPUT_VERIFY_QUERY % self.output_table
     expected_checksum = test_utils.compute_hash(DIALECT_OUTPUT_EXPECTED)
@@ -200,7 +200,7 @@
     options = self.test_pipeline.get_full_options_as_args(**extra_opts)
     big_query_query_to_table_pipeline.run_bq_pipeline(options)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_standard_sql_kms_key_native(self):
     if isinstance(self.test_pipeline.runner, TestDirectRunner):
       self.skipTest("This test doesn't work on DirectRunner.")
@@ -236,7 +236,7 @@
         'No encryption configuration found: %s' % table)
     self.assertEqual(kms_key, table.encryptionConfiguration.kmsKeyName)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_new_types(self):
     expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
     verify_query = NEW_TYPES_OUTPUT_VERIFY_QUERY % self.output_table
@@ -260,7 +260,7 @@
     options = self.test_pipeline.get_full_options_as_args(**extra_opts)
     big_query_query_to_table_pipeline.run_bq_pipeline(options)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_new_types_avro(self):
     expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
     verify_query = NEW_TYPES_OUTPUT_VERIFY_QUERY % self.output_table
@@ -283,7 +283,7 @@
     options = self.test_pipeline.get_full_options_as_args(**extra_opts)
     big_query_query_to_table_pipeline.run_bq_pipeline(options)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_new_types_native(self):
     expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
     verify_query = NEW_TYPES_OUTPUT_VERIFY_QUERY % self.output_table
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 9672431..1e227f8 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -26,10 +26,10 @@
 import unittest
 
 import mock
+import pytest
 from hamcrest.core import assert_that as hamcrest_assert
 from hamcrest.core.core.allof import all_of
 from hamcrest.core.core.is_ import is_
-from nose.plugins.attrib import attr
 from parameterized import param
 from parameterized import parameterized
 
@@ -744,7 +744,7 @@
     _LOGGER.info(
         "Created dataset %s in project %s", self.dataset_id, self.project)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_multiple_destinations_transform(self):
     output_table_1 = '%s%s' % (self.output_table, 1)
     output_table_2 = '%s%s' % (self.output_table, 2)
@@ -824,7 +824,7 @@
               max_file_size=20,
               max_files_per_bundle=-1))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_bqfl_streaming(self):
     if isinstance(self.test_pipeline.runner, TestDataflowRunner):
       self.skipTest("TestStream is not supported on TestDataflowRunner")
@@ -862,7 +862,7 @@
                                         .Method.FILE_LOADS,
                                       triggering_frequency=100))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_one_job_fails_all_jobs_fail(self):
 
     # If one of the import jobs fails, then other jobs must not be performed.
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_io_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_io_read_it_test.py
index 6652b4e..63cc445 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_io_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_io_read_it_test.py
@@ -25,8 +25,8 @@
 import logging
 import unittest
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.io.gcp import bigquery_io_read_pipeline
 from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
@@ -59,11 +59,11 @@
     bigquery_io_read_pipeline.run(
         test_pipeline.get_full_options_as_args(**extra_opts))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_bigquery_read_custom_1M_python(self):
     self.run_bigquery_io_read_pipeline('1M', True)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_bigquery_read_1M_python(self):
     self.run_bigquery_io_read_pipeline('1M')
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index 4040a60..472b521 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -29,7 +29,7 @@
 from decimal import Decimal
 from functools import wraps
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
@@ -157,7 +157,7 @@
         cls.project, cls.dataset_id, table_name, cls.TABLE_DATA)
 
   @skip(['PortableRunner', 'FlinkRunner'])
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_native_source(self):
     with beam.Pipeline(argv=self.args) as p:
       result = (
@@ -165,7 +165,7 @@
               beam.io.BigQuerySource(query=self.query, use_standard_sql=True)))
       assert_that(result, equal_to(self.TABLE_DATA))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_iobase_source(self):
     query = StaticValueProvider(str, self.query)
     with beam.Pipeline(argv=self.args) as p:
@@ -272,7 +272,7 @@
     return expected_data
 
   @skip(['PortableRunner', 'FlinkRunner'])
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_native_source(self):
     with beam.Pipeline(argv=self.args) as p:
       result = (
@@ -281,7 +281,7 @@
               beam.io.BigQuerySource(query=self.query, use_standard_sql=True)))
       assert_that(result, equal_to(self.get_expected_data()))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_iobase_source(self):
     with beam.Pipeline(argv=self.args) as p:
       result = (
@@ -378,7 +378,7 @@
     return table_schema
 
   @skip(['PortableRunner', 'FlinkRunner'])
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_read_queries(self):
     # TODO(BEAM-11311): Remove experiment when tests run on r_v2.
     args = self.args + ["--experiments=use_runner_v2"]
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 19550b5..ce5874c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -32,8 +32,8 @@
 
 import hamcrest as hc
 import mock
+import pytest
 import pytz
-from nose.plugins.attrib import attr
 from parameterized import param
 from parameterized import parameterized
 
@@ -1047,13 +1047,6 @@
 class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
   BIG_QUERY_DATASET_ID = 'python_bq_streaming_inserts_'
 
-  # Prevent nose from finding and running tests that were not
-  # specified in the Gradle file.
-  # See "More tests may be found" in:
-  # https://nose.readthedocs.io/en/latest/doc_tests/test_multiprocess
-  # /multiprocess.html#other-differences-in-test-running
-  _multiprocess_can_split_ = True
-
   def setUp(self):
     self.test_pipeline = TestPipeline(is_integration_test=True)
     self.runner_name = type(self.test_pipeline.runner).__name__
@@ -1069,7 +1062,7 @@
     _LOGGER.info(
         "Created dataset %s in project %s", self.dataset_id, self.project)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_value_provider_transform(self):
     output_table_1 = '%s%s' % (self.output_table, 1)
     output_table_2 = '%s%s' % (self.output_table, 2)
@@ -1139,7 +1132,7 @@
               additional_bq_parameters=lambda _: additional_bq_parameters,
               method='FILE_LOADS'))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_multiple_destinations_transform(self):
     streaming = self.test_pipeline.options.view_as(StandardOptions).streaming
     if streaming and isinstance(self.test_pipeline.runner, TestDataflowRunner):
@@ -1333,11 +1326,11 @@
           method=method,
           triggering_frequency=triggering_frequency)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_streaming_inserts(self):
     self._run_pubsub_bq_pipeline(WriteToBigQuery.Method.STREAMING_INSERTS)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_file_loads(self):
     self._run_pubsub_bq_pipeline(
         WriteToBigQuery.Method.FILE_LOADS, triggering_frequency=20)
@@ -1362,7 +1355,7 @@
     _LOGGER.info(
         'Created dataset %s in project %s', self.dataset_id, self.project)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_avro_file_load(self):
     # Construct elements such that they can be written via Avro but not via
     # JSON. See BEAM-8841.
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index c8285a1..c7f1d442 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -30,8 +30,8 @@
 
 import hamcrest as hc
 import mock
+import pytest
 import pytz
-from nose.plugins.attrib import attr
 
 import apache_beam as beam
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
@@ -105,7 +105,7 @@
         projectId=self.project, datasetId=self.dataset_id, table=table)
     self.bigquery_client.client.tables.Insert(request)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_write(self):
     table_name = 'python_write_table'
     table_id = '{}.{}'.format(self.dataset_id, table_name)
@@ -164,7 +164,7 @@
               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
               write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_write_schema_autodetect(self):
     if self.runner_name == 'TestDataflowRunner':
       self.skipTest('DataflowRunner does not support schema autodetection')
@@ -209,7 +209,7 @@
               write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
               temp_file_format=FileFormat.JSON))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_write_new_types(self):
     table_name = 'python_new_types_table'
     table_id = '{}.{}'.format(self.dataset_id, table_name)
@@ -290,7 +290,7 @@
               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
               write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_big_query_write_without_schema(self):
     table_name = 'python_no_schema_table'
     self.create_table(table_name)
@@ -352,7 +352,7 @@
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
               temp_file_format=FileFormat.JSON))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   @mock.patch(
       "apache_beam.io.gcp.bigquery_file_loads._MAXIMUM_SOURCE_URIS", new=1)
   def test_big_query_write_temp_table_append_schema_update(self):
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py
index f5b68a6..abecd5b 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py
@@ -32,8 +32,8 @@
 import unittest
 from datetime import datetime
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -66,7 +66,7 @@
     datastore_write_it_pipeline.run(
         test_pipeline.get_full_options_as_args(**extra_opts))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   @unittest.skipIf(
       datastore_write_it_pipeline is None, 'GCP dependencies are not installed')
   def test_datastore_write_limit(self):
diff --git a/sdks/python/apache_beam/io/gcp/dicomio_integration_test.py b/sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
index 16e976c..7970dd5 100644
--- a/sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
@@ -29,7 +29,7 @@
 import string
 import unittest
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.io import fileio
@@ -133,7 +133,7 @@
     # clean up the temp Dicom store
     delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_dicom_search_instances(self):
     # Search and compare the metadata of a persistent DICOM store.
     # Both refine and comprehensive search will be tested.
@@ -183,7 +183,7 @@
           equal_to([expected_dict_refine]),
           label='refine search assert')
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_dicom_store_instance_from_gcs(self):
     # Store DICOM files to an empty DICOM store from a GCS bucket,
     # then check if the store metadata matches.
diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py
index d87e7be..0272dac 100644
--- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py
@@ -21,7 +21,7 @@
 import unittest
 import uuid
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -101,7 +101,7 @@
     cls._add_dummy_entries()
     _LOGGER.info("Spanner Read IT Setup Complete...")
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_read_via_table(self):
     _LOGGER.info("Spanner Read via table")
     with beam.Pipeline(argv=self.args) as p:
@@ -113,7 +113,7 @@
           columns=["UserId", "Key"])
     assert_that(r, equal_to(self._data))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_read_via_sql(self):
     _LOGGER.info("Running Spanner via sql")
     with beam.Pipeline(argv=self.args) as p:
diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py
index 1701cbb..7f2c8e3 100644
--- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py
@@ -20,7 +20,7 @@
 import unittest
 import uuid
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -103,7 +103,7 @@
     cls._create_database()
     _LOGGER.info('Spanner Write IT Setup Complete...')
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_write_batches(self):
     _prefex = 'test_write_batches'
     mutations = [
@@ -129,7 +129,7 @@
     res.wait_until_finish()
     self.assertEqual(self._count_data(_prefex), len(mutations))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_spanner_update(self):
     _prefex = 'test_update'
 
@@ -165,7 +165,7 @@
     res.wait_until_finish()
     self.assertEqual(self._count_data(_prefex), 2)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_spanner_error(self):
     mutations_update = [
         WriteMutation.update(
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
index 5aedb6c..b06e374 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
@@ -43,7 +43,7 @@
 import unittest
 import uuid
 
-from nose.plugins.attrib import attr
+import pytest
 
 from apache_beam.io.filesystems import FileSystems
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -111,17 +111,17 @@
     self.gcsio.copy(src, dst, kms_key_name, **extra_kwargs)
     self._verify_copy(src, dst, kms_key_name)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_copy(self):
     self._test_copy("test_copy")
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_copy_kms(self):
     if self.kms_key_name is None:
       raise unittest.SkipTest('--kms_key_name not specified')
     self._test_copy("test_copy_kms", self.kms_key_name)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   @unittest.skip('BEAM-12352: enable once maxBytesRewrittenPerCall works again')
   def test_copy_rewrite_token(self):
     # Tests a multi-part copy (rewrite) operation. This is triggered by a
@@ -165,17 +165,17 @@
     for _src, _dst in src_dst_pairs:
       self._verify_copy(_src, _dst, kms_key_name)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_copy_batch(self):
     self._test_copy_batch("test_copy_batch")
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_copy_batch_kms(self):
     if self.kms_key_name is None:
       raise unittest.SkipTest('--kms_key_name not specified')
     self._test_copy_batch("test_copy_batch_kms", self.kms_key_name)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   @unittest.skip('BEAM-12352: enable once maxBytesRewrittenPerCall works again')
   def test_copy_batch_rewrite_token(self):
     # Tests a multi-part copy (rewrite) operation. This is triggered by a
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
index 7e20be3..541bb52 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
@@ -24,8 +24,8 @@
 import unittest
 import uuid
 
+import pytest
 from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
 
 from apache_beam.io.gcp import pubsub_it_pipeline
 from apache_beam.io.gcp.pubsub import PubsubMessage
@@ -204,11 +204,11 @@
         id_label=self.ID_LABEL,
         timestamp_attribute=self.TIMESTAMP_ATTRIBUTE)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_streaming_data_only(self):
     self._test_streaming(with_attributes=False)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_streaming_with_attributes(self):
     self._test_streaming(with_attributes=True)
 
diff --git a/sdks/python/apache_beam/io/parquetio_it_test.py b/sdks/python/apache_beam/io/parquetio_it_test.py
index ee4e0f0..0d3cd0d 100644
--- a/sdks/python/apache_beam/io/parquetio_it_test.py
+++ b/sdks/python/apache_beam/io/parquetio_it_test.py
@@ -21,7 +21,7 @@
 import unittest
 from collections import Counter
 
-from nose.plugins.attrib import attr
+import pytest
 
 from apache_beam import Create
 from apache_beam import DoFn
@@ -52,7 +52,7 @@
   def tearDown(self):
     pass
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_parquetio_it(self):
     file_prefix = "parquet_it_test"
     init_size = 10
diff --git a/sdks/python/apache_beam/ml/gcp/cloud_dlp_it_test.py b/sdks/python/apache_beam/ml/gcp/cloud_dlp_it_test.py
index 4ada679..a699aaa 100644
--- a/sdks/python/apache_beam/ml/gcp/cloud_dlp_it_test.py
+++ b/sdks/python/apache_beam/ml/gcp/cloud_dlp_it_test.py
@@ -20,7 +20,7 @@
 import logging
 import unittest
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -66,7 +66,7 @@
     self.runner_name = type(self.test_pipeline.runner).__name__
     self.project = self.test_pipeline.get_option('project')
 
-  @attr("IT")
+  @pytest.mark.it_postcommit
   def test_deidentification(self):
     with TestPipeline(is_integration_test=True) as p:
       output = (
@@ -77,7 +77,7 @@
               inspection_config=INSPECT_CONFIG))
       assert_that(output, equal_to(['####################']))
 
-  @attr("IT")
+  @pytest.mark.it_postcommit
   def test_inspection(self):
     with TestPipeline(is_integration_test=True) as p:
       output = (
diff --git a/sdks/python/apache_beam/ml/gcp/naturallanguageml_test_it.py b/sdks/python/apache_beam/ml/gcp/naturallanguageml_test_it.py
index 932bc685..9adf56a 100644
--- a/sdks/python/apache_beam/ml/gcp/naturallanguageml_test_it.py
+++ b/sdks/python/apache_beam/ml/gcp/naturallanguageml_test_it.py
@@ -18,7 +18,7 @@
 
 import unittest
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -46,7 +46,7 @@
       ])
 
 
-@attr('IT')
+@pytest.mark.it_postcommit
 @unittest.skipIf(AnnotateText is None, 'GCP dependencies are not installed')
 class NaturalLanguageMlTestIT(unittest.TestCase):
   def test_analyzing_syntax(self):
diff --git a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py b/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py
index 8c94a29..535a29f 100644
--- a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py
+++ b/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py
@@ -22,7 +22,7 @@
 import random
 import unittest
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -53,7 +53,7 @@
   yield response[0]["results"]
 
 
-@attr('IT')
+@pytest.mark.it_postcommit
 @unittest.skipIf(
     recommendationengine is None,
     "Recommendations AI dependencies not installed.")
diff --git a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test_it.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test_it.py
index d934520..03f79d1 100644
--- a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test_it.py
+++ b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test_it.py
@@ -22,7 +22,7 @@
 import unittest
 
 import hamcrest as hc
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -45,7 +45,7 @@
       yield segment.entity.description
 
 
-@attr('IT')
+@pytest.mark.it_postcommit
 @unittest.skipIf(
     AnnotateVideoWithContext is None, 'GCP dependencies are not installed')
 class VideoIntelligenceMlTestIT(unittest.TestCase):
diff --git a/sdks/python/apache_beam/ml/gcp/visionml_test_it.py b/sdks/python/apache_beam/ml/gcp/visionml_test_it.py
index 14af3cb8..4413266 100644
--- a/sdks/python/apache_beam/ml/gcp/visionml_test_it.py
+++ b/sdks/python/apache_beam/ml/gcp/visionml_test_it.py
@@ -18,7 +18,7 @@
 
 import unittest
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.testing.test_pipeline import TestPipeline
@@ -40,7 +40,7 @@
       yield text_annotation.description
 
 
-@attr('IT')
+@pytest.mark.it_postcommit
 @unittest.skipIf(vision is None, 'GCP dependencies are not installed')
 class VisionMlTestIT(unittest.TestCase):
   def test_text_detection_with_language_hint(self):
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 69362a4..d0aaa2f 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -61,6 +61,7 @@
 from typing import FrozenSet
 from typing import Iterable
 from typing import List
+from typing import Mapping
 from typing import Optional
 from typing import Sequence
 from typing import Set
@@ -271,7 +272,7 @@
     output_replacements = {
     }  # type: Dict[AppliedPTransform, List[Tuple[pvalue.PValue, Optional[str]]]]
     input_replacements = {
-    }  # type: Dict[AppliedPTransform, Sequence[Union[pvalue.PBegin, pvalue.PCollection]]]
+    }  # type: Dict[AppliedPTransform, Mapping[str, Union[pvalue.PBegin, pvalue.PCollection]]]
     side_input_replacements = {
     }  # type: Dict[AppliedPTransform, List[pvalue.AsSideInput]]
 
@@ -297,7 +298,7 @@
               original_transform_node.parent,
               replacement_transform,
               original_transform_node.full_label,
-              original_transform_node.inputs)
+              original_transform_node.main_inputs)
 
           replacement_transform_node.resource_hints = (
               original_transform_node.resource_hints)
@@ -437,11 +438,11 @@
                 output_replacements[transform_node].append((tag, replacement))
 
         if replace_input:
-          new_input = [
-              input if not input in output_map else output_map[input]
-              for input in transform_node.inputs
-          ]
-          input_replacements[transform_node] = new_input
+          new_inputs = {
+              tag: input if not input in output_map else output_map[input]
+              for (tag, input) in transform_node.main_inputs.items()
+          }
+          input_replacements[transform_node] = new_inputs
 
         if replace_side_inputs:
           new_side_inputs = []
@@ -670,15 +671,18 @@
 
     pvalueish, inputs = transform._extract_input_pvalues(pvalueish)
     try:
-      inputs = tuple(inputs)
-      for leaf_input in inputs:
-        if not isinstance(leaf_input, pvalue.PValue):
-          raise TypeError
+      if not isinstance(inputs, dict):
+        inputs = {str(ix): input for (ix, input) in enumerate(inputs)}
     except TypeError:
       raise NotImplementedError(
           'Unable to extract PValue inputs from %s; either %s does not accept '
           'inputs of this format, or it does not properly override '
           '_extract_input_pvalues' % (pvalueish, transform))
+    for t, leaf_input in inputs.items():
+      if not isinstance(leaf_input, pvalue.PValue) or not isinstance(t, str):
+        raise NotImplementedError(
+            '%s does not properly override _extract_input_pvalues, '
+            'returned %s from %s' % (transform, inputs, pvalueish))
 
     current = AppliedPTransform(
         self._current_transform(), transform, full_label, inputs)
@@ -705,7 +709,8 @@
         if result.producer is None:
           result.producer = current
 
-        self._infer_result_type(transform, inputs, result)
+        # TODO(BEAM-1833): Pass full tuples dict.
+        self._infer_result_type(transform, tuple(inputs.values()), result)
 
         assert isinstance(result.producer.inputs, tuple)
         # The DoOutputsTuple adds the PCollection to the outputs when accessed
@@ -940,7 +945,7 @@
     for id in proto.components.transforms:
       transform = context.transforms.get_by_id(id)
       if not transform.inputs and transform.transform.__class__ in has_pbegin:
-        transform.inputs = (pvalue.PBegin(p), )
+        transform.main_inputs = {'None': pvalue.PBegin(p)}
 
     if return_context:
       return p, context  # type: ignore  # too complicated for now
@@ -1030,7 +1035,7 @@
       parent,  # type:  Optional[AppliedPTransform]
       transform,  # type: Optional[ptransform.PTransform]
       full_label,  # type: str
-      inputs,  # type: Optional[Sequence[Union[pvalue.PBegin, pvalue.PCollection]]]
+      main_inputs,  # type: Optional[Mapping[str, Union[pvalue.PBegin, pvalue.PCollection]]]
       environment_id=None,  # type: Optional[str]
       annotations=None, # type: Optional[Dict[str, bytes]]
   ):
@@ -1043,7 +1048,7 @@
     # reusing PTransform instances in different contexts (apply() calls) without
     # any interference. This is particularly useful for composite transforms.
     self.full_label = full_label
-    self.inputs = inputs or ()
+    self.main_inputs = dict(main_inputs or {})
 
     self.side_inputs = tuple() if transform is None else transform.side_inputs
     self.outputs = {}  # type: Dict[Union[str, int, None], pvalue.PValue]
@@ -1076,6 +1081,10 @@
       }
     self.annotations = annotations
 
+  @property
+  def inputs(self):
+    return tuple(self.main_inputs.values())
+
   def __repr__(self):
     # type: () -> str
     return "%s(%s, %s)" % (
@@ -1109,8 +1118,8 @@
     if isinstance(self.transform, external.ExternalTransform):
       self.transform.replace_named_outputs(self.named_outputs())
 
-  def replace_inputs(self, inputs):
-    self.inputs = inputs
+  def replace_inputs(self, main_inputs):
+    self.main_inputs = main_inputs
 
     # Importing locally to prevent circular dependency issues.
     from apache_beam.transforms import external
@@ -1215,12 +1224,11 @@
 
   def named_inputs(self):
     # type: () -> Dict[str, pvalue.PValue]
-    # TODO(BEAM-1833): Push names up into the sdk construction.
     if self.transform is None:
-      assert not self.inputs and not self.side_inputs
+      assert not self.main_inputs and not self.side_inputs
       return {}
     else:
-      return self.transform._named_inputs(self.inputs, self.side_inputs)
+      return self.transform._named_inputs(self.main_inputs, self.side_inputs)
 
   def named_outputs(self):
     # type: () -> Dict[str, pvalue.PCollection]
@@ -1309,10 +1317,10 @@
       pardo_payload = None
       side_input_tags = []
 
-    main_inputs = [
-        context.pcollections.get_by_id(id) for tag,
-        id in proto.inputs.items() if tag not in side_input_tags
-    ]
+    main_inputs = {
+        tag: context.pcollections.get_by_id(id)
+        for (tag, id) in proto.inputs.items() if tag not in side_input_tags
+    }
 
     transform = ptransform.PTransform.from_runner_api(proto, context)
     if transform and proto.environment_id:
@@ -1334,7 +1342,7 @@
         parent=None,
         transform=transform,
         full_label=proto.unique_name,
-        inputs=main_inputs,
+        main_inputs=main_inputs,
         environment_id=None,
         annotations=proto.annotations)
 
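The pipeline.py changes above switch AppliedPTransform from a positional inputs tuple to a tag-keyed main_inputs mapping, with .inputs kept as a derived, backwards-compatible view. A simplified, self-contained sketch of that shape (AppliedNodeSketch is illustrative, not the real class; plain strings stand in for PCollections):

    from typing import Mapping, Tuple


    class AppliedNodeSketch:
      """Illustrative stand-in for AppliedPTransform's new input model."""
      def __init__(self, main_inputs: Mapping[str, object]):
        # Tag -> main input; untagged positional inputs are keyed '0', '1', ...
        self.main_inputs = dict(main_inputs or {})

      @property
      def inputs(self) -> Tuple[object, ...]:
        # Positional view preserved for existing callers.
        return tuple(self.main_inputs.values())

      def replace_inputs(self, main_inputs: Mapping[str, object]) -> None:
        self.main_inputs = dict(main_inputs)


    node = AppliedNodeSketch({'A': 'pcoll_a', 'B': 'pcoll_b'})
    assert node.inputs == ('pcoll_a', 'pcoll_b')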
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index c2a23fd..731d5e3 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -972,6 +972,24 @@
     for transform_id in runner_api_proto.components.transforms:
       self.assertRegex(transform_id, r'[a-zA-Z0-9-_]+')
 
+  def test_input_names(self):
+    class MyPTransform(beam.PTransform):
+      def expand(self, pcolls):
+        return pcolls.values() | beam.Flatten()
+
+    p = beam.Pipeline()
+    input_names = set('ABC')
+    inputs = {x: p | x >> beam.Create([x]) for x in input_names}
+    inputs | MyPTransform()  # pylint: disable=expression-not-assigned
+    runner_api_proto = Pipeline.to_runner_api(p)
+
+    for transform_proto in runner_api_proto.components.transforms.values():
+      if transform_proto.unique_name == 'MyPTransform':
+        self.assertEqual(set(transform_proto.inputs.keys()), input_names)
+        break
+    else:
+      self.fail('Unable to find transform.')
+
   def test_display_data(self):
     class MyParentTransform(beam.PTransform):
       def expand(self, p):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py
index 0ee0e0f..934dfe5 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py
@@ -23,7 +23,6 @@
 import unittest
 
 import pytest
-from nose.plugins.attrib import attr
 
 import apache_beam as beam
 from apache_beam.options.pipeline_options import PipelineOptions
@@ -43,7 +42,7 @@
     p = beam.Pipeline(options=pipeline_options)
     return dataflow_exercise_metrics_pipeline.apply_and_run(p)
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_metrics_it(self):
     result = self.run_pipeline()
     errors = metric_result_matchers.verify_all(
@@ -51,7 +50,7 @@
         dataflow_exercise_metrics_pipeline.metric_matchers())
     self.assertFalse(errors, str(errors))
 
-  @attr('IT')
+  @pytest.mark.it_postcommit
   @pytest.mark.it_validatescontainer
   def test_metrics_fnapi_it(self):
     result = self.run_pipeline(experiment='beam_fn_api')
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 0ac8c5f..bbaf52c 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -305,7 +305,7 @@
                     parent,
                     beam.Map(lambda x: (b'', x)),
                     transform_node.full_label + '/MapToVoidKey%s' % ix,
-                    (side_input.pvalue, ))
+                    {'input': side_input.pvalue})
                 new_side_input.pvalue.producer = map_to_void_key
                 map_to_void_key.add_output(new_side_input.pvalue, None)
                 parent.add_part(map_to_void_key)
@@ -594,15 +594,9 @@
     return result
 
   def _maybe_add_unified_worker_missing_options(self, options):
-    debug_options = options.view_as(DebugOptions)
-    # Streaming is always portable, default to runner v2.
-    if options.view_as(StandardOptions).streaming:
-      if not debug_options.lookup_experiment('disable_runner_v2'):
-        debug_options.add_experiment('beam_fn_api')
-        debug_options.add_experiment('use_runner_v2')
-        debug_options.add_experiment('use_portable_job_submission')
     # set default beam_fn_api experiment if use unified
     # worker experiment flag exists, no-op otherwise.
+    debug_options = options.view_as(DebugOptions)
     from apache_beam.runners.dataflow.internal import apiclient
     if apiclient._use_unified_worker(options):
       if not debug_options.lookup_experiment('beam_fn_api'):
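With the streaming block removed, plain --streaming jobs no longer get beam_fn_api/use_runner_v2/use_portable_job_submission added automatically; the experiment backfill only happens when the unified worker path is already requested. A rough, simplified sketch of the remaining behavior (the real gate is apiclient._use_unified_worker(options), not the experiment lookup shown here):

    from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions

    options = PipelineOptions(['--experiments=use_runner_v2'])
    debug_options = options.view_as(DebugOptions)
    # Backfill beam_fn_api only when runner v2 / unified worker was asked for;
    # streaming alone no longer implies it.
    if debug_options.lookup_experiment('use_runner_v2'):
      if not debug_options.lookup_experiment('beam_fn_api'):
        debug_options.add_experiment('beam_fn_api')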
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 9c7546d..e7ce71b 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -256,7 +256,6 @@
   def test_streaming_create_translation(self):
     remote_runner = DataflowRunner()
     self.default_properties.append("--streaming")
-    self.default_properties.append("--experiments=disable_runner_v2")
     with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p:
       p | ptransform.Create([1])  # pylint: disable=expression-not-assigned
     job_dict = json.loads(str(remote_runner.job))
@@ -349,7 +348,8 @@
     pcoll2.element_type = typehints.Any
     pcoll3.element_type = typehints.KV[typehints.Any, typehints.Any]
     for pcoll in [pcoll1, pcoll2, pcoll3]:
-      applied = AppliedPTransform(None, beam.GroupByKey(), "label", [pcoll])
+      applied = AppliedPTransform(
+          None, beam.GroupByKey(), "label", {'pcoll': pcoll})
       applied.outputs[None] = PCollection(None)
       common.group_by_key_input_visitor().visit_transform(applied)
       self.assertEqual(
@@ -368,7 +368,7 @@
     for pcoll in [pcoll1, pcoll2]:
       with self.assertRaisesRegex(ValueError, err_msg):
         common.group_by_key_input_visitor().visit_transform(
-            AppliedPTransform(None, beam.GroupByKey(), "label", [pcoll]))
+            AppliedPTransform(None, beam.GroupByKey(), "label", {'in': pcoll}))
 
   def test_group_by_key_input_visitor_for_non_gbk_transforms(self):
     p = TestPipeline()
@@ -376,7 +376,7 @@
     for transform in [beam.Flatten(), beam.Map(lambda x: x)]:
       pcoll.element_type = typehints.Any
       common.group_by_key_input_visitor().visit_transform(
-          AppliedPTransform(None, transform, "label", [pcoll]))
+          AppliedPTransform(None, transform, "label", {'in': pcoll}))
       self.assertEqual(pcoll.element_type, typehints.Any)
 
   def test_flatten_input_with_visitor_with_single_input(self):
@@ -388,11 +388,11 @@
 
   def _test_flatten_input_visitor(self, input_type, output_type, num_inputs):
     p = TestPipeline()
-    inputs = []
-    for _ in range(num_inputs):
+    inputs = {}
+    for ix in range(num_inputs):
       input_pcoll = PCollection(p)
       input_pcoll.element_type = input_type
-      inputs.append(input_pcoll)
+      inputs[str(ix)] = input_pcoll
     output_pcoll = PCollection(p)
     output_pcoll.element_type = output_type
 
@@ -400,7 +400,7 @@
     flatten.add_output(output_pcoll, None)
     DataflowRunner.flatten_input_visitor().visit_transform(flatten)
     for _ in range(num_inputs):
-      self.assertEqual(inputs[0].element_type, output_type)
+      self.assertEqual(inputs['0'].element_type, output_type)
 
   def test_gbk_then_flatten_input_visitor(self):
     p = TestPipeline(
@@ -443,7 +443,7 @@
         z: (x, y, z),
         beam.pvalue.AsSingleton(pc),
         beam.pvalue.AsMultiMap(pc))
-    applied_transform = AppliedPTransform(None, transform, "label", [pc])
+    applied_transform = AppliedPTransform(None, transform, "label", {'pc': pc})
     DataflowRunner.side_input_visitor(
         use_fn_api=True).visit_transform(applied_transform)
     self.assertEqual(2, len(applied_transform.side_inputs))
@@ -839,8 +839,7 @@
         'Runner determined sharding not available in Dataflow for '
         'GroupIntoBatches for jobs not using Runner V2'):
       _ = self._run_group_into_batches_and_get_step_properties(
-          True,
-          ['--enable_streaming_engine', '--experiments=disable_runner_v2'])
+          True, ['--enable_streaming_engine'])
 
     # JRH
     with self.assertRaisesRegex(
@@ -848,12 +847,7 @@
         'Runner determined sharding not available in Dataflow for '
         'GroupIntoBatches for jobs not using Runner V2'):
       _ = self._run_group_into_batches_and_get_step_properties(
-          True,
-          [
-              '--enable_streaming_engine',
-              '--experiments=beam_fn_api',
-              '--experiments=disable_runner_v2'
-          ])
+          True, ['--enable_streaming_engine', '--experiments=beam_fn_api'])
 
   def test_pack_combiners(self):
     class PackableCombines(beam.PTransform):
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
index 456636c..448fca7 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
@@ -685,8 +685,8 @@
 
       def visit_transform(self, transform_node):
         if transform_node.inputs:
-          input_list = list(transform_node.inputs)
-          for i, input_pcoll in enumerate(input_list):
+          main_inputs = dict(transform_node.main_inputs)
+          for tag, input_pcoll in main_inputs.items():
             key = self._pin.cache_key(input_pcoll)
 
             # Replace the input pcollection with the cached pcollection (if it
@@ -694,9 +694,9 @@
             if key in self._pin._cached_pcoll_read:
               # Ignore this pcoll in the final pruned instrumented pipeline.
               self._pin._ignored_targets.add(input_pcoll)
-              input_list[i] = self._pin._cached_pcoll_read[key]
+              main_inputs[tag] = self._pin._cached_pcoll_read[key]
           # Update the transform with its new inputs.
-          transform_node.inputs = tuple(input_list)
+          transform_node.main_inputs = main_inputs
 
     v = ReadCacheWireVisitor(self)
     pipeline.visit(v)
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
index 40d19a7..a3f91c0 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
@@ -297,11 +297,11 @@
 
       def visit_transform(self, transform_node):
         if transform_node.inputs:
-          input_list = list(transform_node.inputs)
-          for i in range(len(input_list)):
-            if input_list[i] == init_pcoll:
-              input_list[i] = cached_init_pcoll
-          transform_node.inputs = tuple(input_list)
+          main_inputs = dict(transform_node.main_inputs)
+          for tag in main_inputs.keys():
+            if main_inputs[tag] == init_pcoll:
+              main_inputs[tag] = cached_init_pcoll
+          transform_node.main_inputs = main_inputs
 
     v = TestReadCacheWireVisitor()
     p_origin.visit(v)
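Both interactive-runner hunks above use the same tag-preserving rewrite: copy main_inputs into a plain dict, swap in the cached PCollections by tag, then assign the dict back. A self-contained sketch of that visitor pattern (SwapInputVisitor, old_pcoll and new_pcoll are placeholders, not part of this change):

    from apache_beam.pipeline import PipelineVisitor


    class SwapInputVisitor(PipelineVisitor):
      """Replaces one main-input PCollection with another, keeping tags."""
      def __init__(self, old_pcoll, new_pcoll):
        self.old_pcoll = old_pcoll
        self.new_pcoll = new_pcoll

      def visit_transform(self, transform_node):
        if transform_node.inputs:
          main_inputs = dict(transform_node.main_inputs)
          for tag, pcoll in main_inputs.items():
            if pcoll == self.old_pcoll:
              main_inputs[tag] = self.new_pcoll
          transform_node.main_inputs = main_inputs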
diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py
index b3b4a96..d655a90 100644
--- a/sdks/python/apache_beam/testing/test_stream.py
+++ b/sdks/python/apache_beam/testing/test_stream.py
@@ -207,15 +207,56 @@
     return 'ProcessingTimeEvent: <{}>'.format(self.advance_by)
 
 
-class WindowedValueHolder:
+class WindowedValueHolderMeta(type):
+  """A metaclass that overrides the isinstance check for WindowedValueHolder.
+
+  Python short-circuits isinstance for exact type matches, so this override
+  only runs for objects that are not literally WindowedValueHolder instances.
+  The override is needed because WindowedValueHolder elements become plain
+  Row elements once they have been encoded and then decoded.
+  """
+  def __instancecheck__(cls, other):
+    """Checks if a beam.Row typed instance is a WindowedValueHolder.
+    """
+    return (
+        isinstance(other, beam.Row) and hasattr(other, 'windowed_value') and
+        hasattr(other, 'urn') and
+        isinstance(other.windowed_value, WindowedValue) and
+        other.urn == common_urns.coders.ROW.urn)
+
+
+class WindowedValueHolder(beam.Row, metaclass=WindowedValueHolderMeta):
   """A class that holds a WindowedValue.
 
   This is a special class that can be used by the runner that implements the
   TestStream as a signal that the underlying value should be unreified to the
   specified window.
   """
+  # Register WindowedValueHolder to always use RowCoder.
+  coders.registry.register_coder(WindowedValueHolderMeta, coders.RowCoder)
+
   def __init__(self, windowed_value):
-    self.windowed_value = windowed_value
+    assert isinstance(windowed_value, WindowedValue), (
+        'WindowedValueHolder can only hold %s type. Instead, %s is given.') % (
+            WindowedValue, windowed_value)
+    super().__init__(
+        **{
+            'windowed_value': windowed_value, 'urn': common_urns.coders.ROW.urn
+        })
+
+  @classmethod
+  def from_row(cls, row):
+    """Converts a beam.Row typed instance to WindowedValueHolder.
+    """
+    if isinstance(row, WindowedValueHolder):
+      return WindowedValueHolder(row.windowed_value)
+    assert isinstance(row, beam.Row), 'The given row %s must be a %s type' % (
+        row, beam.Row)
+    assert hasattr(row, 'windowed_value'), (
+        'The given %s must have a windowed_value attribute.') % row
+    assert isinstance(row.windowed_value, WindowedValue), (
+        'The windowed_value attribute of %s must be a %s type') % (
+            row, WindowedValue)
+    return WindowedValueHolder(row.windowed_value)
 
 
 class TestStream(PTransform):
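The metaclass above makes isinstance(x, WindowedValueHolder) succeed for plain beam.Row values that have round-tripped through a coder, not just for literal WindowedValueHolder instances. A standalone illustration of the __instancecheck__ mechanism using toy classes (DuckMeta, Holder and Row below are not Beam APIs):

    class DuckMeta(type):
      # isinstance() falls back to this hook unless the object's type is
      # exactly the class; exact matches short-circuit and never reach it.
      def __instancecheck__(cls, other):
        return hasattr(other, 'windowed_value') and hasattr(other, 'urn')


    class Holder(metaclass=DuckMeta):
      pass


    class Row:
      def __init__(self, **kwargs):
        self.__dict__.update(kwargs)


    assert isinstance(Row(windowed_value=1, urn='row'), Holder)  # structural match
    assert not isinstance(Row(urn='row'), Holder)  # missing attribute, rejected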
diff --git a/sdks/python/apache_beam/testing/test_stream_it_test.py b/sdks/python/apache_beam/testing/test_stream_it_test.py
index 65a6aeb..0e293ed 100644
--- a/sdks/python/apache_beam/testing/test_stream_it_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_it_test.py
@@ -22,7 +22,7 @@
 import unittest
 from functools import wraps
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam.options.pipeline_options import StandardOptions
@@ -67,7 +67,7 @@
     cls.project = cls.test_pipeline.get_option('project')
 
   @supported(['DirectRunner', 'SwitchingDirectRunner'])
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_basic_execution(self):
     test_stream = (
         TestStream().advance_watermark_to(10).add_elements([
@@ -103,7 +103,7 @@
           ]))
 
   @supported(['DirectRunner', 'SwitchingDirectRunner'])
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_multiple_outputs(self):
     """Tests that the TestStream supports emitting to multiple PCollections."""
     letters_elements = [
@@ -151,7 +151,7 @@
     p.run()
 
   @supported(['DirectRunner', 'SwitchingDirectRunner'])
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_multiple_outputs_with_watermark_advancement(self):
     """Tests that the TestStream can independently control output watermarks."""
 
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
index 94445dd..a4580b7 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -332,6 +332,27 @@
               ('a', timestamp.Timestamp(5), beam.window.IntervalWindow(5, 10)),
           ]))
 
+  def test_instance_check_windowed_value_holder(self):
+    windowed_value = WindowedValue(
+        'a',
+        Timestamp(5), [beam.window.IntervalWindow(5, 10)],
+        PaneInfo(True, True, PaneInfoTiming.ON_TIME, 0, 0))
+    self.assertTrue(
+        isinstance(WindowedValueHolder(windowed_value), WindowedValueHolder))
+    self.assertTrue(
+        isinstance(
+            beam.Row(
+                windowed_value=windowed_value, urn=common_urns.coders.ROW.urn),
+            WindowedValueHolder))
+    self.assertFalse(
+        isinstance(
+            beam.Row(windowed_value=windowed_value), WindowedValueHolder))
+    self.assertFalse(isinstance(windowed_value, WindowedValueHolder))
+    self.assertFalse(
+        isinstance(beam.Row(x=windowed_value), WindowedValueHolder))
+    self.assertFalse(
+        isinstance(beam.Row(windowed_value=1), WindowedValueHolder))
+
   def test_gbk_execution_no_triggers(self):
     test_stream = (
         TestStream().advance_watermark_to(10).add_elements([
diff --git a/sdks/python/apache_beam/transforms/external_it_test.py b/sdks/python/apache_beam/transforms/external_it_test.py
index b1eda0a..e24b70f 100644
--- a/sdks/python/apache_beam/transforms/external_it_test.py
+++ b/sdks/python/apache_beam/transforms/external_it_test.py
@@ -21,7 +21,7 @@
 
 import unittest
 
-from nose.plugins.attrib import attr
+import pytest
 
 import apache_beam as beam
 from apache_beam import Pipeline
@@ -33,7 +33,7 @@
 
 
 class ExternalTransformIT(unittest.TestCase):
-  @attr('IT')
+  @pytest.mark.it_postcommit
   def test_job_python_from_python_it(self):
     @ptransform.PTransform.register_urn('simple', None)
     class SimpleTransform(ptransform.PTransform):
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 788485cd..50f7715 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -50,6 +50,7 @@
 from typing import Callable
 from typing import Dict
 from typing import List
+from typing import Mapping
 from typing import Optional
 from typing import Sequence
 from typing import Tuple
@@ -253,7 +254,7 @@
       return self.visit_nested(node)
 
 
-def get_named_nested_pvalues(pvalueish):
+def get_named_nested_pvalues(pvalueish, as_inputs=False):
   if isinstance(pvalueish, tuple):
     # Check to see if it's a named tuple.
     fields = getattr(pvalueish, '_fields', None)
@@ -262,16 +263,22 @@
     else:
       tagged_values = enumerate(pvalueish)
   elif isinstance(pvalueish, list):
+    if as_inputs:
+      # Full list treated as a list of values for eager evaluation.
+      yield None, pvalueish
+      return
     tagged_values = enumerate(pvalueish)
   elif isinstance(pvalueish, dict):
     tagged_values = pvalueish.items()
   else:
-    if isinstance(pvalueish, (pvalue.PValue, pvalue.DoOutputsTuple)):
+    if as_inputs or isinstance(pvalueish,
+                               (pvalue.PValue, pvalue.DoOutputsTuple)):
       yield None, pvalueish
     return
 
   for tag, subvalue in tagged_values:
-    for subtag, subsubvalue in get_named_nested_pvalues(subvalue):
+    for subtag, subsubvalue in get_named_nested_pvalues(
+        subvalue, as_inputs=as_inputs):
       if subtag is None:
         yield tag, subsubvalue
       else:
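An illustrative sketch (plain values stand in for PCollections; not part of the change itself) of how the new as_inputs flag behaves: dict keys become tags, and a bare list is yielded whole so __ror__ can later materialize it eagerly.

    from apache_beam.transforms.ptransform import get_named_nested_pvalues

    # Dict keys are carried through as tags.
    print(list(get_named_nested_pvalues({'a': 1, 'b': 2}, as_inputs=True)))
    # -> [('a', 1), ('b', 2)]

    # A bare list is treated as a single value when as_inputs=True.
    print(list(get_named_nested_pvalues([1, 2, 3], as_inputs=True)))
    # -> [(None, [1, 2, 3])]

    # With the default as_inputs=False, plain (non-PValue) leaves are skipped,
    # so the same call yields nothing.
    print(list(get_named_nested_pvalues([1, 2, 3])))
    # -> []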
@@ -569,6 +576,8 @@
   def __ror__(self, left, label=None):
     """Used to apply this PTransform to non-PValues, e.g., a tuple."""
     pvalueish, pvalues = self._extract_input_pvalues(left)
+    if isinstance(pvalues, dict):
+      pvalues = tuple(pvalues.values())
     pipelines = [v.pipeline for v in pvalues if isinstance(v, pvalue.PValue)]
     if pvalues and not pipelines:
       deferred = False
@@ -597,8 +606,7 @@
     # pylint: enable=wrong-import-order, wrong-import-position
     replacements = {
         id(v): p | 'CreatePInput%s' % ix >> Create(v, reshuffle=False)
-        for ix,
-        v in enumerate(pvalues)
+        for (ix, v) in enumerate(pvalues)
         if not isinstance(v, pvalue.PValue) and v is not None
     }
     pvalueish = _SetInputPValues().visit(pvalueish, replacements)
@@ -628,19 +636,11 @@
     if isinstance(pvalueish, pipeline.Pipeline):
       pvalueish = pvalue.PBegin(pvalueish)
 
-    def _dict_tuple_leaves(pvalueish):
-      if isinstance(pvalueish, tuple):
-        for a in pvalueish:
-          for p in _dict_tuple_leaves(a):
-            yield p
-      elif isinstance(pvalueish, dict):
-        for a in pvalueish.values():
-          for p in _dict_tuple_leaves(a):
-            yield p
-      else:
-        yield pvalueish
-
-    return pvalueish, tuple(_dict_tuple_leaves(pvalueish))
+    return pvalueish, {
+        str(tag): value
+        for (tag, value) in get_named_nested_pvalues(
+            pvalueish, as_inputs=True)
+    }
 
   def _pvaluish_from_dict(self, input_dict):
     if len(input_dict) == 1:
@@ -648,16 +648,15 @@
     else:
       return input_dict
 
-  def _named_inputs(self, inputs, side_inputs):
-    # type: (Sequence[pvalue.PValue], Sequence[Any]) -> Dict[str, pvalue.PValue]
+  def _named_inputs(self, main_inputs, side_inputs):
+    # type: (Mapping[str, pvalue.PValue], Sequence[Any]) -> Dict[str, pvalue.PValue]
 
     """Returns the dictionary of named inputs (including side inputs) as they
     should be named in the beam proto.
     """
-    # TODO(BEAM-1833): Push names up into the sdk construction.
     main_inputs = {
-        str(ix): input
-        for (ix, input) in enumerate(inputs)
+        tag: input
+        for (tag, input) in main_inputs.items()
         if isinstance(input, pvalue.PCollection)
     }
     named_side_inputs = {(SIDE_INPUT_PREFIX + '%s') % ix: si.pvalue
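A hedged sketch of the user-facing effect (JoinNames is a hypothetical composite, used only for illustration): applying a transform to a dict of PCollections now carries the dict keys through _extract_input_pvalues and into _named_inputs as the main-input names, instead of renumbering them positionally.

    import apache_beam as beam

    class JoinNames(beam.PTransform):
        # Hypothetical composite transform, for illustration only.
        def expand(self, pcolls):
            return (pcolls['first'], pcolls['second']) | beam.Flatten()

    with beam.Pipeline() as p:
        a = p | 'A' >> beam.Create(['a'])
        b = p | 'B' >> beam.Create(['b'])
        # The 'first'/'second' tags survive into the transform's named inputs
        # in the Beam proto, rather than becoming '0'/'1'.
        merged = {'first': a, 'second': b} | JoinNames()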
diff --git a/sdks/python/container/Dockerfile b/sdks/python/container/Dockerfile
index 34a35b4..359ca2c 100644
--- a/sdks/python/container/Dockerfile
+++ b/sdks/python/container/Dockerfile
@@ -101,6 +101,7 @@
 
 
 COPY target/LICENSE /opt/apache/beam/
+COPY target/LICENSE.python /opt/apache/beam/
 COPY target/NOTICE /opt/apache/beam/
 
 ADD target/launcher/linux_amd64/boot /opt/apache/beam/
diff --git a/sdks/python/container/base_image_requirements.txt b/sdks/python/container/base_image_requirements.txt
index 2926ff1..c627b4e 100644
--- a/sdks/python/container/base_image_requirements.txt
+++ b/sdks/python/container/base_image_requirements.txt
@@ -52,6 +52,8 @@
 google-cloud-datastore==1.15.3
 google-cloud-dlp==0.13.0
 google-cloud-language==1.3.0
+google-cloud-profiler==3.0.4
+google-cloud-recommendations-ai==0.2.0
 google-cloud-spanner==1.13.0
 google-cloud-videointelligence==1.13.0
 google-cloud-vision==0.42.0
@@ -66,6 +68,7 @@
 dataclasses == 0.8 ; python_version=="3.6"
 guppy3==3.0.10
 mmh3==2.5.1
+orjson==3.5.3
 python-dateutil == 2.8.1
 requests == 2.24.0
 freezegun == 0.3.15
diff --git a/sdks/python/pytest.ini b/sdks/python/pytest.ini
index 2906300..a162b69 100644
--- a/sdks/python/pytest.ini
+++ b/sdks/python/pytest.ini
@@ -29,6 +29,7 @@
 markers =
     xlang_transforms: collect Cross Language transforms test runs
     xlang_sql_expansion_service: collect for Cross Language with SQL expansion service test runs
+    it_postcommit: collect for post-commit integration test runs
     it_validatesrunner: collect for ValidatesRunner integration test runs
     no_sickbay_streaming: run without sickbay-streaming
     no_sickbay_batch: run without sickbay-batch
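For reference, a minimal sketch (file and test names are illustrative) of how the new marker is used; the gradle suites below pass "collect": "it_postcommit" so that only tests carrying this marker are collected, and the same selection can be reproduced locally with pytest's -m filter.

    # my_example_it_test.py (illustrative name)
    import unittest

    import pytest

    class MyExampleIT(unittest.TestCase):
        @pytest.mark.it_postcommit  # collected only in post-commit IT runs
        def test_against_real_services(self):
            self.assertTrue(True)

    # Run locally with: pytest -m it_postcommit my_example_it_test.py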
diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle
index 3daf379..7d1cf2b 100644
--- a/sdks/python/test-suites/dataflow/common.gradle
+++ b/sdks/python/test-suites/dataflow/common.gradle
@@ -28,14 +28,7 @@
 
 def runScriptsDir = "${rootDir}/sdks/python/scripts"
 
-// TODO(BEAM-3713) Remove once nose is removed
 // Basic test options for ITs running on Jenkins.
-def basicTestOpts = [
-    "--nocapture",  // print stdout instantly
-    "--processes=8",  // run tests in parallel
-    "--process-timeout=4500", // timeout of whole command execution
-]
-
 def basicPytestOpts = [
     "--capture=no",  // print stdout instantly
     "--timeout=4500", // timeout of whole command execution
@@ -56,22 +49,23 @@
     doLast {
       // Basic integration tests to run in PreCommit
       def precommitTests = streaming ? [
-              "apache_beam.examples.streaming_wordcount_it_test:StreamingWordCountIT.test_streaming_wordcount_it",
+              "apache_beam/examples/streaming_wordcount_it_test.py::StreamingWordCountIT::test_streaming_wordcount_it",
       ] : [
-              "apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
+              "apache_beam/examples/wordcount_it_test.py::WordCountIT::test_wordcount_it",
       ]
       def testOpts = [
-              "--tests=${precommitTests.join(',')}",
-              "--nocapture",    // Print stdout instantly
-              "--processes=2",    // Number of tests running in parallel
-              "--process-timeout=1800",   // Timeout of whole command execution
+              "${precommitTests.join(' ')}",
+              "--capture=no",    // Print stdout instantly
+              "--numprocesses=2",    // Number of tests running in parallel
+              "--timeout=1800",   // Timeout of whole command execution
       ]
 
       def argMap = [
               "test_opts"   : testOpts,
               "sdk_location": files(configurations.distTarBall.files).singleFile,
               "worker_jar"  : dataflowWorkerJar,
-              "suite"       : "preCommitIT-df${pythonSuffix}"
+              "suite"       : "preCommitIT-df${pythonSuffix}",
+              "pytest"      : true, // TODO(BEAM-3713): Remove this once nose is removed.
       ]
 
       if (runnerV2) {
@@ -115,14 +109,16 @@
   def dataflowWorkerJar = project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath
 
   doLast {
-    def testOpts = basicTestOpts + ["--attr=IT"]
-
-    def cmdArgs = mapToArgString([
+    def testOpts = basicPytestOpts + ["--numprocesses=8", "--dist=loadfile"]
+    def argMap = [
         "test_opts": testOpts,
         "sdk_location": files(configurations.distTarBall.files).singleFile,
         "worker_jar": dataflowWorkerJar,
-        "suite": "postCommitIT-df${pythonVersionSuffix}"
-    ])
+        "suite": "postCommitIT-df${pythonVersionSuffix}",
+        "pytest": true, // TODO(BEAM-3713): Remove this once nose is removed.
+        "collect": "it_postcommit"
+    ]
+    def cmdArgs = mapToArgString(argMap)
     exec {
       executable 'sh'
       args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs"
diff --git a/sdks/python/test-suites/direct/common.gradle b/sdks/python/test-suites/direct/common.gradle
index 42bba4c..50fe59a 100644
--- a/sdks/python/test-suites/direct/common.gradle
+++ b/sdks/python/test-suites/direct/common.gradle
@@ -21,9 +21,11 @@
 def runScriptsDir = "${rootDir}/sdks/python/scripts"
 // Basic test options for ITs running on Jenkins.
 def basicTestOpts = [
-    "--nocapture",  // print stdout instantly
-    "--processes=8",  // run tests in parallel
-    "--process-timeout=4500", // timeout of whole command execution
+    "--capture=no",  // print stdout instantly
+    "--numprocesses=8",  // run tests in parallel
+    "--timeout=4500", // timeout of whole command execution
+    "--color=yes", // console color
+    "--log-cli-level=INFO" //log level info
 ]
 
 task postCommitIT {
@@ -32,25 +34,22 @@
   // Run IT tests with TestDirectRunner in batch in Python 3.
   doLast {
     def batchTests = [
-        "apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
-        "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
-        "apache_beam.io.gcp.big_query_query_to_table_it_test:BigQueryQueryToTableIT",
-        "apache_beam.io.gcp.bigquery_io_read_it_test",
-        "apache_beam.io.gcp.bigquery_read_it_test",
-        "apache_beam.io.gcp.bigquery_write_it_test",
-        "apache_beam.io.gcp.datastore.v1new.datastore_write_it_test",
-        "apache_beam.io.gcp.experimental.spannerio_read_it_test",
-        "apache_beam.io.gcp.experimental.spannerio_write_it_test",
+        "apache_beam/examples/wordcount_it_test.py::WordCountIT::test_wordcount_it",
+        "apache_beam/io/gcp/pubsub_integration_test.py::PubSubIntegrationTest",
+        "apache_beam/io/gcp/big_query_query_to_table_it_test.py::BigQueryQueryToTableIT",
+        "apache_beam/io/gcp/bigquery_io_read_it_test.py",
+        "apache_beam/io/gcp/bigquery_read_it_test.py",
+        "apache_beam/io/gcp/bigquery_write_it_test.py",
+        "apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py",
+        "apache_beam/io/gcp/experimental/spannerio_read_it_test.py",
+        "apache_beam/io/gcp/experimental/spannerio_write_it_test.py",
     ]
-    def testOpts = [
-        "--tests=${batchTests.join(',')}",
-        "--nocapture",    // Print stdout instantly
-        "--processes=8",  // run tests in parallel
-        "--process-timeout=4500", // timeout of whole command execution
-    ]
+    def testOpts = basicTestOpts + ["${batchTests.join(' ')}"]
     def argMap = ["runner": "TestDirectRunner",
                   "test_opts": testOpts,
-                  "suite": "postCommitIT-direct-py${pythonVersionSuffix}"]
+                  "suite": "postCommitIT-direct-py${pythonVersionSuffix}",
+                  "pytest": true, // TODO(BEAM-3713): Remove this once nose is removed.
+                  ]
     def batchCmdArgs = mapToArgString(argMap)
     exec {
       executable 'sh'
@@ -97,18 +96,20 @@
   // Run IT tests with TestDirectRunner in batch.
   doLast {
     def tests = [
-        "apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
-        "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
-        "apache_beam.io.gcp.big_query_query_to_table_it_test:BigQueryQueryToTableIT",
-        "apache_beam.io.gcp.bigquery_io_read_it_test",
-        "apache_beam.io.gcp.bigquery_read_it_test",
-        "apache_beam.io.gcp.bigquery_write_it_test",
-        "apache_beam.io.gcp.datastore.v1new.datastore_write_it_test",
+        "apache_beam/examples/wordcount_it_test.py::WordCountIT::test_wordcount_it",
+        "apache_beam/io/gcp/pubsub_integration_test.py::PubSubIntegrationTest",
+        "apache_beam/io/gcp/big_query_query_to_table_it_test.py::BigQueryQueryToTableIT",
+        "apache_beam/io/gcp/bigquery_io_read_it_test.py",
+        "apache_beam/io/gcp/bigquery_read_it_test.py",
+        "apache_beam/io/gcp/bigquery_write_it_test.py",
+        "apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py",
     ]
-    def batchTestOpts = basicTestOpts + ["--tests=${tests.join(',')}"]
+    def batchTestOpts = basicTestOpts + ["${tests.join(' ')}"]
     def argMap = ["runner": "TestDirectRunner",
                   "test_opts": batchTestOpts,
-                  "suite": "directRunnerIT-batch"]
+                  "suite": "directRunnerIT-batch",
+                  "pytest": true, // TODO(BEAM-3713): Remove this once nose is removed.
+                  ]
     def batchCmdArgs = mapToArgString(argMap)
     exec {
       executable 'sh'
@@ -119,18 +120,19 @@
   // Run IT tests with TestDirectRunner in streaming.
   doLast {
     def tests = [
-        "apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
-        "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
-        "apache_beam.io.gcp.bigquery_test:BigQueryStreamingInsertTransformIntegrationTests\
-.test_multiple_destinations_transform",
-        "apache_beam.io.gcp.bigquery_test:PubSubBigQueryIT",
-        "apache_beam.io.gcp.bigquery_file_loads_test:BigQueryFileLoadsIT.test_bqfl_streaming",
+        "apache_beam/examples/wordcount_it_test.py::WordCountIT::test_wordcount_it",
+        "apache_beam/io/gcp/pubsub_integration_test.py::PubSubIntegrationTest",
+        "apache_beam/io/gcp/bigquery_test.py::BigQueryStreamingInsertTransformIntegrationTests::test_multiple_destinations_transform",
+        "apache_beam/io/gcp/bigquery_test.py::PubSubBigQueryIT",
+        "apache_beam/io/gcp/bigquery_file_loads_test.py::BigQueryFileLoadsIT::test_bqfl_streaming",
     ]
-    def streamingTestOpts = basicTestOpts + ["--tests=${tests.join(',')}"]
+    def streamingTestOpts = basicTestOpts + ["${tests.join(' ')}"]
     def argMap = ["runner": "TestDirectRunner",
                   "streaming": "true",
                   "test_opts": streamingTestOpts,
-                  "suite": "directRunnerIT-streaming"]
+                  "suite": "directRunnerIT-streaming",
+                  "pytest": true, // TODO(BEAM-3713): Remove this once nose is removed.
+                ]
     def streamingCmdArgs = mapToArgString(argMap)
     exec {
       executable 'sh'
diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle
index ecf0822..20665f0 100644
--- a/sdks/python/test-suites/portable/common.gradle
+++ b/sdks/python/test-suites/portable/common.gradle
@@ -201,14 +201,14 @@
 
   doLast {
     def tests = [
-            "apache_beam.io.gcp.bigquery_read_it_test",
-            "apache_beam.io.external.xlang_jdbcio_it_test",
-            "apache_beam.io.external.xlang_kafkaio_it_test",
-            "apache_beam.io.external.xlang_kinesisio_it_test",
-            "apache_beam.io.gcp.tests.xlang_spannerio_it_test",
-            "apache_beam.io.external.xlang_debeziumio_it_test",
+            "apache_beam/io/gcp/bigquery_read_it_test.py",
+            "apache_beam/io/external/xlang_jdbcio_it_test.py",
+            "apache_beam/io/external/xlang_kafkaio_it_test.py",
+            "apache_beam/io/external/xlang_kinesisio_it_test.py",
+            "apache_beam/io/gcp/tests/xlang_spannerio_it_test.py",
+            "apache_beam/io/external/xlang_debeziumio_it_test.py",
     ]
-    def testOpts = ["--tests=${tests.join(',')}"]
+    def testOpts = ["${tests.join(' ')}"] + ["--log-cli-level=INFO"]
     def pipelineOpts = [
         "--runner=FlinkRunner",
         "--project=apache-beam-testing",
@@ -220,6 +220,7 @@
             "test_opts": testOpts,
             "suite": "postCommitIT-flink-py${pythonVersionSuffix}",
             "pipeline_opts": pipelineOpts.join(" "),
+            "pytest": true, // TODO(BEAM-3713): Remove this once nose is removed.
     ])
     def kafkaJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath
     exec {
diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index 7edcd7a..c4a3c95 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -2434,6 +2434,10 @@
 lines = pipeline | beam.io.ReadFromText('gs://some/inputData.txt')
 {{< /highlight >}}
 
+{{< highlight go >}}
+lines := textio.Read(scope, "gs://some/inputData.txt")
+{{< /highlight >}}
+
 ### 5.2. Writing output data {#pipeline-io-writing-data}
 
 Write transforms write the data in a `PCollection` to an external data source.
@@ -2449,6 +2453,10 @@
 output | beam.io.WriteToText('gs://some/outputData')
 {{< /highlight >}}
 
+{{< highlight go >}}
+textio.Write(scope, "gs://some/outputData", output)
+{{< /highlight >}}
+
 ### 5.3. File-based input and output data {#file-based-data}
 
 #### 5.3.1. Reading from multiple locations {#file-based-reading-multiple-locations}
@@ -2468,6 +2476,10 @@
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_pipelineio_read >}}
 {{< /highlight >}}
 
+{{< highlight go >}}
+lines := textio.Read(scope, "path/to/input-*.csv")
+{{< /highlight >}}
+
 To read data from disparate sources into a single `PCollection`, read each one
 independently and then use the [Flatten](#flatten) transform to create a single
 `PCollection`.
@@ -2493,6 +2505,12 @@
 {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_pipelineio_write >}}
 {{< /highlight >}}
 
+{{< highlight go >}}
+// The Go SDK textio doesn't support sharding on writes yet.
+// See https://issues.apache.org/jira/browse/BEAM-12664 for ways
+// to contribute a solution.
+{{< /highlight >}}
+
 ### 5.4. Beam-provided I/O transforms {#provided-io-transforms}
 
 See the [Beam-provided I/O Transforms](/documentation/io/built-in/)
diff --git a/website/www/site/layouts/partials/head.html b/website/www/site/layouts/partials/head.html
index 8030193..1b1aa7c 100644
--- a/website/www/site/layouts/partials/head.html
+++ b/website/www/site/layouts/partials/head.html
@@ -36,7 +36,8 @@
 </style>
 
 <script src="/js/bootstrap.min.js"></script>
-<script src="/js/language-switch.js"></script>
+<!-- TODO [BEAM-12644]: Create an asset pipeline so we don't have to version .js files. -->
+<script src="/js/language-switch-v2.js"></script>
 <script src="/js/fix-menu.js"></script>
 <script src="/js/section-nav.js"></script>
 <script src="/js/page-nav.js"></script>
diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html
index 94aa1d1..4491ae1 100644
--- a/website/www/site/layouts/partials/section-menu/en/documentation.html
+++ b/website/www/site/layouts/partials/section-menu/en/documentation.html
@@ -12,338 +12,202 @@
 
 <li><span class="section-nav-list-main-title">Documentation</span></li>
 <li><a href="/documentation">Using the Documentation</a></li>
-<li>
-  <span class="section-nav-list-title">How-to guides</span>
-
-  <ul class="section-nav-list">
-    <li class="section-nav-item--collapsible">
-      <span class="section-nav-list-title">Transform catalog</span>
-
-      <ul class="section-nav-list">
-        <li class="section-nav-item--collapsible">
-          <span class="section-nav-list-title">Python</span>
-
-          <ul class="section-nav-list">
-            <li><a href="/documentation/transforms/python/overview/">Overview</a></li>
-            <li class="section-nav-item--collapsible">
-              <span class="section-nav-list-title">Element-wise</span>
-
-              <ul class="section-nav-list">
-                <li><a href="/documentation/transforms/python/elementwise/filter/">Filter</a></li>
-                <li><a href="/documentation/transforms/python/elementwise/flatmap/">FlatMap</a></li>
-                <li><a href="/documentation/transforms/python/elementwise/keys/">Keys</a></li>
-                <li><a href="/documentation/transforms/python/elementwise/kvswap/">KvSwap</a></li>
-                <li><a href="/documentation/transforms/python/elementwise/map/">Map</a></li>
-                <li><a href="/documentation/transforms/python/elementwise/pardo/">ParDo</a></li>
-                <li><a href="/documentation/transforms/python/elementwise/partition/">Partition</a></li>
-                <li><a href="/documentation/transforms/python/elementwise/regex/">Regex</a></li>
-                <li><a href="/documentation/transforms/python/elementwise/reify/">Reify</a></li>
-                <li><a href="/documentation/transforms/python/elementwise/tostring/">ToString</a></li>
-                <li><a href="/documentation/transforms/python/elementwise/values/">Values</a></li>
-                <li><a href="/documentation/transforms/python/elementwise/withtimestamps/">WithTimestamps</a></li>
-              </ul>
-            </li>
-            <li class="section-nav-item--collapsible">
-              <span class="section-nav-list-title">Aggregation</span>
-
-              <ul class="section-nav-list">
-                <li><a href="/documentation/transforms/python/aggregation/cogroupbykey/">CoGroupByKey</a></li>
-                <li><a href="/documentation/transforms/python/aggregation/combineglobally/">CombineGlobally</a></li>
-                <li><a href="/documentation/transforms/python/aggregation/combineperkey/">CombinePerKey</a></li>
-                <li><a href="/documentation/transforms/python/aggregation/combinevalues/">CombineValues</a></li>
-                <li><a href="/documentation/transforms/python/aggregation/count/">Count</a></li>
-                <li><a href="/documentation/transforms/python/aggregation/distinct/">Distinct</a></li>
-                <li><a href="/documentation/transforms/python/aggregation/groupbykey/">GroupByKey</a></li>
-                <li><a href="/documentation/transforms/python/aggregation/groupby/">GroupBy</a></li>
-                <li><a href="/documentation/transforms/python/aggregation/groupintobatches/">GroupIntoBatches</a></li>
-                <li><a href="/documentation/transforms/python/aggregation/latest/">Latest</a></li>
-                <li><a href="/documentation/transforms/python/aggregation/max/">Max</a></li>
-                <li><a href="/documentation/transforms/python/aggregation/min/">Min</a></li>
-                <li><a href="/documentation/transforms/python/aggregation/mean/">Mean</a></li>
-                <li><a href="/documentation/transforms/python/aggregation/sample/">Sample</a></li>
-                <li><a href="/documentation/transforms/python/aggregation/sum/">Sum</a></li>
-                <li><a href="/documentation/transforms/python/aggregation/top/">Top</a></li>
-              </ul>
-            </li>
-            <li class="section-nav-item--collapsible">
-              <span class="section-nav-list-title">Other</span>
-
-              <ul class="section-nav-list">
-                <li><a href="/documentation/transforms/python/other/create/">Create</a></li>
-                <li><a href="/documentation/transforms/python/other/flatten/">Flatten</a></li>
-                <li><a href="/documentation/transforms/python/other/reshuffle/">Reshuffle</a></li>
-                <li><a href="/documentation/transforms/python/other/windowinto/">WindowInto</a></li>
-              </ul>
-            </li>
-          </ul>
-        </li>
-
-        <li class="section-nav-item--collapsible">
-          <span class="section-nav-list-title">Java</span>
-
-          <ul class="section-nav-list">
-            <li><a href="/documentation/transforms/java/overview/">Overview</a></li>
-            <li class="section-nav-item--collapsible">
-              <span class="section-nav-list-title">Element-wise</span>
-
-              <ul class="section-nav-list">
-                <li><a href="/documentation/transforms/java/elementwise/filter/">Filter</a></li>
-                <li><a href="/documentation/transforms/java/elementwise/flatmapelements/">FlatMapElements</a></li>
-                <li><a href="/documentation/transforms/java/elementwise/keys/">Keys</a></li>
-                <li><a href="/documentation/transforms/java/elementwise/kvswap/">KvSwap</a></li>
-                <li><a href="/documentation/transforms/java/elementwise/mapelements/">MapElements</a></li>
-                <li><a href="/documentation/transforms/java/elementwise/pardo/">ParDo</a></li>
-                <li><a href="/documentation/transforms/java/elementwise/partition/">Partition</a></li>
-                <li><a href="/documentation/transforms/java/elementwise/regex/">Regex</a></li>
-                <li><a href="/documentation/transforms/java/elementwise/reify/">Reify</a></li>
-                <li><a href="/documentation/transforms/java/elementwise/values/">Values</a></li>
-                <li><a href="/documentation/transforms/java/elementwise/tostring/">ToString</a></li>
-                <li><a href="/documentation/transforms/java/elementwise/withkeys/">WithKeys</a></li>
-                <li><a href="/documentation/transforms/java/elementwise/withtimestamps/">WithTimestamps</a></li>
-              </ul>
-            </li>
-            <li class="section-nav-item--collapsible">
-              <span class="section-nav-list-title">Aggregation</span>
-
-              <ul class="section-nav-list">
-                <li><a href="/documentation/transforms/java/aggregation/approximatequantiles/">ApproximateQuantiles</a></li>
-                <li><a href="/documentation/transforms/java/aggregation/approximateunique/">ApproximateUnique</a></li>
-                <li><a href="/documentation/transforms/java/aggregation/cogroupbykey/">CoGroupByKey</a></li>
-                <li><a href="/documentation/transforms/java/aggregation/combine/">Combine</a></li>
-                <li><a href="/documentation/transforms/java/aggregation/combinewithcontext/">CombineWithContext</a></li>
-                <li><a href="/documentation/transforms/java/aggregation/count/">Count</a></li>
-                <li><a href="/documentation/transforms/java/aggregation/distinct/">Distinct</a></li>
-                <li><a href="/documentation/transforms/java/aggregation/groupbykey/">GroupByKey</a></li>
-                <li><a href="/documentation/transforms/java/aggregation/groupintobatches/">GroupIntoBatches</a></li>
-                <li><a href="/documentation/transforms/java/aggregation/hllcount/">HllCount</a></li>
-                <li><a href="/documentation/transforms/java/aggregation/latest/">Latest</a></li>
-                <li><a href="/documentation/transforms/java/aggregation/max/">Max</a></li>
-                <li><a href="/documentation/transforms/java/aggregation/mean/">Mean</a></li>
-                <li><a href="/documentation/transforms/java/aggregation/min/">Min</a></li>
-                <li><a href="/documentation/transforms/java/aggregation/sample/">Sample</a></li>
-                <li><a href="/documentation/transforms/java/aggregation/sum/">Sum</a></li>
-                <li><a href="/documentation/transforms/java/aggregation/top/">Top</a></li>
-              </ul>
-            </li>
-            <li class="section-nav-item--collapsible">
-              <span class="section-nav-list-title">Other</span>
-
-              <ul class="section-nav-list">
-                <li><a href="/documentation/transforms/java/other/create/">Create</a></li>
-                <li><a href="/documentation/transforms/java/other/flatten/">Flatten</a></li>
-                <li><a href="/documentation/transforms/java/other/passert/">PAssert</a></li>
-                <li><a href="/documentation/transforms/java/other/view/">View</a></li>
-                <li><a href="/documentation/transforms/java/other/window/">Window</a></li>
-              </ul>
-            </li>
-          </ul>
-        </li>
-      </ul>
-    </li>
-    <li class="section-nav-item--collapsible">
-      <span class="section-nav-list-title">Common pipeline patterns</span>
-
-      <ul class="section-nav-list">
-        <li><a href="/documentation/patterns/overview/">Overview</a></li>
-        <li><a href="/documentation/patterns/file-processing/">File processing</a></li>
-        <li><a href="/documentation/patterns/side-inputs/">Side inputs</a></li>
-        <li><a href="/documentation/patterns/pipeline-options/">Pipeline options</a></li>
-        <li><a href="/documentation/patterns/custom-io/">Custom I/O</a></li>
-        <li><a href="/documentation/patterns/custom-windows/">Custom windows</a></li>
-        <li><a href="/documentation/patterns/bigqueryio/">BigQueryIO</a></li>
-        <li><a href="/documentation/patterns/ai-platform/">AI Platform</a></li>
-        <li><a href="/documentation/patterns/schema/">Schema</a></li>
-        <li><a href="/documentation/patterns/bqml/">BigQuery ML</a></li>
-        <li><a href="/documentation/patterns/cross-language/">Cross-language transforms</a></li>
-      </ul>
-    </li>
-  </ul>
-</li>
-
-<li>
+<li class="section-nav-item--collapsible">
   <span class="section-nav-list-title">Concepts</span>
 
   <ul class="section-nav-list">
     <li><a href="/documentation/basics/">Basics of the Beam model</a></li>
     <li><a href="/documentation/runtime/model/">How Beam executes a pipeline</a></li>
-    <li>
-      <span class="section-nav-list-title">Pipeline development lifecycle</span>
+  </ul>
+</li>
+<li class="section-nav-item--collapsible">
+  <span class="section-nav-list-title">Beam programming guide</span>
+
+  <ul class="section-nav-list">
+    <li><a href="/documentation/programming-guide/">Overview</a></li>
+    <li><a href="/documentation/programming-guide/#creating-a-pipeline">Pipelines</a></li>
+    <li class="section-nav-item--collapsible">
+      <span class="section-nav-list-title">PCollections</span>
 
       <ul class="section-nav-list">
-        <li><a href="/documentation/pipelines/design-your-pipeline/">Design Your Pipeline</a></li>
-        <li><a href="/documentation/pipelines/create-your-pipeline/">Create Your Pipeline</a></li>
-        <li><a href="/documentation/pipelines/test-your-pipeline/">Test Your Pipeline</a></li>
+        <li><a href="/documentation/programming-guide/#pcollections">Creating a PCollection</a></li>
+        <li><a href="/documentation/programming-guide/#pcollection-characteristics">PCollection characteristics</a></li>
+      </ul>
+    </li>
+    <li class="section-nav-item--collapsible">
+      <span class="section-nav-list-title">Transforms</span>
+
+      <ul class="section-nav-list">
+        <li><a href="/documentation/programming-guide/#applying-transforms">Applying transforms</a></li>
+        <li>
+          <span class="section-nav-list-title">Core Beam transforms</span>
+
+          <ul class="section-nav-list">
+            <li><a href="/documentation/programming-guide/#pardo">ParDo</a></li>
+            <li><a href="/documentation/programming-guide/#groupbykey">GroupByKey</a></li>
+            <li><a href="/documentation/programming-guide/#cogroupbykey">CoGroupByKey</a></li>
+            <li><a href="/documentation/programming-guide/#combine">Combine</a></li>
+            <li><a href="/documentation/programming-guide/#flatten">Flatten</a></li>
+            <li><a href="/documentation/programming-guide/#partition">Partition</a></li>
+          </ul>
+        </li>
+
+        <li><a href="/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms">Requirements for user code</a></li>
+        <li><a href="/documentation/programming-guide/#side-inputs">Side inputs</a></li>
+        <li><a href="/documentation/programming-guide/#additional-outputs">Additional outputs</a></li>
+        <li><a href="/documentation/programming-guide/#composite-transforms">Composite transforms</a></li>
+      </ul>
+    </li>
+    <li class="section-nav-item--collapsible">
+      <span class="section-nav-list-title">Pipeline I/O</span>
+
+      <ul class="section-nav-list">
+        <li><a href="/documentation/programming-guide/#pipeline-io">Using I/O transforms</a></li>
+        <li><a href="/documentation/io/built-in/">Built-in I/O connectors</a></li>
+
+        <li class="section-nav-item--collapsible">
+          <span class="section-nav-list-title">Built-in I/O connector guides</span>
+          <ul class="section-nav-list">
+            <li><a href="/documentation/io/built-in/parquet/">Apache Parquet I/O connector
+            </a></li>
+            <li><a href="/documentation/io/built-in/hadoop/">Hadoop Input/Output Format IO</a></li>
+            <li><a href="/documentation/io/built-in/hcatalog/">HCatalog IO</a></li>
+            <li><a href="/documentation/io/built-in/google-bigquery/">Google BigQuery I/O connector</a></li>
+            <li><a href="/documentation/io/built-in/snowflake/">Snowflake I/O connector</a></li>
+          </ul>
+      </li>
+
+
+        <li class="section-nav-item--collapsible">
+          <span class="section-nav-list-title">Developing new I/O connectors</span>
+
+          <ul class="section-nav-list">
+          <li><a href="/documentation/io/developing-io-overview/">Overview: Developing connectors</a></li>
+          <li><a href="/documentation/io/developing-io-java/">Developing connectors (Java)</a></li>
+          <li><a href="/documentation/io/developing-io-python/">Developing connectors (Python)</a></li>
+          </ul>
+        </li>
+
+        <li><a href="/documentation/io/testing/">Testing I/O transforms</a></li>
+      </ul>
+    </li>
+    <li class="section-nav-item--collapsible">
+      <span class="section-nav-list-title">Schemas</span>
+
+      <ul class="section-nav-list">
+        <li><a href="/documentation/programming-guide/#what-is-a-schema">What is a schema</a></li>
+        <li><a href="/documentation/programming-guide/#schemas-for-pl-types">Schemas for programming language types</a></li>
+        <li><a href="/documentation/programming-guide/#schema-definition">Schema definition</a></li>
+        <li><a href="/documentation/programming-guide/#logical-types">Logical types</a></li>
+        <li><a href="/documentation/programming-guide/#creating-schemas">Creating schemas</a></li>
+        <li><a href="/documentation/programming-guide/#using-schemas">Using schemas</a></li>
+      </ul>
+    </li>
+    <li class="section-nav-item--collapsible">
+      <span class="section-nav-list-title">Data encoding and type safety</span>
+
+      <ul class="section-nav-list">
+        <li><a href="/documentation/programming-guide/#data-encoding-and-type-safety">Data encoding basics</a></li>
+        <li><a href="/documentation/programming-guide/#specifying-coders">Specifying coders</a></li>
+        <li><a href="/documentation/programming-guide/#default-coders-and-the-coderregistry">Default coders and the CoderRegistry</a></li>
+      </ul>
+    </li>
+    <li class="section-nav-item--collapsible">
+      <span class="section-nav-list-title">Windowing</span>
+
+      <ul class="section-nav-list">
+        <li><a href="/documentation/programming-guide/#windowing">Windowing basics</a></li>
+        <li><a href="/documentation/programming-guide/#provided-windowing-functions">Provided windowing functions</a></li>
+        <li><a href="/documentation/programming-guide/#setting-your-pcollections-windowing-function">Setting your PCollection’s windowing function</a></li>
+        <li><a href="/documentation/programming-guide/#watermarks-and-late-data">Watermarks and late data</a></li>
+        <li><a href="/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements">Adding timestamps to a PCollection’s elements</a></li>
+      </ul>
+    </li>
+    <li class="section-nav-item--collapsible">
+      <span class="section-nav-list-title">Triggers</span>
+
+      <ul class="section-nav-list">
+        <li><a href="/documentation/programming-guide/#triggers">Trigger basics</a></li>
+        <li><a href="/documentation/programming-guide/#event-time-triggers">Event time triggers and the default trigger</a></li>
+        <li><a href="/documentation/programming-guide/#processing-time-triggers">Processing time triggers</a></li>
+        <li><a href="/documentation/programming-guide/#data-driven-triggers">Data-driven triggers</a></li>
+        <li><a href="/documentation/programming-guide/#setting-a-trigger">Setting a trigger</a></li>
+        <li><a href="/documentation/programming-guide/#composite-triggers">Composite triggers</a></li>
+      </ul>
+    </li>
+    <li class="section-nav-item--collapsible">
+      <span class="section-nav-list-title">Metrics</span>
+
+      <ul class="section-nav-list">
+        <li><a href="/documentation/programming-guide/#metrics">Metrics basics</a></li>
+        <li><a href="/documentation/programming-guide/#types-of-metrics">Types of metrics</a></li>
+        <li><a href="/documentation/programming-guide/#querying-metrics">Querying metrics</a></li>
+        <li><a href="/documentation/programming-guide/#using-metrics">Using metrics in pipeline</a></li>
+        <li><a href="/documentation/programming-guide/#export-metrics">Export metrics</a></li>
+      </ul>
+    </li>
+    <li class="section-nav-item--collapsible">
+      <span class="section-nav-list-title">State and Timers</span>
+
+      <ul class="section-nav-list">
+        <li><a href="/documentation/programming-guide/#types-of-state">Types of state</a></li>
+        <li><a href="/documentation/programming-guide/#deferred-state-reads">Deferred state reads</a></li>
+        <li><a href="/documentation/programming-guide/#timers">Timers</a></li>
+        <li><a href="/documentation/programming-guide/#garbage-collecting-state">Garbage collecting state</a></li>
+        <li><a href="/documentation/programming-guide/#state-timers-examples">State and timers examples</a></li>
       </ul>
     </li>
 
-    <li>
-      <span class="section-nav-list-title">Beam programming guide</span>
+    <li class="section-nav-item--collapsible">
+      <span class="section-nav-list-title">Splittable DoFns</span>
 
       <ul class="section-nav-list">
-        <li><a href="/documentation/programming-guide/">Overview</a></li>
-        <li><a href="/documentation/programming-guide/#creating-a-pipeline">Pipelines</a></li>
-        <li class="section-nav-item--collapsible">
-          <span class="section-nav-list-title">PCollections</span>
+        <li><a href="/documentation/programming-guide/#sdf-basics">Basics</a></li>
+        <li><a href="/documentation/programming-guide/#sizing-and-progress">Sizing and progress</a></li>
+        <li><a href="/documentation/programming-guide/#user-initiated-checkpoint">User-initiated checkpoint</a></li>
+        <li><a href="/documentation/programming-guide/#runner-initiated-split">Runner initiated split</a></li>
+        <li><a href="/documentation/programming-guide/#watermark-estimation">Watermark estimation</a></li>
+        <li><a href="/documentation/programming-guide/#truncating-during-drain">Truncating during drain</a></li>
+        <li><a href="/documentation/programming-guide/#bundle-finalization">Bundle finalization</a></li>
+      </ul>
+    </li>
 
-          <ul class="section-nav-list">
-            <li><a href="/documentation/programming-guide/#pcollections">Creating a PCollection</a></li>
-            <li><a href="/documentation/programming-guide/#pcollection-characteristics">PCollection characteristics</a></li>
-          </ul>
-        </li>
-        <li class="section-nav-item--collapsible">
-          <span class="section-nav-list-title">Transforms</span>
+    <li class="section-nav-item--collapsible">
+      <span class="section-nav-list-title">Multi-language Pipelines</span>
 
-          <ul class="section-nav-list">
-            <li><a href="/documentation/programming-guide/#applying-transforms">Applying transforms</a></li>
-            <li>
-              <span class="section-nav-list-title">Core Beam transforms</span>
-
-              <ul class="section-nav-list">
-                <li><a href="/documentation/programming-guide/#pardo">ParDo</a></li>
-                <li><a href="/documentation/programming-guide/#groupbykey">GroupByKey</a></li>
-                <li><a href="/documentation/programming-guide/#cogroupbykey">CoGroupByKey</a></li>
-                <li><a href="/documentation/programming-guide/#combine">Combine</a></li>
-                <li><a href="/documentation/programming-guide/#flatten">Flatten</a></li>
-                <li><a href="/documentation/programming-guide/#partition">Partition</a></li>
-              </ul>
-            </li>
-
-            <li><a href="/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms">Requirements for user code</a></li>
-            <li><a href="/documentation/programming-guide/#side-inputs">Side inputs</a></li>
-            <li><a href="/documentation/programming-guide/#additional-outputs">Additional outputs</a></li>
-            <li><a href="/documentation/programming-guide/#composite-transforms">Composite transforms</a></li>
-          </ul>
-        </li>
-        <li class="section-nav-item--collapsible">
-          <span class="section-nav-list-title">Pipeline I/O</span>
-
-          <ul class="section-nav-list">
-            <li><a href="/documentation/programming-guide/#pipeline-io">Using I/O transforms</a></li>
-            <li><a href="/documentation/io/built-in/">Built-in I/O connectors</a></li>
-
-            <li class="section-nav-item--collapsible">
-              <span class="section-nav-list-title">Built-in I/O connector guides</span>
-              <ul class="section-nav-list">
-                <li><a href="/documentation/io/built-in/parquet/">Apache Parquet I/O connector
-                </a></li>
-                <li><a href="/documentation/io/built-in/hadoop/">Hadoop Input/Output Format IO</a></li>
-                <li><a href="/documentation/io/built-in/hcatalog/">HCatalog IO</a></li>
-                <li><a href="/documentation/io/built-in/google-bigquery/">Google BigQuery I/O connector</a></li>
-                <li><a href="/documentation/io/built-in/snowflake/">Snowflake I/O connector</a></li>
-              </ul>
-          </li>
-
-
-            <li class="section-nav-item--collapsible">
-              <span class="section-nav-list-title">Developing new I/O connectors</span>
-
-              <ul class="section-nav-list">
-              <li><a href="/documentation/io/developing-io-overview/">Overview: Developing connectors</a></li>
-              <li><a href="/documentation/io/developing-io-java/">Developing connectors (Java)</a></li>
-              <li><a href="/documentation/io/developing-io-python/">Developing connectors (Python)</a></li>
-              </ul>
-            </li>
-
-            <li><a href="/documentation/io/testing/">Testing I/O transforms</a></li>
-          </ul>
-        </li>
-        <li class="section-nav-item--collapsible">
-          <span class="section-nav-list-title">Schemas</span>
-
-          <ul class="section-nav-list">
-            <li><a href="/documentation/programming-guide/#what-is-a-schema">What is a schema</a></li>
-            <li><a href="/documentation/programming-guide/#schemas-for-pl-types">Schemas for programming language types</a></li>
-            <li><a href="/documentation/programming-guide/#schema-definition">Schema definition</a></li>
-            <li><a href="/documentation/programming-guide/#logical-types">Logical types</a></li>
-            <li><a href="/documentation/programming-guide/#creating-schemas">Creating schemas</a></li>
-            <li><a href="/documentation/programming-guide/#using-schemas">Using schemas</a></li>
-          </ul>
-        </li>
-        <li class="section-nav-item--collapsible">
-          <span class="section-nav-list-title">Data encoding and type safety</span>
-
-          <ul class="section-nav-list">
-            <li><a href="/documentation/programming-guide/#data-encoding-and-type-safety">Data encoding basics</a></li>
-            <li><a href="/documentation/programming-guide/#specifying-coders">Specifying coders</a></li>
-            <li><a href="/documentation/programming-guide/#default-coders-and-the-coderregistry">Default coders and the CoderRegistry</a></li>
-          </ul>
-        </li>
-        <li class="section-nav-item--collapsible">
-          <span class="section-nav-list-title">Windowing</span>
-
-          <ul class="section-nav-list">
-            <li><a href="/documentation/programming-guide/#windowing">Windowing basics</a></li>
-            <li><a href="/documentation/programming-guide/#provided-windowing-functions">Provided windowing functions</a></li>
-            <li><a href="/documentation/programming-guide/#setting-your-pcollections-windowing-function">Setting your PCollection’s windowing function</a></li>
-            <li><a href="/documentation/programming-guide/#watermarks-and-late-data">Watermarks and late data</a></li>
-            <li><a href="/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements">Adding timestamps to a PCollection’s elements</a></li>
-          </ul>
-        </li>
-        <li class="section-nav-item--collapsible">
-          <span class="section-nav-list-title">Triggers</span>
-
-          <ul class="section-nav-list">
-            <li><a href="/documentation/programming-guide/#triggers">Trigger basics</a></li>
-            <li><a href="/documentation/programming-guide/#event-time-triggers">Event time triggers and the default trigger</a></li>
-            <li><a href="/documentation/programming-guide/#processing-time-triggers">Processing time triggers</a></li>
-            <li><a href="/documentation/programming-guide/#data-driven-triggers">Data-driven triggers</a></li>
-            <li><a href="/documentation/programming-guide/#setting-a-trigger">Setting a trigger</a></li>
-            <li><a href="/documentation/programming-guide/#composite-triggers">Composite triggers</a></li>
-          </ul>
-        </li>
-        <li class="section-nav-item--collapsible">
-          <span class="section-nav-list-title">Metrics</span>
-
-          <ul class="section-nav-list">
-            <li><a href="/documentation/programming-guide/#metrics">Metrics basics</a></li>
-            <li><a href="/documentation/programming-guide/#types-of-metrics">Types of metrics</a></li>
-            <li><a href="/documentation/programming-guide/#querying-metrics">Querying metrics</a></li>
-            <li><a href="/documentation/programming-guide/#using-metrics">Using metrics in pipeline</a></li>
-            <li><a href="/documentation/programming-guide/#export-metrics">Export metrics</a></li>
-          </ul>
-        </li>
-        <li class="section-nav-item--collapsible">
-          <span class="section-nav-list-title">State and Timers</span>
-
-          <ul class="section-nav-list">
-            <li><a href="/documentation/programming-guide/#types-of-state">Types of state</a></li>
-            <li><a href="/documentation/programming-guide/#deferred-state-reads">Deferred state reads</a></li>
-            <li><a href="/documentation/programming-guide/#timers">Timers</a></li>
-            <li><a href="/documentation/programming-guide/#garbage-collecting-state">Garbage collecting state</a></li>
-            <li><a href="/documentation/programming-guide/#state-timers-examples">State and timers examples</a></li>
-          </ul>
-        </li>
-
-        <li class="section-nav-item--collapsible">
-          <span class="section-nav-list-title">Splittable DoFns</span>
-
-          <ul class="section-nav-list">
-            <li><a href="/documentation/programming-guide/#sdf-basics">Basics</a></li>
-            <li><a href="/documentation/programming-guide/#sizing-and-progress">Sizing and progress</a></li>
-            <li><a href="/documentation/programming-guide/#user-initiated-checkpoint">User-initiated checkpoint</a></li>
-            <li><a href="/documentation/programming-guide/#runner-initiated-split">Runner initiated split</a></li>
-            <li><a href="/documentation/programming-guide/#watermark-estimation">Watermark estimation</a></li>
-            <li><a href="/documentation/programming-guide/#truncating-during-drain">Truncating during drain</a></li>
-            <li><a href="/documentation/programming-guide/#bundle-finalization">Bundle finalization</a></li>
-          </ul>
-        </li>
-
-        <li class="section-nav-item--collapsible">
-          <span class="section-nav-list-title">Multi-language Pipelines</span>
-
-          <ul class="section-nav-list">
-            <li><a href="/documentation/programming-guide/#create-x-lang-transforms">Creating cross-language transforms</a></li>
-            <li><a href="/documentation/programming-guide/#use-x-lang-transforms">Using cross-language transforms</a></li>
-            <li><a href="/documentation/programming-guide/#x-lang-transform-runner-support">Runner Support</a></li>
-          </ul>
-        </li>
+      <ul class="section-nav-list">
+        <li><a href="/documentation/programming-guide/#create-x-lang-transforms">Creating cross-language transforms</a></li>
+        <li><a href="/documentation/programming-guide/#use-x-lang-transforms">Using cross-language transforms</a></li>
+        <li><a href="/documentation/programming-guide/#x-lang-transform-runner-support">Runner Support</a></li>
       </ul>
     </li>
   </ul>
 </li>
+<li class="section-nav-item--collapsible">
+  <span class="section-nav-list-title">Pipeline development lifecycle</span>
+  <ul class="section-nav-list">
+    <li><a href="/documentation/pipelines/design-your-pipeline/">Design Your Pipeline</a></li>
+    <li><a href="/documentation/pipelines/create-your-pipeline/">Create Your Pipeline</a></li>
+    <li><a href="/documentation/pipelines/test-your-pipeline/">Test Your Pipeline</a></li>
+  </ul>
+</li>
+<li class="section-nav-item--collapsible">
+  <span class="section-nav-list-title">Common pipeline patterns</span>
 
-<li><a href="/documentation/glossary/">Glossary</a></li>
-
+  <ul class="section-nav-list">
+    <li><a href="/documentation/patterns/overview/">Overview</a></li>
+    <li><a href="/documentation/patterns/file-processing/">File processing</a></li>
+    <li><a href="/documentation/patterns/side-inputs/">Side inputs</a></li>
+    <li><a href="/documentation/patterns/pipeline-options/">Pipeline options</a></li>
+    <li><a href="/documentation/patterns/custom-io/">Custom I/O</a></li>
+    <li><a href="/documentation/patterns/custom-windows/">Custom windows</a></li>
+    <li><a href="/documentation/patterns/bigqueryio/">BigQueryIO</a></li>
+    <li><a href="/documentation/patterns/ai-platform/">AI Platform</a></li>
+    <li><a href="/documentation/patterns/schema/">Schema</a></li>
+    <li><a href="/documentation/patterns/bqml/">BigQuery ML</a></li>
+    <li><a href="/documentation/patterns/cross-language/">Cross-language transforms</a></li>
+  </ul>
+</li>
 <li class="section-nav-item--collapsible">
   <span class="section-nav-list-title">Runtime systems</span>
 
@@ -352,5 +216,129 @@
     <li><a href="/documentation/runtime/sdk-harness-config/">SDK Harness Configuration</a></li>
   </ul>
 </li>
+<li class="section-nav-item--collapsible">
+  <span class="section-nav-list-title">Transform catalog</span>
 
-<li><a href="https://cwiki.apache.org/confluence/display/BEAM/Apache+Beam">Beam Wiki</a></li>
+  <ul class="section-nav-list">
+    <li class="section-nav-item--collapsible">
+      <span class="section-nav-list-title">Python</span>
+
+      <ul class="section-nav-list">
+        <li><a href="/documentation/transforms/python/overview/">Overview</a></li>
+        <li class="section-nav-item--collapsible">
+          <span class="section-nav-list-title">Element-wise</span>
+
+          <ul class="section-nav-list">
+            <li><a href="/documentation/transforms/python/elementwise/filter/">Filter</a></li>
+            <li><a href="/documentation/transforms/python/elementwise/flatmap/">FlatMap</a></li>
+            <li><a href="/documentation/transforms/python/elementwise/keys/">Keys</a></li>
+            <li><a href="/documentation/transforms/python/elementwise/kvswap/">KvSwap</a></li>
+            <li><a href="/documentation/transforms/python/elementwise/map/">Map</a></li>
+            <li><a href="/documentation/transforms/python/elementwise/pardo/">ParDo</a></li>
+            <li><a href="/documentation/transforms/python/elementwise/partition/">Partition</a></li>
+            <li><a href="/documentation/transforms/python/elementwise/regex/">Regex</a></li>
+            <li><a href="/documentation/transforms/python/elementwise/reify/">Reify</a></li>
+            <li><a href="/documentation/transforms/python/elementwise/tostring/">ToString</a></li>
+            <li><a href="/documentation/transforms/python/elementwise/values/">Values</a></li>
+            <li><a href="/documentation/transforms/python/elementwise/withtimestamps/">WithTimestamps</a></li>
+          </ul>
+        </li>
+        <li class="section-nav-item--collapsible">
+          <span class="section-nav-list-title">Aggregation</span>
+
+          <ul class="section-nav-list">
+            <li><a href="/documentation/transforms/python/aggregation/cogroupbykey/">CoGroupByKey</a></li>
+            <li><a href="/documentation/transforms/python/aggregation/combineglobally/">CombineGlobally</a></li>
+            <li><a href="/documentation/transforms/python/aggregation/combineperkey/">CombinePerKey</a></li>
+            <li><a href="/documentation/transforms/python/aggregation/combinevalues/">CombineValues</a></li>
+            <li><a href="/documentation/transforms/python/aggregation/count/">Count</a></li>
+            <li><a href="/documentation/transforms/python/aggregation/distinct/">Distinct</a></li>
+            <li><a href="/documentation/transforms/python/aggregation/groupbykey/">GroupByKey</a></li>
+            <li><a href="/documentation/transforms/python/aggregation/groupby/">GroupBy</a></li>
+            <li><a href="/documentation/transforms/python/aggregation/groupintobatches/">GroupIntoBatches</a></li>
+            <li><a href="/documentation/transforms/python/aggregation/latest/">Latest</a></li>
+            <li><a href="/documentation/transforms/python/aggregation/max/">Max</a></li>
+            <li><a href="/documentation/transforms/python/aggregation/min/">Min</a></li>
+            <li><a href="/documentation/transforms/python/aggregation/mean/">Mean</a></li>
+            <li><a href="/documentation/transforms/python/aggregation/sample/">Sample</a></li>
+            <li><a href="/documentation/transforms/python/aggregation/sum/">Sum</a></li>
+            <li><a href="/documentation/transforms/python/aggregation/top/">Top</a></li>
+          </ul>
+        </li>
+        <li class="section-nav-item--collapsible">
+          <span class="section-nav-list-title">Other</span>
+
+          <ul class="section-nav-list">
+            <li><a href="/documentation/transforms/python/other/create/">Create</a></li>
+            <li><a href="/documentation/transforms/python/other/flatten/">Flatten</a></li>
+            <li><a href="/documentation/transforms/python/other/reshuffle/">Reshuffle</a></li>
+            <li><a href="/documentation/transforms/python/other/windowinto/">WindowInto</a></li>
+          </ul>
+        </li>
+      </ul>
+    </li>
+
+    <li class="section-nav-item--collapsible">
+      <span class="section-nav-list-title">Java</span>
+
+      <ul class="section-nav-list">
+        <li><a href="/documentation/transforms/java/overview/">Overview</a></li>
+        <li class="section-nav-item--collapsible">
+          <span class="section-nav-list-title">Element-wise</span>
+
+          <ul class="section-nav-list">
+            <li><a href="/documentation/transforms/java/elementwise/filter/">Filter</a></li>
+            <li><a href="/documentation/transforms/java/elementwise/flatmapelements/">FlatMapElements</a></li>
+            <li><a href="/documentation/transforms/java/elementwise/keys/">Keys</a></li>
+            <li><a href="/documentation/transforms/java/elementwise/kvswap/">KvSwap</a></li>
+            <li><a href="/documentation/transforms/java/elementwise/mapelements/">MapElements</a></li>
+            <li><a href="/documentation/transforms/java/elementwise/pardo/">ParDo</a></li>
+            <li><a href="/documentation/transforms/java/elementwise/partition/">Partition</a></li>
+            <li><a href="/documentation/transforms/java/elementwise/regex/">Regex</a></li>
+            <li><a href="/documentation/transforms/java/elementwise/reify/">Reify</a></li>
+            <li><a href="/documentation/transforms/java/elementwise/values/">Values</a></li>
+            <li><a href="/documentation/transforms/java/elementwise/tostring/">ToString</a></li>
+            <li><a href="/documentation/transforms/java/elementwise/withkeys/">WithKeys</a></li>
+            <li><a href="/documentation/transforms/java/elementwise/withtimestamps/">WithTimestamps</a></li>
+          </ul>
+        </li>
+        <li class="section-nav-item--collapsible">
+          <span class="section-nav-list-title">Aggregation</span>
+
+          <ul class="section-nav-list">
+            <li><a href="/documentation/transforms/java/aggregation/approximatequantiles/">ApproximateQuantiles</a></li>
+            <li><a href="/documentation/transforms/java/aggregation/approximateunique/">ApproximateUnique</a></li>
+            <li><a href="/documentation/transforms/java/aggregation/cogroupbykey/">CoGroupByKey</a></li>
+            <li><a href="/documentation/transforms/java/aggregation/combine/">Combine</a></li>
+            <li><a href="/documentation/transforms/java/aggregation/combinewithcontext/">CombineWithContext</a></li>
+            <li><a href="/documentation/transforms/java/aggregation/count/">Count</a></li>
+            <li><a href="/documentation/transforms/java/aggregation/distinct/">Distinct</a></li>
+            <li><a href="/documentation/transforms/java/aggregation/groupbykey/">GroupByKey</a></li>
+            <li><a href="/documentation/transforms/java/aggregation/groupintobatches/">GroupIntoBatches</a></li>
+            <li><a href="/documentation/transforms/java/aggregation/hllcount/">HllCount</a></li>
+            <li><a href="/documentation/transforms/java/aggregation/latest/">Latest</a></li>
+            <li><a href="/documentation/transforms/java/aggregation/max/">Max</a></li>
+            <li><a href="/documentation/transforms/java/aggregation/mean/">Mean</a></li>
+            <li><a href="/documentation/transforms/java/aggregation/min/">Min</a></li>
+            <li><a href="/documentation/transforms/java/aggregation/sample/">Sample</a></li>
+            <li><a href="/documentation/transforms/java/aggregation/sum/">Sum</a></li>
+            <li><a href="/documentation/transforms/java/aggregation/top/">Top</a></li>
+          </ul>
+        </li>
+        <li class="section-nav-item--collapsible">
+          <span class="section-nav-list-title">Other</span>
+
+          <ul class="section-nav-list">
+            <li><a href="/documentation/transforms/java/other/create/">Create</a></li>
+            <li><a href="/documentation/transforms/java/other/flatten/">Flatten</a></li>
+            <li><a href="/documentation/transforms/java/other/passert/">PAssert</a></li>
+            <li><a href="/documentation/transforms/java/other/view/">View</a></li>
+            <li><a href="/documentation/transforms/java/other/window/">Window</a></li>
+          </ul>
+        </li>
+      </ul>
+    </li>
+  </ul>
+</li>
+<li><a href="/documentation/glossary/">Glossary</a></li>
+<li><a href="https://cwiki.apache.org/confluence/display/BEAM/Apache+Beam">Beam Wiki <img src="/images/external-link-icon.png" width="14" height="14" alt="External link."></a></li>
diff --git a/website/www/site/static/js/language-switch.js b/website/www/site/static/js/language-switch-v2.js
similarity index 100%
rename from website/www/site/static/js/language-switch.js
rename to website/www/site/static/js/language-switch-v2.js