Merge pull request #9999 from ibzib/spark-parallel-test

[BEAM-8294] run Spark portable validates runner tests in parallel
diff --git a/.test-infra/jenkins/README.md b/.test-infra/jenkins/README.md
index 61fd817..bd876d3 100644
--- a/.test-infra/jenkins/README.md
+++ b/.test-infra/jenkins/README.md
@@ -27,17 +27,16 @@
 
 | Name | Link | PR Trigger Phrase | Cron Status |
 |------|------|-------------------|-------------|
-| beam_PreCommit_Go | [commit](https://builds.apache.org/job/beam_PreCommit_Go_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Go_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Go_Phrase/) | `Run Go PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron) |
-| beam_PreCommit_JavaPortabilityApi | [commit](), [cron](), [phrase]() | `Run JavaPortabilityApi PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go) |
+| beam_PreCommit_BeamSQL_ZetaSQL | [commit](https://builds.apache.org/job/beam_PreCommit_JavaBeamZetaSQL_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_JavaBeamZetaSQL_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_JavaBeamZetaSQL_Phrase/) | `Run BeamSQL_ZetaSQL PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_JavaBeamZetaSQL_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_JavaBeamZetaSQL_Cron) |
 | beam_PreCommit_CommunityMetrics | [commit](https://builds.apache.org/job/beam_PreCommit_CommunityMetrics_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_CommunityMetrics_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_CommunityMetrics_Phrase/) | `Run CommunityMetrics PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_CommunityMetrics_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_CommunityMetrics_Cron) |
 | beam_PreCommit_Go | [commit](https://builds.apache.org/job/beam_PreCommit_Go_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Go_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Go_Phrase/) | `Run Go PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron) |
 | beam_PreCommit_Java | [commit](https://builds.apache.org/job/beam_PreCommit_Java_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Java_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Java_Phrase/) | `Run Java PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron) |
 | beam_PreCommit_JavaPortabilityApi | [commit](https://builds.apache.org/job/beam_PreCommit_JavaPortabilityApi_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_JavaPortabilityApi_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_JavaPortabilityApi_Phrase/) | `Run JavaPortabilityApi PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_JavaPortabilityApi_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_JavaPortabilityApi_Cron) |
-| beam_PreCommit_Java_Examples_Dataflow | [commit](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Phrase/) | `Run Java PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron) |
-| beam_PreCommit_Portable_Python | [commit](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Phrase/) | `Run Portable PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron) |
+| beam_PreCommit_Java_Examples_Dataflow | [commit](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Phrase/) | `Run Java_Examples_Dataflow PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron) |
+| beam_PreCommit_Portable_Python | [commit](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Phrase/) | `Run Portable_Python PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron) |
 | beam_PreCommit_PythonLint | [commit](https://builds.apache.org/job/beam_PreCommit_PythonLint_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_PythonLint_Phrase/) | `Run PythonLint PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron) |
 | beam_PreCommit_Python | [commit](https://builds.apache.org/job/beam_PreCommit_Python_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Python_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Python_Phrase/) | `Run Python PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron) |
-| beam_PreCommit_Python2_PVR_Flink | [commit](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Phrase/) | `Run Python PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron) |
+| beam_PreCommit_Python2_PVR_Flink | [commit](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Phrase/) | `Run Python2_PVR_Flink PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron) |
 | beam_PreCommit_RAT | [commit](https://builds.apache.org/job/beam_PreCommit_RAT_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_RAT_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_RAT_Phrase/) | `Run RAT PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_RAT_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_RAT_Cron) |
 | beam_PreCommit_Spotless | [commit](https://builds.apache.org/job/beam_PreCommit_Spotless_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Spotless_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Spotless_Phrase/) | `Run Spotless PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Spotless_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Spotless_Cron) |
 | beam_PreCommit_Website | [commit](https://builds.apache.org/job/beam_PreCommit_Website_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Website_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Website_Phrase/) | `Run Website PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron) |
@@ -47,6 +46,7 @@
 
 | Name | Link | PR Trigger Phrase | Cron Status |
 |------|------|-------------------|-------------|
+| beam_PostCommit_CrossLanguageValidatesRunner | [cron](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/), [phrase](https://builds.apache.org/job/beam_PostCommit_XVR_Flink_PR/) | `Run XVR_Flink PostCommit` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Flink) |
 | beam_PostCommit_Go | [cron](https://builds.apache.org/job/beam_PostCommit_Go/), [phrase](https://builds.apache.org/job/beam_PostCommit_Go_PR/) | `Run Go PostCommit` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go) |
 | beam_PostCommit_Go_VR_Flink | [cron](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/), [phrase](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink_PR/) | `Run Go Flink ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/) |
 | beam_PostCommit_Go_VR_Spark | [cron](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/), [phrase](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark_PR/) | `Run Go Spark ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/) |
@@ -59,19 +59,23 @@
 | beam_PostCommit_Java_PVR_Flink_Streaming | [cron](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming_PR/) | `Run Java Flink PortableValidatesRunner Streaming` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming) |
 | beam_PostCommit_Java_PVR_Spark_Batch | [cron](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch_PR/) | `Run Java Spark PortableValidatesRunner Batch` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch) |
 | beam_PostCommit_Java_PortabilityApi | [cron](https://builds.apache.org/job/beam_PostCommit_Java_PortabilityApi/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_PortabilityApi_PR/) | `Run Java PortabilityApi PostCommit` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PortabilityApi/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PortabilityApi) |
-| beam_PostCommit_Java_ValidatesRunner_Apex | [cron](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_PR/) | `Run Apex ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex) |
-| beam_PostCommit_Java_ValidatesRunner_Dataflow | [cron](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/) | `Run Dataflow ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow) |
+| beam_PostCommit_Java11_Dataflow_Examples | [cron](https://builds.apache.org/job/beam_PostCommit_Java11_Examples_Dataflow/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java11_Examples_Dataflow_PR/) | `Run Java examples on Dataflow with Java 11` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java11_Examples_Dataflow/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java11_Examples_Dataflow) |
+| beam_PostCommit_Java11_Dataflow_Portability_Examples | [cron](https://builds.apache.org/job/beam_PostCommit_Java11_Examples_Dataflow_Portability/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java11_Examples_Dataflow_Portability_PR/) | `Run Java Portability examples on Dataflow with Java 11` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java11_Examples_Dataflow_Portability/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java11_Examples_Dataflow_Portability) |
 | beam_PostCommit_Java11_ValidatesRunner_Dataflow | [cron](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Dataflow/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Dataflow_PR/) | `Run Dataflow ValidatesRunner Java 11` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Dataflow/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Dataflow) |
 | beam_PostCommit_Java11_ValidatesRunner_PortabilityApi_Dataflow | [cron](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_PortabilityApi_Dataflow/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_PortabilityApi_Dataflow_PR/) | `Run Dataflow PortabilityApi ValidatesRunner with Java 11` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_PortabilityApi_Dataflow/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_PortabilityApi_Dataflow) |
 | beam_PostCommit_Java11_ValidatesRunner_Direct | [cron](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Direct), [phrase](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Direct_PR) | `Run Direct ValidatesRunner in Java 11` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Direct/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Direct) |
+| beam_PostCommit_Java_ValidatesRunner_Apex | [cron](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_PR/) | `Run Apex ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex) |
+| beam_PostCommit_Java_ValidatesRunner_Dataflow | [cron](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/) | `Run Dataflow ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow) |
 | beam_PostCommit_Java_ValidatesRunner_Flink | [cron](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_PR/) | `Run Flink ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink) |
 | beam_PostCommit_Java_ValidatesRunner_Gearpump | [cron](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_PR/) | `Run Gearpump ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump) |
 | beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow | [cron](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/) | `Run Dataflow PortabilityApi ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow) |
 | beam_PostCommit_Java_ValidatesRunner_Samza | [cron](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_PR/) | `Run Samza ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza) |
 | beam_PostCommit_Java_ValidatesRunner_Spark | [cron](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_PR/) | `Run Spark ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark) |
+| beam_PostCommit_PortableJar_Flink | [cron](https://builds.apache.org/job/beam_PostCommit_PortableJar_Flink/), [phrase](https://builds.apache.org/job/beam_PostCommit_PortableJar_Flink_PR/) | `Run PortableJar_Flink PostCommit` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_PortableJar_Flink/badge/icon)](https://builds.apache.org/job/beam_PostCommit_PortableJar_Flink) |
 | beam_PostCommit_Py_VR_Dataflow | [cron](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/), [phrase](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_PR/) | `Run Python Dataflow ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow) |
 | beam_PostCommit_Py_ValCont | [cron](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/), [phrase](https://builds.apache.org/job/beam_PostCommit_Py_ValCont_PR/) | `Run Python Dataflow ValidatesContainer` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont) |
 | beam_PostCommit_Python35_VR_Flink | [cron](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/), [phrase](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/) | `Run Python 3.5 Flink ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink) |
+| beam_PostCommit_Python_MongoDBIO_IT | [cron](https://builds.apache.org/job/beam_PostCommit_Python_MongoDBIO_IT), [phrase](https://builds.apache.org/job/beam_PostCommit_Python_MongoDBIO_IT_PR/) | `Run Python MongoDBIO_IT` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_MongoDBIO_IT/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_MongoDBIO_IT) |
 | beam_PostCommit_Python_VR_Spark | [cron](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/), [phrase](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/) | `Run Python Spark ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark) |
 | beam_PostCommit_Python2  | [cron](https://builds.apache.org/job/beam_PostCommit_Python2), [phrase](https://builds.apache.org/job/beam_PostCommit_Python2_PR/) | `Run Python 2 PostCommit` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2) |
 | beam_PostCommit_Python35 | [cron](https://builds.apache.org/job/beam_PostCommit_Python35), [phrase](https://builds.apache.org/job/beam_PostCommit_Python35_PR/) | `Run Python 3.5 PostCommit` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35) |
@@ -79,6 +83,7 @@
 | beam_PostCommit_Python37 | [cron](https://builds.apache.org/job/beam_PostCommit_Python37), [phrase](https://builds.apache.org/job/beam_PostCommit_Python37_PR/) | `Run Python 3.7 PostCommit` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python37/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37) |
 | beam_PostCommit_SQL | [cron](https://builds.apache.org/job/beam_PostCommit_SQL/), [phrase](https://builds.apache.org/job/beam_PostCommit_SQL_PR/) | `Run SQL PostCommit` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_SQL/badge/icon)](https://builds.apache.org/job/beam_PostCommit_SQL) |
 | beam_PostCommit_Website_Publish | [cron](https://builds.apache.org/job/beam_PostCommit_Website_Publish/) | N/A | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Website_Publish/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Website_Publish) |
+| beam_PostCommit_Website_Test | [cron](https://builds.apache.org/job/beam_PostCommit_Website_Test/) | `Run Full Website Test` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Website_Test/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Website_Test) |
 
 ### Performance Jobs
 
@@ -89,10 +94,12 @@
 | beam_PerformanceTests_Compressed_TextIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT/), [hdfs_cron](https://builds.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT_HDFS/) | `Run Java CompressedTextIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT) [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT_HDFS/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT_HDFS) |
 | beam_PerformanceTests_HadoopFormat | [cron](https://builds.apache.org/job/beam_PerformanceTests_HadoopFormat/) | `Run Java HadoopFormatIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_HadoopFormat/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_HadoopFormat) |
 | beam_PerformanceTests_JDBC | [cron](https://builds.apache.org/job/beam_PerformanceTests_JDBC/) | `Run Java JdbcIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_JDBC/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_JDBC) |
+| beam_PerformanceTests_KafkaIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_Kafka_IO/) | `Run Java KafkaIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_Kafka_IO/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_Kafka_IO) |
 | beam_PerformanceTests_ManyFiles_TextIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_ManyFiles_TextIOIT/), [hdfs_cron](https://builds.apache.org/job/beam_PerformanceTests_ManyFiles_TextIOIT_HDFS/) | `Run Java ManyFilesTextIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_ManyFiles_TextIOIT/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_ManyFiles_TextIOIT) [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_ManyFiles_TextIOIT_HDFS/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_ManyFiles_TextIOIT_HDFS) |
+| beam_PerformanceTests_MongoDBIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_MongoDBIO_IT/) | `Run Java MongoDBIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_MongoDBIO_IT/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_MongoDBIO_IT) |
 | beam_PerformanceTests_ParquetIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_ParquetIOIT/), [hdfs_cron](https://builds.apache.org/job/beam_PerformanceTests_ParquetIOIT_HDFS/) | `Run Java ParquetIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_ParquetIOIT/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_ParquetIOIT) [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_ParquetIOIT_HDFS/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_ParquetIOIT_HDFS) |
 | beam_PerformanceTests_Spark | [cron](https://builds.apache.org/job/beam_PerformanceTests_Spark/) | `Run Spark Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_Spark/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_Spark) |
-| beam_PerformanceTests_TFRecordIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_TFRecordIOIT/) | `Run Java JdbcIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_TFRecordIOIT/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_TFRecordIOIT) |
+| beam_PerformanceTests_TFRecordIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_TFRecordIOIT/) | `Run Java TFRecordIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_TFRecordIOIT/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_TFRecordIOIT) |
 | beam_PerformanceTests_TextIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_TextIOIT/), [hdfs_cron](https://builds.apache.org/job/beam_PerformanceTests_TextIOIT_HDFS/) | `Run Java TextIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_TextIOIT/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_TextIOIT) [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_TextIOIT_HDFS/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_TextIOIT_HDFS) |
 | beam_PerformanceTests_WordCountIT_Py27 | [cron](https://builds.apache.org/job/beam_PerformanceTests_WordCountIT_Py27/) | `Run Python27 WordCountIT Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_WordCountIT_Py27/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_WordCountIT_Py27) |
 | beam_PerformanceTests_WordCountIT_Py35 | [cron](https://builds.apache.org/job/beam_PerformanceTests_WordCountIT_Py35/) | `Run Python35 WordCountIT Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_WordCountIT_Py35/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_WordCountIT_Py35) |
@@ -104,24 +111,25 @@
 
 | Name | Link | PR Trigger Phrase | Cron Status |
 |------|------|-------------------|-------------|
-| beam_LoadTests_Python_coGBK_Flink_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_coGBK_Flink_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_coGBK_Flink_Batch_PR/) | Run Load Tests Python CoGBK Flink Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_coGBK_Flink_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_coGBK_Flink_Batch/) |
-| beam_LoadTests_Java_CoGBK_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch_PR/) | Run Load Tests Java CoGBK Dataflow Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/) |
-| beam_LoadTests_Java_CoGBK_Dataflow_Streaming | [cron](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Streaming/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Streaming_PR/) | Run Load Tests Java CoGBK Dataflow Streaming | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Streaming/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Streaming/) |
-| beam_LoadTests_Python_CoGBK_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_CoGBK_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_CoGBK_Dataflow_Batch_PR/) | Run Load Tests Python CoGBK Dataflow Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_CoGBK_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_CoGBK_Dataflow_Batch/) |
-| beam_LoadTests_Python_Combine_Flink_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Flink_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Flink_Batch_PR/) | Run Load Tests Python Combine Flink Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Flink_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Flink_Batch/) |
-| beam_LoadTests_Java_Combine_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Batch_PR/) | Run Load Tests Java Combine Dataflow Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Batch/) |
-| beam_LoadTests_Java_Combine_Dataflow_Streaming | [cron](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Streaming/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Streaming_PR/) | Run Load Tests Java Combine Dataflow Streaming | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Streaming/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Streaming/) |
-| beam_LoadTests_Python_Combine_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Dataflow_Batch_PR/) | Run Python Load Tests Combine Dataflow Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Dataflow_Batch/) |
-| beam_LoadTests_Python_GBK_Flink_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Flink_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Flink_Batch_PR/) | Run Load Tests Python GBK Flink Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Flink_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Flink_Batch/) |
-| beam_LoadTests_Java_GBK_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Batch_PR/) | Run Load Tests Java GBK Dataflow Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Batch/) |
-| beam_LoadTests_Java_GBK_Dataflow_Streaming | [cron](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Streaming/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Streaming_PR/) | Run Load Tests Java GBK Dataflow Streaming | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Streaming/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Streaming/) |
-| beam_LoadTests_Python_GBK_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Dataflow_Batch_PR/) | Run Load Tests Python GBK Dataflow Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Dataflow_Batch/) |
-| beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch_PR/) | Run Load Tests Python GBK reiterate Dataflow Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch/) |
-| beam_Java_LoadTests_Smoke | [phrase](https://builds.apache.org/job/beam_Java_LoadTests_Smoke_PR/) | Run Java Load Tests Smoke |  |
-| beam_Python_LoadTests_Smoke | [phrase](https://builds.apache.org/job/beam_Python_LoadTests_Smoke_PR/) | Run Python Load Tests Smoke |  |
-| beam_LoadTests_Java_ParDo_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Batch_PR/) | Run Load Tests Java ParDo Dataflow Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Batch/) |
-| beam_LoadTests_Java_ParDo_Dataflow_Streaming | [cron](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Streaming/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Streaming_PR/) | Run Load Tests Java ParDo Dataflow Streaming | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Streaming/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Streaming/) |
-| beam_LoadTests_Python_ParDo_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Dataflow_Batch_PR/) | Run Python Load Tests ParDo Dataflow Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Dataflow_Batch/) |
+| beam_LoadTests_Python_coGBK_Flink_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_coGBK_Flink_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_coGBK_Flink_Batch_PR/) | `Run Load Tests Python CoGBK Flink Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_coGBK_Flink_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_coGBK_Flink_Batch/) |
+| beam_LoadTests_Java_CoGBK_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch_PR/) | `Run Load Tests Java CoGBK Dataflow Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/) |
+| beam_LoadTests_Java_CoGBK_Dataflow_Streaming | [cron](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Streaming/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Streaming_PR/) | `Run Load Tests Java CoGBK Dataflow Streaming` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Streaming/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Streaming/) |
+| beam_LoadTests_Python_CoGBK_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_CoGBK_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_CoGBK_Dataflow_Batch_PR/) | `Run Load Tests Python CoGBK Dataflow Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_CoGBK_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_CoGBK_Dataflow_Batch/) |
+| beam_LoadTests_Python_Combine_Flink_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Flink_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Flink_Batch_PR/) | `Run Load Tests Python Combine Flink Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Flink_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Flink_Batch/) |
+| beam_LoadTests_Java_Combine_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Batch_PR/) | `Run Load Tests Java Combine Dataflow Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Batch/) |
+| beam_LoadTests_Java_Combine_Dataflow_Streaming | [cron](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Streaming/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Streaming_PR/) | `Run Load Tests Java Combine Dataflow Streaming` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Streaming/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Streaming/) |
+| beam_LoadTests_Python_Combine_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Dataflow_Batch_PR/) | `Run Python Load Tests Combine Dataflow Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Dataflow_Batch/) |
+| beam_LoadTests_Python_GBK_Flink_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Flink_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Flink_Batch_PR/) | `Run Load Tests Python GBK Flink Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Flink_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Flink_Batch/) |
+| beam_LoadTests_Java_GBK_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Batch_PR/) | `Run Load Tests Java GBK Dataflow Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Batch/) |
+| beam_LoadTests_Java_GBK_Dataflow_Streaming | [cron](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Streaming/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Streaming_PR/) | `Run Load Tests Java GBK Dataflow Streaming` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Streaming/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Streaming/) |
+| beam_LoadTests_Python_GBK_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Dataflow_Batch_PR/) | `Run Load Tests Python GBK Dataflow Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Dataflow_Batch/) |
+| beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch_PR/) | `Run Load Tests Python GBK reiterate Dataflow Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch/) |
+| beam_Java_LoadTests_Smoke | [phrase](https://builds.apache.org/job/beam_Java_LoadTests_Smoke_PR/) | `Run Java Load Tests Smoke` |  |
+| beam_Python_LoadTests_Smoke | [phrase](https://builds.apache.org/job/beam_Python_LoadTests_Smoke_PR/) | `Run Python Load Tests Smoke` |  |
+| beam_LoadTests_Java_ParDo_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Batch_PR/) | `Run Load Tests Java ParDo Dataflow Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Batch/) |
+| beam_LoadTests_Java_ParDo_Dataflow_Streaming | [cron](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Streaming/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Streaming_PR/) | `Run Load Tests Java ParDo Dataflow Streaming` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Streaming/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Streaming/) |
+| beam_LoadTests_Python_ParDo_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Dataflow_Batch_PR/) | `Run Python Load Tests ParDo Dataflow Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Dataflow_Batch/) |
+| beam_LoadTests_Python_ParDo_Flink_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Flink_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Flink_Batch_PR/) | `Run Python Load Tests ParDo Flink Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Flink_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Flink_Batch/) |
 
 ### Inventory Jobs
 
@@ -164,4 +172,4 @@
 retest this please
 ```
 
-* Last update (mm/dd/yyyy): 02/12/2019
+* Last update (mm/dd/yyyy): 11/06/2019
diff --git a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
index ecae656..9de15ac 100644
--- a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
+++ b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
@@ -97,6 +97,7 @@
   "\u000A": 10
   "\u00c8\u0001": 200
   "\u00e8\u0007": 1000
+  "\u00a9\u0046": 9001
   "\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u0001": -1
 
 ---
@@ -275,3 +276,25 @@
   "\u007f\u00f0\0\0\0\0\0\0": "Infinity"
   "\u00ff\u00f0\0\0\0\0\0\0": "-Infinity"
   "\u007f\u00f8\0\0\0\0\0\0": "NaN"
+
+---
+
+coder:
+  urn: "beam:coder:row:v1"
+  # str: string, i32: int32, f64: float64, arr: array[string]
+  payload: "\n\t\n\x03str\x1a\x02\x10\x07\n\t\n\x03i32\x1a\x02\x10\x03\n\t\n\x03f64\x1a\x02\x10\x06\n\r\n\x03arr\x1a\x06\x1a\x04\n\x02\x10\x07\x12$4e5e554c-d4c1-4a5d-b5e1-f3293a6b9f05"
+nested: false
+examples:
+    "\u0004\u0000\u0003foo\u00a9\u0046\u003f\u00b9\u0099\u0099\u0099\u0099\u0099\u009a\0\0\0\u0003\u0003foo\u0003bar\u0003baz": {str: "foo", i32: 9001, f64: "0.1", arr: ["foo", "bar", "baz"]}
+
+---
+
+coder:
+  urn: "beam:coder:row:v1"
+  # str: nullable string, i32: nullable int32, f64: nullable float64
+  payload: "\n\x0b\n\x03str\x1a\x04\x08\x01\x10\x07\n\x0b\n\x03i32\x1a\x04\x08\x01\x10\x03\n\x0b\n\x03f64\x1a\x04\x08\x01\x10\x06\x12$b20c6545-57af-4bc8-b2a9-51ace21c7393"
+nested: false
+examples:
+  "\u0003\u0001\u0007": {str: null, i32: null, f64: null}
+  "\u0003\u0001\u0004\u0003foo\u00a9\u0046": {str: "foo", i32: 9001, f64: null}
+  "\u0003\u0000\u0003foo\u00a9\u0046\u003f\u00b9\u0099\u0099\u0099\u0099\u0099\u009a": {str: "foo", i32: 9001, f64: "0.1"}
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index ec05ef0..90f52fc 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -645,6 +645,50 @@
     // Components: Coder for a single element.
     // Experimental.
     STATE_BACKED_ITERABLE = 9 [(beam_urn) = "beam:coder:state_backed_iterable:v1"];
+
+    // Additional Standard Coders
+    // --------------------------
+    // The following coders are not required to be implemented for an SDK or
+    // runner to support the Beam model, but enable users to take advantage of
+    // schema-aware transforms.
+
+    // Encodes a "row", an element with a known schema, defined by an
+    // instance of Schema from schema.proto.
+    //
+    // A row is encoded as the concatenation of:
+    //   - The number of attributes in the schema, encoded with
+    //     beam:coder:varint:v1. This makes it possible to detect certain
+    //     allowed schema changes (appending or removing columns) in
+    //     long-running streaming pipelines.
+    //   - A byte array representing a packed bitset indicating null fields (a
+    //     1 indicating a null) encoded with beam:coder:bytes:v1. The unused
+    //     bits in the last byte must be set to 0. If there are no nulls, an
+    //     empty byte array is encoded.
+    //     The two-byte bitset (not including the length-prefix) for the row
+    //     [NULL, 0, 0, 0, NULL, 0, 0, NULL, 0, NULL] would be
+    //     [0b10010001, 0b00000010]
+    //   - An encoding for each non-null field, concatenated together.
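+    //
+    // For illustration (hypothetical field names, following the rules above),
+    // a row with schema (f_int32: INT32, f_str: nullable STRING) and values
+    // (9001, null) encodes to the five bytes
+    //   0x02         number of attributes = 2 (beam:coder:varint:v1)
+    //   0x01 0x02    null bitset: length 1, bit 1 set (f_str is null)
+    //   0xa9 0x46    f_int32 = 9001 (beam:coder:varint:v1)
+    // and no bytes are emitted for the null f_str field.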
+    //
+    // Schema types are mapped to coders as follows:
+    //   AtomicType:
+    //     BYTE:      not yet a standard coder (BEAM-7996)
+    //     INT16:     not yet a standard coder (BEAM-7996)
+    //     INT32:     beam:coder:varint:v1
+    //     INT64:     beam:coder:varint:v1
+    //     FLOAT:     not yet a standard coder (BEAM-7996)
+    //     DOUBLE:    beam:coder:double:v1
+    //     STRING:    beam:coder:string_utf8:v1
+    //     BOOLEAN:   beam:coder:bool:v1
+    //     BYTES:     beam:coder:bytes:v1
+    //   ArrayType:   beam:coder:iterable:v1 (always has a known length)
+    //   MapType:     not yet a standard coder (BEAM-7996)
+    //   RowType:     beam:coder:row:v1
+    //   LogicalType: Uses the coder for its representation.
+    //
+    // The payload for RowCoder is an instance of Schema.
+    // Components: None
+    // Experimental.
+    ROW = 13 [(beam_urn) = "beam:coder:row:v1"];
   }
 }
 
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
index 9c4e232..f2cc8fa 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
@@ -17,16 +17,22 @@
  */
 package org.apache.beam.runners.core.construction;
 
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
 import java.util.Collections;
 import java.util.List;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 
 /** {@link CoderTranslator} implementations for known coder types. */
@@ -118,6 +124,33 @@
     };
   }
 
+  static CoderTranslator<RowCoder> row() {
+    return new CoderTranslator<RowCoder>() {
+      @Override
+      public List<? extends Coder<?>> getComponents(RowCoder from) {
+        return ImmutableList.of();
+      }
+
+      @Override
+      public byte[] getPayload(RowCoder from) {
+        return SchemaTranslation.schemaToProto(from.getSchema()).toByteArray();
+      }
+
+      @Override
+      public RowCoder fromComponents(List<Coder<?>> components, byte[] payload) {
+        checkArgument(
+            components.isEmpty(), "Expected empty component list, but received: " + components);
+        Schema schema;
+        try {
+          schema = SchemaTranslation.fromProto(SchemaApi.Schema.parseFrom(payload));
+        } catch (InvalidProtocolBufferException e) {
+          throw new RuntimeException("Unable to parse schema for RowCoder: ", e);
+        }
+        return RowCoder.of(schema);
+      }
+    };
+  }
+
   public abstract static class SimpleStructuredCoderTranslator<T extends Coder<?>>
       implements CoderTranslator<T> {
     @Override
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
index 8294fe0..854f523 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
@@ -29,6 +29,7 @@
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -60,6 +61,7 @@
           .put(GlobalWindow.Coder.class, ModelCoders.GLOBAL_WINDOW_CODER_URN)
           .put(FullWindowedValueCoder.class, ModelCoders.WINDOWED_VALUE_CODER_URN)
           .put(DoubleCoder.class, ModelCoders.DOUBLE_CODER_URN)
+          .put(RowCoder.class, ModelCoders.ROW_CODER_URN)
           .build();
 
   public static final Set<String> WELL_KNOWN_CODER_URNS = BEAM_MODEL_CODER_URNS.values();
@@ -79,6 +81,7 @@
           .put(LengthPrefixCoder.class, CoderTranslators.lengthPrefix())
           .put(FullWindowedValueCoder.class, CoderTranslators.fullWindowedValue())
           .put(DoubleCoder.class, CoderTranslators.atomic(DoubleCoder.class))
+          .put(RowCoder.class, CoderTranslators.row())
           .build();
 
   static {
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
index 8d1265c..486e39c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
@@ -54,6 +54,8 @@
 
   public static final String WINDOWED_VALUE_CODER_URN = getUrn(StandardCoders.Enum.WINDOWED_VALUE);
 
+  public static final String ROW_CODER_URN = getUrn(StandardCoders.Enum.ROW);
+
   private static final Set<String> MODEL_CODER_URNS =
       ImmutableSet.of(
           BYTES_CODER_URN,
@@ -67,7 +69,8 @@
           GLOBAL_WINDOW_CODER_URN,
           INTERVAL_WINDOW_CODER_URN,
           WINDOWED_VALUE_CODER_URN,
-          DOUBLE_CODER_URN);
+          DOUBLE_CODER_URN,
+          ROW_CODER_URN);
 
   public static Set<String> urns() {
     return MODEL_CODER_URNS;
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
index dc28b79..a6368aa 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
@@ -41,10 +41,14 @@
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.schemas.LogicalTypes;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -60,8 +64,8 @@
 
 /** Tests for {@link CoderTranslation}. */
 public class CoderTranslationTest {
-  private static final Set<StructuredCoder<?>> KNOWN_CODERS =
-      ImmutableSet.<StructuredCoder<?>>builder()
+  private static final Set<Coder<?>> KNOWN_CODERS =
+      ImmutableSet.<Coder<?>>builder()
           .add(ByteArrayCoder.of())
           .add(BooleanCoder.of())
           .add(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()))
@@ -76,6 +80,13 @@
               FullWindowedValueCoder.of(
                   IterableCoder.of(VarLongCoder.of()), IntervalWindowCoder.of()))
           .add(DoubleCoder.of())
+          .add(
+              RowCoder.of(
+                  Schema.of(
+                      Field.of("i16", FieldType.INT16),
+                      Field.of("array", FieldType.array(FieldType.STRING)),
+                      Field.of("map", FieldType.map(FieldType.STRING, FieldType.INT32)),
+                      Field.of("bar", FieldType.logicalType(LogicalTypes.FixedBytes.of(123))))))
           .build();
 
   /**
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
index 1cedd5d..52dddcc 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
@@ -20,6 +20,8 @@
 import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.toImmutableList;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.instanceOf;
@@ -46,8 +48,8 @@
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi.StandardCoders;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
 import org.apache.beam.sdk.coders.BooleanCoder;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.ByteCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
@@ -55,8 +57,10 @@
 import org.apache.beam.sdk.coders.DoubleCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.RowCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -65,6 +69,8 @@
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -99,6 +105,7 @@
           .put(
               getUrn(StandardCoders.Enum.WINDOWED_VALUE),
               WindowedValue.FullWindowedValueCoder.class)
+          .put(getUrn(StandardCoders.Enum.ROW), RowCoder.class)
           .build();
 
   @AutoValue
@@ -107,16 +114,21 @@
 
     abstract List<CommonCoder> getComponents();
 
+    @SuppressWarnings("mutable")
+    abstract byte[] getPayload();
+
     abstract Boolean getNonDeterministic();
 
     @JsonCreator
     static CommonCoder create(
         @JsonProperty("urn") String urn,
         @JsonProperty("components") @Nullable List<CommonCoder> components,
+        @JsonProperty("payload") @Nullable String payload,
         @JsonProperty("non_deterministic") @Nullable Boolean nonDeterministic) {
       return new AutoValue_CommonCoderTest_CommonCoder(
           checkNotNull(urn, "urn"),
           firstNonNull(components, Collections.emptyList()),
+          firstNonNull(payload, "").getBytes(StandardCharsets.ISO_8859_1),
           firstNonNull(nonDeterministic, Boolean.FALSE));
     }
   }
@@ -282,43 +294,90 @@
       return WindowedValue.of(windowValue, timestamp, windows, paneInfo);
     } else if (s.equals(getUrn(StandardCoders.Enum.DOUBLE))) {
       return Double.parseDouble((String) value);
+    } else if (s.equals(getUrn(StandardCoders.Enum.ROW))) {
+      Schema schema;
+      try {
+        schema = SchemaTranslation.fromProto(SchemaApi.Schema.parseFrom(coderSpec.getPayload()));
+      } catch (InvalidProtocolBufferException e) {
+        throw new RuntimeException("Failed to parse schema payload for row coder", e);
+      }
+
+      return parseField(value, Schema.FieldType.row(schema));
     } else {
       throw new IllegalStateException("Unknown coder URN: " + coderSpec.getUrn());
     }
   }
 
+  private static Object parseField(Object value, Schema.FieldType fieldType) {
+    switch (fieldType.getTypeName()) {
+      case BYTE:
+        return ((Number) value).byteValue();
+      case INT16:
+        return ((Number) value).shortValue();
+      case INT32:
+        return ((Number) value).intValue();
+      case INT64:
+        return ((Number) value).longValue();
+      case FLOAT:
+        return Float.parseFloat((String) value);
+      case DOUBLE:
+        return Double.parseDouble((String) value);
+      case STRING:
+        return (String) value;
+      case BOOLEAN:
+        return (Boolean) value;
+      case BYTES:
+        // Interpret the string's characters as raw byte values (ISO-8859-1 maps 0-255 one-to-one)
+        return ((String) value).getBytes(StandardCharsets.ISO_8859_1);
+      case ARRAY:
+        return ((List<Object>) value)
+            .stream()
+            .map(element -> parseField(element, fieldType.getCollectionElementType()))
+            .collect(toImmutableList());
+      case MAP:
+        Map<Object, Object> kvMap = (Map<Object, Object>) value;
+        return kvMap.entrySet().stream()
+            .collect(
+                toImmutableMap(
+                    (pair) -> parseField(pair.getKey(), fieldType.getMapKeyType()),
+                    (pair) -> parseField(pair.getValue(), fieldType.getMapValueType())));
+      case ROW:
+        Map<String, Object> rowMap = (Map<String, Object>) value;
+        Schema schema = fieldType.getRowSchema();
+        Row.Builder row = Row.withSchema(schema);
+        for (Schema.Field field : schema.getFields()) {
+          Object element = rowMap.remove(field.getName());
+          if (element != null) {
+            element = parseField(element, field.getType());
+          }
+          row.addValue(element);
+        }
+
+        if (!rowMap.isEmpty()) {
+          throw new IllegalArgumentException(
+              "Value contains keys that are not in the schema: " + rowMap.keySet());
+        }
+
+        return row.build();
+      default: // DECIMAL, DATETIME, LOGICAL_TYPE
+        throw new IllegalArgumentException("Unsupported type name: " + fieldType.getTypeName());
+    }
+  }
+
   private static Coder<?> instantiateCoder(CommonCoder coder) {
     List<Coder<?>> components = new ArrayList<>();
     for (CommonCoder innerCoder : coder.getComponents()) {
       components.add(instantiateCoder(innerCoder));
     }
-    String s = coder.getUrn();
-    if (s.equals(getUrn(StandardCoders.Enum.BYTES))) {
-      return ByteArrayCoder.of();
-    } else if (s.equals(getUrn(StandardCoders.Enum.BOOL))) {
-      return BooleanCoder.of();
-    } else if (s.equals(getUrn(StandardCoders.Enum.STRING_UTF8))) {
-      return StringUtf8Coder.of();
-    } else if (s.equals(getUrn(StandardCoders.Enum.KV))) {
-      return KvCoder.of(components.get(0), components.get(1));
-    } else if (s.equals(getUrn(StandardCoders.Enum.VARINT))) {
-      return VarLongCoder.of();
-    } else if (s.equals(getUrn(StandardCoders.Enum.INTERVAL_WINDOW))) {
-      return IntervalWindowCoder.of();
-    } else if (s.equals(getUrn(StandardCoders.Enum.ITERABLE))) {
-      return IterableCoder.of(components.get(0));
-    } else if (s.equals(getUrn(StandardCoders.Enum.TIMER))) {
-      return Timer.Coder.of(components.get(0));
-    } else if (s.equals(getUrn(StandardCoders.Enum.GLOBAL_WINDOW))) {
-      return GlobalWindow.Coder.INSTANCE;
-    } else if (s.equals(getUrn(StandardCoders.Enum.WINDOWED_VALUE))) {
-      return WindowedValue.FullWindowedValueCoder.of(
-          components.get(0), (Coder<BoundedWindow>) components.get(1));
-    } else if (s.equals(getUrn(StandardCoders.Enum.DOUBLE))) {
-      return DoubleCoder.of();
-    } else {
-      throw new IllegalStateException("Unknown coder URN: " + coder.getUrn());
-    }
+    Class<? extends Coder> coderType =
+        ModelCoderRegistrar.BEAM_MODEL_CODER_URNS.inverse().get(coder.getUrn());
+    checkNotNull(coderType, "Unknown coder URN: " + coder.getUrn());
+
+    CoderTranslator<?> translator = ModelCoderRegistrar.BEAM_MODEL_CODERS.get(coderType);
+    checkNotNull(
+        translator, "No translator found for common coder class: " + coderType.getSimpleName());
+
+    return translator.fromComponents(components, coder.getPayload());
   }
 
   @Test
@@ -381,6 +440,8 @@
     } else if (s.equals(getUrn(StandardCoders.Enum.DOUBLE))) {
 
       assertEquals(expectedValue, actualValue);
+    } else if (s.equals(getUrn(StandardCoders.Enum.ROW))) {
+      assertEquals(expectedValue, actualValue);
     } else {
       throw new IllegalStateException("Unknown coder URN: " + coder.getUrn());
     }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index 0ef7260..1224774 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -181,6 +181,9 @@
 	// Encoding: false = 0, true = 1
 	b := make([]byte, 1, 1)
 	if err := ioutilx.ReadNBufUnsafe(r, b); err != nil {
+		if err == io.EOF {
+			return nil, err
+		}
 		return nil, fmt.Errorf("error decoding bool: %v", err)
 	}
 	switch b[0] {
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go b/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go
index 4f5d0f1..f366f4d 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go
@@ -16,10 +16,11 @@
 package pipelinex
 
 import (
-	"reflect"
 	"testing"
 
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"github.com/golang/protobuf/proto"
+	"github.com/google/go-cmp/cmp"
 )
 
 func TestShallowClonePTransform(t *testing.T) {
@@ -34,7 +35,7 @@
 
 	for _, test := range tests {
 		actual := ShallowClonePTransform(test)
-		if !reflect.DeepEqual(actual, test) {
+		if !cmp.Equal(actual, test, cmp.Comparer(proto.Equal)) {
 			t.Errorf("ShallowClonePCollection(%v) = %v, want id", test, actual)
 		}
 	}
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go b/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
index bb814cd..ae32ffc 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
@@ -16,10 +16,11 @@
 package pipelinex
 
 import (
-	"reflect"
 	"testing"
 
 	pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+	"github.com/golang/protobuf/proto"
+	"github.com/google/go-cmp/cmp"
 )
 
 func TestEnsureUniqueName(t *testing.T) {
@@ -54,7 +55,7 @@
 
 	for _, test := range tests {
 		actual := ensureUniqueNames(test.in)
-		if !reflect.DeepEqual(actual, test.exp) {
+		if !cmp.Equal(actual, test.exp, cmp.Comparer(proto.Equal)) {
 			t.Errorf("ensureUniqueName(%v) = %v, want %v", test.in, actual, test.exp)
 		}
 	}
@@ -112,7 +113,7 @@
 
 	for _, test := range tests {
 		actual := computeCompositeInputOutput(test.in)
-		if !reflect.DeepEqual(actual, test.exp) {
+		if !cmp.Equal(actual, test.exp, cmp.Comparer(proto.Equal)) {
 			t.Errorf("coimputeInputOutput(%v) = %v, want %v", test.in, actual, test.exp)
 		}
 	}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
index 4a3f11d..16e427f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
@@ -276,7 +276,7 @@
     String specNonWildcardPrefix = getNonWildcardPrefix(spec);
     File file = new File(specNonWildcardPrefix);
     return specNonWildcardPrefix.endsWith(File.separator)
-        ? file
+        ? file.getAbsoluteFile()
         : file.getAbsoluteFile().getParentFile();
   }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 4aa8dbf..03b6263 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -824,11 +824,11 @@
    * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} into multiple parts to
    * be processed in parallel.
    *
-   * <p>Signature: {@code List<RestrictionT> splitRestriction( InputT element, RestrictionT
-   * restriction);}
+   * <p>Signature: {@code void splitRestriction(InputT element, RestrictionT restriction,
+   * OutputReceiver<RestrictionT> receiver);}
    *
    * <p>Optional: if this method is omitted, the restriction will not be split (equivalent to
-   * defining the method and returning {@code Collections.singletonList(restriction)}).
+   * defining the method and outputting the {@code restriction} unchanged).
    */
   // TODO: Make the InputT parameter optional.
   @Documented
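A minimal sketch of the receiver-based splitRestriction signature documented above, assuming an OffsetRange restriction and an arbitrary halving strategy (neither is part of this change):

    @SplitRestriction
    public void splitRestriction(
        String element, OffsetRange restriction, OutputReceiver<OffsetRange> receiver) {
      // Illustrative split: emit the two halves of the range when it is large enough;
      // otherwise emit the restriction unchanged, which matches the default behavior
      // when the method is omitted.
      long from = restriction.getFrom();
      long to = restriction.getTo();
      long mid = from + (to - from) / 2;
      if (mid > from && mid < to) {
        receiver.output(new OffsetRange(from, mid));
        receiver.output(new OffsetRange(mid, to));
      } else {
        receiver.output(restriction);
      }
    }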
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
index 4100bff..de41818 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
@@ -210,6 +210,22 @@
   }
 
   @Test
+  public void testMatchRelativeWildcardPath() throws Exception {
+    File baseFolder = temporaryFolder.newFolder("A");
+    File expectedFile1 = new File(baseFolder, "file1");
+
+    expectedFile1.createNewFile();
+
+    List<String> expected = ImmutableList.of(expectedFile1.getAbsolutePath());
+
+    System.setProperty("user.dir", temporaryFolder.getRoot().toString());
+    List<MatchResult> matchResults = localFileSystem.match(ImmutableList.of("A/*"));
+    assertThat(
+        toFilenames(matchResults),
+        containsInAnyOrder(expected.toArray(new String[expected.size()])));
+  }
+
+  @Test
   public void testMatchExact() throws Exception {
     List<String> expected = ImmutableList.of(temporaryFolder.newFile("a").toString());
     temporaryFolder.newFile("aa");
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
index 0685644..5f71f86 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
@@ -29,6 +29,10 @@
 import static org.junit.Assert.fail;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
@@ -36,6 +40,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.ValueState;
@@ -47,6 +52,7 @@
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -172,7 +178,7 @@
       p.run();
       fail("Pipeline should have failed with an exception");
     } catch (Exception e) {
-      validate();
+      validate(CallState.SETUP, CallState.TEARDOWN);
     }
   }
 
@@ -185,7 +191,7 @@
       p.run();
       fail("Pipeline should have failed with an exception");
     } catch (Exception e) {
-      validate();
+      validate(CallState.SETUP, CallState.START_BUNDLE, CallState.TEARDOWN);
     }
   }
 
@@ -198,7 +204,8 @@
       p.run();
       fail("Pipeline should have failed with an exception");
     } catch (Exception e) {
-      validate();
+      validate(
+          CallState.SETUP, CallState.START_BUNDLE, CallState.PROCESS_ELEMENT, CallState.TEARDOWN);
     }
   }
 
@@ -211,7 +218,12 @@
       p.run();
       fail("Pipeline should have failed with an exception");
     } catch (Exception e) {
-      validate();
+      validate(
+          CallState.SETUP,
+          CallState.START_BUNDLE,
+          CallState.PROCESS_ELEMENT,
+          CallState.FINISH_BUNDLE,
+          CallState.TEARDOWN);
     }
   }
 
@@ -224,7 +236,7 @@
       p.run();
       fail("Pipeline should have failed with an exception");
     } catch (Exception e) {
-      validate();
+      validate(CallState.SETUP, CallState.TEARDOWN);
     }
   }
 
@@ -237,7 +249,7 @@
       p.run();
       fail("Pipeline should have failed with an exception");
     } catch (Exception e) {
-      validate();
+      validate(CallState.SETUP, CallState.START_BUNDLE, CallState.TEARDOWN);
     }
   }
 
@@ -250,11 +262,30 @@
       p.run();
       fail("Pipeline should have failed with an exception");
     } catch (Exception e) {
-      validate();
+      validate(
+          CallState.SETUP, CallState.START_BUNDLE, CallState.PROCESS_ELEMENT, CallState.TEARDOWN);
     }
   }
 
-  private void validate() {
+  @Test
+  @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesParDoLifecycle.class})
+  public void testTeardownCalledAfterExceptionInFinishBundleStateful() {
+    ExceptionThrowingFn fn = new ExceptionThrowingStatefulFn(MethodForException.FINISH_BUNDLE);
+    p.apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3))).apply(ParDo.of(fn));
+    try {
+      p.run();
+      fail("Pipeline should have failed with an exception");
+    } catch (Exception e) {
+      validate(
+          CallState.SETUP,
+          CallState.START_BUNDLE,
+          CallState.PROCESS_ELEMENT,
+          CallState.FINISH_BUNDLE,
+          CallState.TEARDOWN);
+    }
+  }
+
+  private void validate(CallState... requiredCallStates) {
     assertThat(ExceptionThrowingFn.callStateMap, is(not(anEmptyMap())));
     // assert that callStateMap contains only TEARDOWN as a value. Note: We do not expect
     // teardown to be called on fn itself, but on any deserialized instance on which any other
@@ -267,19 +298,15 @@
                     "Function should have been torn down after exception",
                     value.finalState(),
                     is(CallState.TEARDOWN)));
-  }
 
-  @Test
-  @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesParDoLifecycle.class})
-  public void testTeardownCalledAfterExceptionInFinishBundleStateful() {
-    ExceptionThrowingFn fn = new ExceptionThrowingStatefulFn(MethodForException.FINISH_BUNDLE);
-    p.apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3))).apply(ParDo.of(fn));
-    try {
-      p.run();
-      fail("Pipeline should have failed with an exception");
-    } catch (Exception e) {
-      validate();
-    }
+    List<CallState> states = Arrays.stream(requiredCallStates).collect(Collectors.toList());
+    assertThat(
+        "At least one bundle should contain "
+            + states
+            + ", got "
+            + ExceptionThrowingFn.callStateMap.values(),
+        ExceptionThrowingFn.callStateMap.values().stream()
+            .anyMatch(tracker -> tracker.callStateVisited.equals(states)));
   }
 
   @Before
@@ -289,12 +316,15 @@
   }
 
   private static class DelayedCallStateTracker {
-    private CountDownLatch latch;
-    private AtomicReference<CallState> callState;
+    private final CountDownLatch latch;
+    private final AtomicReference<CallState> callState;
+    private final List<CallState> callStateVisited =
+        Collections.synchronizedList(new ArrayList<>());
 
     private DelayedCallStateTracker(CallState setup) {
       latch = new CountDownLatch(1);
       callState = new AtomicReference<>(setup);
+      callStateVisited.add(setup);
     }
 
     DelayedCallStateTracker update(CallState val) {
@@ -306,13 +336,21 @@
       if (CallState.TEARDOWN == val) {
         latch.countDown();
       }
-
+      synchronized (callStateVisited) {
+        if (!callStateVisited.contains(val)) {
+          callStateVisited.add(val);
+        }
+      }
       return this;
     }
 
     @Override
     public String toString() {
-      return "DelayedCallStateTracker{" + "latch=" + latch + ", callState=" + callState + '}';
+      return MoreObjects.toStringHelper(this)
+          .add("latch", latch)
+          .add("callState", callState)
+          .add("callStateVisited", callStateVisited)
+          .toString();
     }
 
     CallState callState() {
@@ -377,9 +415,9 @@
     @FinishBundle
     public void postBundle() throws Exception {
       assertThat(
-          "processing bundle should have been called before finish bundle",
+          "processing bundle or start bundle should have been called before finish bundle",
           getCallState(),
-          is(CallState.PROCESS_ELEMENT));
+          anyOf(equalTo(CallState.PROCESS_ELEMENT), equalTo(CallState.START_BUNDLE)));
       updateCallState(CallState.FINISH_BUNDLE);
       throwIfNecessary(MethodForException.FINISH_BUNDLE);
     }
@@ -416,8 +454,8 @@
       return System.identityHashCode(this);
     }
 
-    private void updateCallState(CallState processElement) {
-      callStateMap.get(id()).update(processElement);
+    private void updateCallState(CallState state) {
+      callStateMap.get(id()).update(state);
     }
 
     private CallState getCallState() {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
index 94db06d..267199b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
@@ -27,7 +27,6 @@
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.adapter.java.AbstractQueryableTable;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.QueryProvider;
@@ -103,8 +102,6 @@
         context.getCluster().traitSetOf(BeamLogicalConvention.INSTANCE),
         relOptTable,
         beamTable,
-        ImmutableList.of(),
-        beamTable.constructFilter(ImmutableList.of()),
         pipelineOptionsMap,
         this);
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
index 3cd6b55..962cc77 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
@@ -72,6 +72,7 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Calc;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexBuilder;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexCall;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLocalRef;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexProgram;
@@ -244,13 +245,35 @@
   @Override
   public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
     NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
-    return BeamCostModel.FACTORY.makeCost(inputStat.getRowCount(), inputStat.getRate());
+    return BeamCostModel.FACTORY
+        .makeCost(inputStat.getRowCount(), inputStat.getRate())
+        // Increase the cost by a small factor proportional to the number of expressions in the
+        // predicate.
+        // Helps favor Calcs with smaller filters.
+        .plus(
+            BeamCostModel.FACTORY
+                .makeTinyCost()
+                .multiplyBy(expressionsInFilter(getProgram().split().right)));
   }
 
   public boolean isInputSortRelAndLimitOnly() {
     return (input instanceof BeamSortRel) && ((BeamSortRel) input).isLimitOnly();
   }
 
+  /**
+   * Recursively count the number of expressions involved in conditions.
+   *
+   * @param filterNodes A list of conditions in a CNF.
+   * @return Number of expressions used by conditions.
+   */
+  private int expressionsInFilter(List<RexNode> filterNodes) {
+    int childSum =
+        filterNodes.stream()
+            .filter(n -> n instanceof RexCall)
+            .mapToInt(n -> expressionsInFilter(((RexCall) n).getOperands()))
+            .sum();
+    return filterNodes.size() + childSum;
+  }
+
   /** {@code CalcFn} is the executor for a {@link BeamCalcRel} step. */
   private static class CalcFn extends DoFn<Row, Row> {
     private final String processElementBlock;
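A rough worked example of the new cost term above (an illustration, not taken from this change): for a CNF filter such as {a > 1, b + c < 5}, expressionsInFilter counts the two top-level conditions, the two operands of a > 1, the two operands of b + c < 5, and the two operands of the nested b + c call, i.e. 2 + 2 + 2 + 2 = 8 tiny-cost units, so a Calc carrying a smaller or simpler filter is assigned a lower cost by the planner.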
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index b1d3f02..f672384 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -25,13 +25,9 @@
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
 import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
 import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
-import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
-import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.Row;
@@ -41,7 +37,6 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptTable;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.RelOptTableImpl;
-import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelWriter;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.TableScan;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
@@ -52,33 +47,26 @@
   private final BeamSqlTable beamTable;
   private final BeamCalciteTable calciteTable;
   private final Map<String, String> pipelineOptions;
-  private final List<String> usedFields;
-  private final BeamSqlTableFilter tableFilters;
 
   public BeamIOSourceRel(
       RelOptCluster cluster,
       RelTraitSet traitSet,
       RelOptTable table,
       BeamSqlTable beamTable,
-      List<String> usedFields,
-      BeamSqlTableFilter tableFilters,
       Map<String, String> pipelineOptions,
       BeamCalciteTable calciteTable) {
     super(cluster, traitSet, table);
     this.beamTable = beamTable;
-    this.usedFields = usedFields;
-    this.tableFilters = tableFilters;
     this.calciteTable = calciteTable;
     this.pipelineOptions = pipelineOptions;
   }
 
-  public BeamIOSourceRel copy(
+  public BeamPushDownIOSourceRel createPushDownRel(
       RelDataType newType, List<String> usedFields, BeamSqlTableFilter tableFilters) {
     RelOptTable relOptTable =
         newType == null ? table : ((RelOptTableImpl) getTable()).copy(newType);
-    tableFilters = tableFilters == null ? this.tableFilters : tableFilters;
 
-    return new BeamIOSourceRel(
+    return new BeamPushDownIOSourceRel(
         getCluster(),
         traitSet,
         relOptTable,
@@ -119,22 +107,6 @@
     return new Transform();
   }
 
-  @Override
-  public RelWriter explainTerms(RelWriter pw) {
-    super.explainTerms(pw);
-
-    // This is done to tell Calcite planner that BeamIOSourceRel cannot be simply substituted by
-    //  another BeamIOSourceRel, except for when they carry the same content.
-    if (!usedFields.isEmpty()) {
-      pw.item("usedFields", usedFields.toString());
-    }
-    if (!(tableFilters instanceof DefaultTableFilter)) {
-      pw.item(tableFilters.getClass().getSimpleName(), tableFilters.toString());
-    }
-
-    return pw;
-  }
-
   private class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
 
     @Override
@@ -145,14 +117,7 @@
           BeamIOSourceRel.class.getSimpleName(),
           input);
 
-      final PBegin begin = input.getPipeline().begin();
-
-      if (usedFields.isEmpty() && tableFilters instanceof DefaultTableFilter) {
-        return beamTable.buildIOReader(begin);
-      }
-
-      final Schema newBeamSchema = CalciteUtils.toSchema(getRowType());
-      return beamTable.buildIOReader(begin, tableFilters, usedFields).setRowSchema(newBeamSchema);
+      return beamTable.buildIOReader(input.getPipeline().begin());
     }
   }
 
@@ -167,9 +132,7 @@
   @Override
   public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
     NodeStats estimates = BeamSqlRelUtils.getNodeStats(this, mq);
-    return BeamCostModel.FACTORY
-        .makeCost(estimates.getRowCount(), estimates.getRate())
-        .multiplyBy(getRowType().getFieldCount());
+    return BeamCostModel.FACTORY.makeCost(estimates.getRowCount(), estimates.getRate());
   }
 
   public BeamSqlTable getBeamSqlTable() {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamPushDownIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamPushDownIOSourceRel.java
new file mode 100644
index 0000000..7c49acf
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamPushDownIOSourceRel.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkArgument;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptTable;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelWriter;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+
+public class BeamPushDownIOSourceRel extends BeamIOSourceRel {
+  private final List<String> usedFields;
+  private final BeamSqlTableFilter tableFilters;
+
+  public BeamPushDownIOSourceRel(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      RelOptTable table,
+      BeamSqlTable beamTable,
+      List<String> usedFields,
+      BeamSqlTableFilter tableFilters,
+      Map<String, String> pipelineOptions,
+      BeamCalciteTable calciteTable) {
+    super(cluster, traitSet, table, beamTable, pipelineOptions, calciteTable);
+    this.usedFields = usedFields;
+    this.tableFilters = tableFilters;
+  }
+
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    super.explainTerms(pw);
+
+    // This is done to tell the Calcite planner that a BeamIOSourceRel cannot simply be
+    //  substituted by another BeamIOSourceRel, except when they carry the same content.
+    if (!usedFields.isEmpty()) {
+      pw.item("usedFields", usedFields.toString());
+    }
+    if (!(tableFilters instanceof DefaultTableFilter)) {
+      pw.item(tableFilters.getClass().getSimpleName(), tableFilters.toString());
+    }
+
+    return pw;
+  }
+
+  @Override
+  public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+    return new Transform();
+  }
+
+  private class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+
+    @Override
+    public PCollection<Row> expand(PCollectionList<Row> input) {
+      checkArgument(
+          input.size() == 0,
+          "Should not have received input for %s: %s",
+          BeamIOSourceRel.class.getSimpleName(),
+          input);
+
+      final PBegin begin = input.getPipeline().begin();
+      final BeamSqlTable beamSqlTable = BeamPushDownIOSourceRel.this.getBeamSqlTable();
+
+      if (usedFields.isEmpty() && tableFilters instanceof DefaultTableFilter) {
+        return beamSqlTable.buildIOReader(begin);
+      }
+
+      final Schema newBeamSchema = CalciteUtils.toSchema(getRowType());
+      return beamSqlTable
+          .buildIOReader(begin, tableFilters, usedFields)
+          .setRowSchema(newBeamSchema);
+    }
+  }
+
+  @Override
+  public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    return super.beamComputeSelfCost(planner, mq)
+        .multiplyBy((double) 1 / (getRowType().getFieldCount() + 1));
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOPushDownRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOPushDownRule.java
index e20967e..65f2d3d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOPushDownRule.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOPushDownRule.java
@@ -21,16 +21,19 @@
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamPushDownIOSourceRel;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
 import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor;
 import org.apache.beam.sdk.schemas.Schema;
@@ -74,6 +77,10 @@
     final BeamIOSourceRel ioSourceRel = call.rel(1);
     final BeamSqlTable beamSqlTable = ioSourceRel.getBeamSqlTable();
 
+    if (ioSourceRel instanceof BeamPushDownIOSourceRel) {
+      return;
+    }
+
     // Nested rows are not supported at the moment
     for (RelDataTypeField field : ioSourceRel.getRowType().getFieldList()) {
       if (field.getType() instanceof RelRecordType) {
@@ -88,50 +95,62 @@
 
     // When predicate push-down is not supported - all filters are unsupported.
     final BeamSqlTableFilter tableFilter = beamSqlTable.constructFilter(projectFilter.right);
-    if (!beamSqlTable.supportsProjects() && tableFilter instanceof DefaultTableFilter) {
+    if (!beamSqlTable.supportsProjects().isSupported()
+        && tableFilter instanceof DefaultTableFilter) {
       // Either project or filter push-down must be supported by the IO.
       return;
     }
 
-    if (!(tableFilter instanceof DefaultTableFilter) && !beamSqlTable.supportsProjects()) {
-      // TODO(BEAM-8508): add support for standalone filter push-down.
-      // Filter push-down without project push-down is not supported for now.
-      return;
-    }
-
-    // Find all input refs used by projects
-    boolean hasComplexProjects = false;
     Set<String> usedFields = new LinkedHashSet<>();
-    for (RexNode project : projectFilter.left) {
-      findUtilizedInputRefs(calcInputRowType, project, usedFields);
-      if (!hasComplexProjects && project instanceof RexCall) {
-        // Ex: 'SELECT field+10 FROM table'
-        hasComplexProjects = true;
+    if (!(tableFilter instanceof DefaultTableFilter)
+        && !beamSqlTable.supportsProjects().isSupported()) {
+      // When applying standalone filter push-down, all fields must be projected by the IO,
+      // with a single exception: the Calc projects all fields (in the same order) and does
+      // nothing else.
+      usedFields.addAll(calcInputRowType.getFieldNames());
+    } else {
+      // Find all input refs used by projects
+      for (RexNode project : projectFilter.left) {
+        findUtilizedInputRefs(calcInputRowType, project, usedFields);
+      }
+
+      // Find all input refs used by filters
+      for (RexNode filter : tableFilter.getNotSupported()) {
+        findUtilizedInputRefs(calcInputRowType, filter, usedFields);
       }
     }
 
-    // Find all input refs used by filters
-    for (RexNode filter : tableFilter.getNotSupported()) {
-      findUtilizedInputRefs(calcInputRowType, filter, usedFields);
+    if (usedFields.isEmpty()) {
+      // No need to do push-down for queries like this: "select UPPER('hello')".
+      return;
     }
 
-    FieldAccessDescriptor resolved =
-        FieldAccessDescriptor.withFieldNames(usedFields)
-            .withOrderByFieldInsertionOrder()
-            .resolve(beamSqlTable.getSchema());
-    Schema newSchema =
-        SelectHelpers.getOutputSchema(ioSourceRel.getBeamSqlTable().getSchema(), resolved);
-    RelDataType calcInputType =
-        CalciteUtils.toCalciteRowType(newSchema, ioSourceRel.getCluster().getTypeFactory());
+    // Already the optimal case:
+    // the Calc contains only unsupported filters (nothing more can be pushed down), and
+    // the IO already projects only the fields utilized by the Calc.
+    if (tableFilter.getNotSupported().containsAll(projectFilter.right)
+        && usedFields.containsAll(ioSourceRel.getRowType().getFieldNames())) {
+      return;
+    }
 
-    // Check if the calc can be dropped:
-    // 1. Calc only does projects and renames.
-    //    And
-    // 2. Predicate can be completely pushed-down to IO level.
-    if (isProjectRenameOnlyProgram(program) && tableFilter.getNotSupported().isEmpty()) {
+    FieldAccessDescriptor resolved = FieldAccessDescriptor.withFieldNames(usedFields);
+    if (beamSqlTable.supportsProjects().withFieldReordering()) {
+      // Ordering by insertion order is only needed when field reordering is supported; otherwise
+      // the IO should project fields in the order they are defined in the schema and let the Calc
+      // do the reordering.
+      resolved = resolved.withOrderByFieldInsertionOrder();
+    }
+    resolved = resolved.resolve(beamSqlTable.getSchema());
+
+    if (canDropCalc(program, beamSqlTable.supportsProjects(), tableFilter)) {
       // Tell the optimizer to not use old IO, since the new one is better.
       call.getPlanner().setImportance(ioSourceRel, 0.0);
-      call.transformTo(ioSourceRel.copy(calc.getRowType(), newSchema.getFieldNames(), tableFilter));
+      call.transformTo(
+          ioSourceRel.createPushDownRel(
+              calc.getRowType(),
+              resolved.getFieldsAccessed().stream()
+                  .map(FieldDescriptor::getFieldName)
+                  .collect(Collectors.toList()),
+              tableFilter));
       return;
     }
 
@@ -139,51 +158,25 @@
     // Calc contains all unsupported filters.
     // IO only projects fields utilised by a calc.
     if (tableFilter.getNotSupported().equals(projectFilter.right)
-        && usedFields.size() == ioSourceRel.getRowType().getFieldCount()) {
+        && usedFields.containsAll(ioSourceRel.getRowType().getFieldNames())) {
       return;
     }
 
-    BeamIOSourceRel newIoSourceRel =
-        ioSourceRel.copy(calcInputType, newSchema.getFieldNames(), tableFilter);
-    RelBuilder relBuilder = call.builder();
-    relBuilder.push(newIoSourceRel);
+    RelNode result =
+        constructNodesWithPushDown(
+            resolved,
+            call.builder(),
+            ioSourceRel,
+            tableFilter,
+            calc.getRowType(),
+            projectFilter.left);
 
-    List<RexNode> newProjects = new ArrayList<>();
-    List<RexNode> newFilter = new ArrayList<>();
-    // Ex: let's say the original fields are (number before each element is the index):
-    // {0:unused1, 1:id, 2:name, 3:unused2},
-    // where only 'id' and 'name' are being used. Then the new calcInputType should be as follows:
-    // {0:id, 1:name}.
-    // A mapping list will contain 2 entries: {0:1, 1:2},
-    // showing how used field names map to the original fields.
-    List<Integer> mapping =
-        resolved.getFieldsAccessed().stream()
-            .map(FieldDescriptor::getFieldId)
-            .collect(Collectors.toList());
-
-    // Map filters to new RexInputRef.
-    for (RexNode filter : tableFilter.getNotSupported()) {
-      newFilter.add(reMapRexNodeToNewInputs(filter, mapping));
-    }
-    // Map projects to new RexInputRef.
-    for (RexNode project : projectFilter.left) {
-      newProjects.add(reMapRexNodeToNewInputs(project, mapping));
-    }
-
-    relBuilder.filter(newFilter);
-    relBuilder.project(
-        newProjects, calc.getRowType().getFieldNames(), true); // Always preserve named projects.
-
-    RelNode result = relBuilder.build();
-
-    if (newFilter.size() < projectFilter.right.size()) {
-      // Smaller Calc programs are indisputably better.
+    if (tableFilter.getNotSupported().size() <= projectFilter.right.size()
+        || usedFields.size() < calcInputRowType.getFieldCount()) {
+      // Smaller Calc programs are indisputably better, as are IOs with fewer projected fields.
+      // An alternative with the same number of filters is also worth considering.
       // Tell the optimizer not to use old Calc and IO.
-      call.getPlanner().setImportance(calc, 0.0);
-      call.getPlanner().setImportance(ioSourceRel, 0.0);
-      call.transformTo(result);
-    } else if (newFilter.size() == projectFilter.right.size()) {
-      // But we can consider something with the same number of filters.
+      call.getPlanner().setImportance(ioSourceRel, 0);
       call.transformTo(result);
     }
   }
@@ -262,19 +255,126 @@
   /**
    * Determine whether a program only performs renames and/or projects. RexProgram#isTrivial is not
    * sufficient in this case, because number of projects does not need to be the same as inputs.
+   * Calc should NOT be dropped in the following cases:<br>
+   * 1. Projected fields are manipulated (ex: 'select field1+10').<br>
+   * 2. The same field is projected more than once.<br>
+   * 3. The IO does not support field reordering and fields are projected in a different order
+   * than they appear in the schema.
    *
    * @param program A program to check.
+   * @param projectReorderingSupported Whether project push-down supports field reordering.
    * @return True when program performs only projects (w/o any modifications), false otherwise.
    */
   @VisibleForTesting
-  boolean isProjectRenameOnlyProgram(RexProgram program) {
+  boolean isProjectRenameOnlyProgram(RexProgram program, boolean projectReorderingSupported) {
     int fieldCount = program.getInputRowType().getFieldCount();
+    Set<Integer> projectIndex = new HashSet<>();
+    int previousIndex = -1;
     for (RexLocalRef ref : program.getProjectList()) {
-      if (ref.getIndex() >= fieldCount) {
+      int index = ref.getIndex();
+      if (index >= fieldCount // Projected values are InputRefs.
+          || !projectIndex.add(ref.getIndex()) // Each field projected once.
+          || (!projectReorderingSupported && index <= previousIndex)) { // In the same order.
         return false;
       }
+      previousIndex = index;
     }
 
     return true;
   }
+
+  /**
+   * Perform a series of checks to determine whether a Calc can be dropped. Following conditions
+   * need to be met in order for that to happen (logical AND):<br>
+   * 1. Program should do simple projects, project each field once, and project fields in the same
+   * order when field reordering is not supported.<br>
+   * 2. Predicate can be completely pushed-down.<br>
+   * 3. Project push-down is supported by the IO or all fields are projected by a Calc.
+   *
+   * @param program A {@code RexProgram} of a {@code Calc}.
+   * @param projectSupport An enum containing information about IO project push-down capabilities.
+   * @param tableFilter A class containing information about IO predicate push-down capabilities.
+   * @return True when Calc can be dropped, false otherwise.
+   */
+  private boolean canDropCalc(
+      RexProgram program, ProjectSupport projectSupport, BeamSqlTableFilter tableFilter) {
+    RelDataType calcInputRowType = program.getInputRowType();
+
+    // Program should do simple projects, project each field once, and project fields in the same
+    // order when field reordering is not supported.
+    boolean fieldReorderingSupported = projectSupport.withFieldReordering();
+    if (!isProjectRenameOnlyProgram(program, fieldReorderingSupported)) {
+      return false;
+    }
+    // Predicate can be completely pushed-down
+    if (!tableFilter.getNotSupported().isEmpty()) {
+      return false;
+    }
+    // Project push-down is supported by the IO or all fields are projected by a Calc.
+    boolean isProjectSupported = projectSupport.isSupported();
+    boolean allFieldsProjected =
+        program.getProjectList().stream()
+            .map(ref -> program.getInputRowType().getFieldList().get(ref.getIndex()).getName())
+            .collect(Collectors.toList())
+            .equals(calcInputRowType.getFieldNames());
+    return isProjectSupported || allFieldsProjected;
+  }
+
+  /**
+   * Construct a new {@link BeamIOSourceRel} with predicate and/or project pushed-down and a new
+   * {@code Calc} to do field reordering/field duplication/complex projects.
+   *
+   * @param resolved A descriptor of fields used by a {@code Calc}.
+   * @param relBuilder A {@code RelBuilder} for constructing {@code Project} and {@code Filter} Rel
+   *     nodes with operations unsupported by the IO.
+   * @param ioSourceRel Original {@code BeamIOSourceRel} we are attempting to perform push-down for.
+   * @param tableFilter A class containing information about IO predicate push-down capabilities.
+   * @param calcDataType A Calcite output schema of an original {@code Calc}.
+   * @param calcProjects A list of projected {@code RexNode}s by a {@code Calc}.
+   * @return An alternative {@code RelNode} with supported filters/projects pushed-down to IO Rel.
+   */
+  private RelNode constructNodesWithPushDown(
+      FieldAccessDescriptor resolved,
+      RelBuilder relBuilder,
+      BeamIOSourceRel ioSourceRel,
+      BeamSqlTableFilter tableFilter,
+      RelDataType calcDataType,
+      List<RexNode> calcProjects) {
+    Schema newSchema =
+        SelectHelpers.getOutputSchema(ioSourceRel.getBeamSqlTable().getSchema(), resolved);
+    RelDataType calcInputType =
+        CalciteUtils.toCalciteRowType(newSchema, ioSourceRel.getCluster().getTypeFactory());
+
+    BeamIOSourceRel newIoSourceRel =
+        ioSourceRel.createPushDownRel(calcInputType, newSchema.getFieldNames(), tableFilter);
+    relBuilder.push(newIoSourceRel);
+
+    List<RexNode> newProjects = new ArrayList<>();
+    List<RexNode> newFilter = new ArrayList<>();
+    // Ex: let's say the original fields are (number before each element is the index):
+    // {0:unused1, 1:id, 2:name, 3:unused2},
+    // where only 'id' and 'name' are being used. Then the new calcInputType should be as follows:
+    // {0:id, 1:name}.
+    // A mapping list will contain 2 entries: {0:1, 1:2},
+    // showing how used field names map to the original fields.
+    List<Integer> mapping =
+        resolved.getFieldsAccessed().stream()
+            .map(FieldDescriptor::getFieldId)
+            .collect(Collectors.toList());
+
+    // Map filters to new RexInputRef.
+    for (RexNode filter : tableFilter.getNotSupported()) {
+      newFilter.add(reMapRexNodeToNewInputs(filter, mapping));
+    }
+    // Map projects to new RexInputRef.
+    for (RexNode project : calcProjects) {
+      newProjects.add(reMapRexNodeToNewInputs(project, mapping));
+    }
+
+    relBuilder.filter(newFilter);
+    // Force to preserve named projects.
+    relBuilder.project(newProjects, calcDataType.getFieldNames(), true);
+
+    return relBuilder.build();
+  }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/BaseBeamTable.java
index fd16ca6..bc276f7 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/BaseBeamTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/BaseBeamTable.java
@@ -45,7 +45,7 @@
   }
 
   @Override
-  public boolean supportsProjects() {
-    return false;
+  public ProjectSupport supportsProjects() {
+    return ProjectSupport.NONE;
   }
 }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/BeamSqlTable.java
index 125bdd0..be2c205 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/BeamSqlTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/BeamSqlTable.java
@@ -42,7 +42,7 @@
   BeamSqlTableFilter constructFilter(List<RexNode> filter);
 
   /** Whether project push-down is supported by the IO API. */
-  boolean supportsProjects();
+  ProjectSupport supportsProjects();
 
   /** Whether this table is bounded (known to be finite) or unbounded (may or may not be finite). */
   PCollection.IsBounded isBounded();
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/ProjectSupport.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/ProjectSupport.java
new file mode 100644
index 0000000..5b83d4c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/ProjectSupport.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta;
+
+public enum ProjectSupport {
+  NONE,
+  WITHOUT_FIELD_REORDERING,
+  WITH_FIELD_REORDERING;
+
+  public boolean isSupported() {
+    return !this.equals(NONE);
+  }
+
+  public boolean withFieldReordering() {
+    return this.equals(WITH_FIELD_REORDERING);
+  }
+}
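A small usage sketch of the new enum (the table override here is hypothetical; the real overrides added by this change are in BigQueryTable and TestTableProvider below):

    // An IO that can push projects down but must keep the schema's field order.
    @Override
    public ProjectSupport supportsProjects() {
      return ProjectSupport.WITHOUT_FIELD_REORDERING;
    }

    // Consumer side, as in BeamIOPushDownRule: only push projects down when supported,
    // and only ask the IO to reorder fields when withFieldReordering() is true;
    // otherwise project fields in schema order and leave reordering to a Calc.
    ProjectSupport support = beamSqlTable.supportsProjects();
    if (support.isSupported() && !support.withFieldReordering()) {
      // keep a Calc on top of the IO to reorder/rename fields
    }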
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
index 711f1bf..121eab4 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
@@ -27,6 +27,7 @@
 import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
 import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
 import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
@@ -40,7 +41,6 @@
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.SchemaCoder;
-import org.apache.beam.sdk.schemas.transforms.Select;
 import org.apache.beam.sdk.schemas.utils.SelectHelpers;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -138,12 +138,7 @@
       builder.withSelectedFields(fieldNames);
     }
 
-    return begin
-        .apply("Read Input BQ Rows with push-down", builder)
-        .apply(
-            "ReorderRowFields",
-            Select.fieldAccess(
-                FieldAccessDescriptor.withFieldNames(fieldNames).withOrderByFieldInsertionOrder()));
+    return begin.apply("Read Input BQ Rows with push-down", builder);
   }
 
   @Override
@@ -156,8 +151,10 @@
   }
 
   @Override
-  public boolean supportsProjects() {
-    return method.equals(Method.DIRECT_READ);
+  public ProjectSupport supportsProjects() {
+    return method.equals(Method.DIRECT_READ)
+        ? ProjectSupport.WITHOUT_FIELD_REORDERING
+        : ProjectSupport.NONE;
   }
 
   private TypedRead<Row> getBigQueryReadBuilder(Schema schema) {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
index 5dae333..fbda05a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
@@ -35,6 +35,7 @@
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
 import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
@@ -211,7 +212,7 @@
         }
       }
 
-      // When project push-down is supported.
+      // When project push-down is supported or field reordering is needed.
       if ((options == PushDownOptions.PROJECT || options == PushDownOptions.BOTH)
           && !fieldNames.isEmpty()) {
         result =
@@ -240,8 +241,10 @@
     }
 
     @Override
-    public boolean supportsProjects() {
-      return options == PushDownOptions.BOTH || options == PushDownOptions.PROJECT;
+    public ProjectSupport supportsProjects() {
+      return (options == PushDownOptions.BOTH || options == PushDownOptions.PROJECT)
+          ? ProjectSupport.WITH_FIELD_REORDERING
+          : ProjectSupport.NONE;
     }
 
     @Override
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java
index 907389f..37fbc61 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java
@@ -139,24 +139,30 @@
 
   @Test
   public void testIsProjectRenameOnlyProgram() {
-    List<Pair<String, Boolean>> tests =
+    List<Pair<Pair<String, Boolean>, Boolean>> tests =
         ImmutableList.of(
-            Pair.of("select id from TEST", true),
-            Pair.of("select * from TEST", true),
-            Pair.of("select id, name from TEST", true),
-            Pair.of("select id+10 from TEST", false),
+            // Selecting fields in a different order is only allowed when field reordering is
+            // supported.
+            Pair.of(Pair.of("select unused2, name, id from TEST", true), true),
+            Pair.of(Pair.of("select unused2, name, id from TEST", false), false),
+            Pair.of(Pair.of("select id from TEST", false), true),
+            Pair.of(Pair.of("select * from TEST", false), true),
+            Pair.of(Pair.of("select id, name from TEST", false), true),
+            Pair.of(Pair.of("select id+10 from TEST", false), false),
             // Note that we only care about projects.
-            Pair.of("select id from TEST where name='one'", true));
+            Pair.of(Pair.of("select id from TEST where name='one'", false), true));
 
-    for (Pair<String, Boolean> test : tests) {
-      String sqlQuery = test.left;
+    for (Pair<Pair<String, Boolean>, Boolean> test : tests) {
+      String sqlQuery = test.left.left;
+      boolean projectPushDownSupported = test.left.right;
       boolean expectedAnswer = test.right;
       BeamRelNode basicRel = sqlEnv.parseQuery(sqlQuery);
       assertThat(basicRel, instanceOf(Calc.class));
 
       Calc calc = (Calc) basicRel;
       assertThat(
-          BeamIOPushDownRule.INSTANCE.isProjectRenameOnlyProgram(calc.getProgram()),
+          test.toString(),
+          BeamIOPushDownRule.INSTANCE.isProjectRenameOnlyProgram(
+              calc.getProgram(), projectPushDownSupported),
           equalTo(expectedAnswer));
     }
   }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java
index 04b8fb0..9a14cab 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java
@@ -40,6 +40,7 @@
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
@@ -481,7 +482,14 @@
     BeamRelNode relNode = sqlEnv.parseQuery(selectTableStatement);
     PCollection<Row> output = BeamSqlRelUtils.toPCollection(readPipeline, relNode);
 
-    assertThat(relNode, instanceOf(BeamIOSourceRel.class));
+    // Calc is not dropped because BigQuery does not support field reordering yet.
+    assertThat(relNode, instanceOf(BeamCalcRel.class));
+    assertThat(relNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+    // IO projects fields in the same order they are defined in the schema.
+    assertThat(
+        relNode.getInput(0).getRowType().getFieldNames(),
+        containsInAnyOrder("c_tinyint", "c_integer", "c_varchar"));
+    // Field reordering is done in a Calc.
     assertThat(
         output.getSchema(),
         equalTo(
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterAndProjectPushDown.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterAndProjectPushDown.java
new file mode 100644
index 0000000..1acb94f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterAndProjectPushDown.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.test;
+
+import static org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.PUSH_DOWN_OPTION;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.alibaba.fastjson.JSON;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamCalcRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOPushDownRule;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.PushDownOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.CalcMergeRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.FilterCalcMergeRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.FilterToCalcRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectCalcMergeRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectToCalcRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSets;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class TestTableProviderWithFilterAndProjectPushDown {
+  private static final Schema BASIC_SCHEMA =
+      Schema.builder()
+          .addInt32Field("unused1")
+          .addInt32Field("id")
+          .addStringField("name")
+          .addInt16Field("unused2")
+          .addBooleanField("b")
+          .build();
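+  // Planner rules used by these tests; BeamIOPushDownRule performs the filter and project push-down.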
+  private static final List<RelOptRule> rulesWithPushDown =
+      ImmutableList.of(
+          BeamCalcRule.INSTANCE,
+          FilterCalcMergeRule.INSTANCE,
+          ProjectCalcMergeRule.INSTANCE,
+          BeamIOPushDownRule.INSTANCE,
+          FilterToCalcRule.INSTANCE,
+          ProjectToCalcRule.INSTANCE,
+          CalcMergeRule.INSTANCE);
+  private BeamSqlEnv sqlEnv;
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  @Before
+  public void buildUp() {
+    TestTableProvider tableProvider = new TestTableProvider();
+    Table table = getTable("TEST", PushDownOptions.BOTH);
+    tableProvider.createTable(table);
+    tableProvider.addRows(
+        table.getName(),
+        row(BASIC_SCHEMA, 100, 1, "one", (short) 100, true),
+        row(BASIC_SCHEMA, 200, 2, "two", (short) 200, false));
+
+    sqlEnv =
+        BeamSqlEnv.builder(tableProvider)
+            .setPipelineOptions(PipelineOptionsFactory.create())
+            .setRuleSets(new RuleSet[] {RuleSets.ofList(rulesWithPushDown)})
+            .build();
+  }
+
+  @Test
+  public void testIOSourceRel_predicateSimple() {
+    String selectTableStatement = "SELECT name FROM TEST where id=2";
+
+    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    assertThat(beamRelNode, instanceOf(BeamIOSourceRel.class));
+    assertEquals(Schema.builder().addStringField("name").build(), result.getSchema());
+    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testIOSourceRel_predicateSimple_Boolean() {
+    String selectTableStatement = "SELECT name FROM TEST where b";
+
+    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    assertThat(beamRelNode, instanceOf(BeamIOSourceRel.class));
+    assertEquals(Schema.builder().addStringField("name").build(), result.getSchema());
+    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "one"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testIOSourceRel_predicateWithAnd() {
+    String selectTableStatement = "SELECT name FROM TEST where id>=2 and unused1<=200";
+
+    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    assertThat(beamRelNode, instanceOf(BeamIOSourceRel.class));
+    assertEquals(Schema.builder().addStringField("name").build(), result.getSchema());
+    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testIOSourceRel_withComplexProjects_withSupportedFilter() {
+    String selectTableStatement =
+        "SELECT name as new_name, unused1+10-id as new_id FROM TEST where 1<id";
+
+    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+    // Make sure project push-down was done
+    List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
+    assertThat(a, containsInAnyOrder("name", "unused1", "id"));
+    assertEquals(
+        Schema.builder().addStringField("new_name").addInt32Field("new_id").build(),
+        result.getSchema());
+    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "two", 208));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testIOSourceRel_selectFieldsInRandomOrder_withRename_withSupportedFilter() {
+    String selectTableStatement =
+        "SELECT name as new_name, id as new_id, unused1 as new_unused1 FROM TEST where 1<id";
+
+    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    assertThat(beamRelNode, instanceOf(BeamIOSourceRel.class));
+    // Make sure project push-down was done
+    List<String> a = beamRelNode.getRowType().getFieldNames();
+    assertThat(a, containsInAnyOrder("new_name", "new_id", "new_unused1"));
+    assertEquals(
+        Schema.builder()
+            .addStringField("new_name")
+            .addInt32Field("new_id")
+            .addInt32Field("new_unused1")
+            .build(),
+        result.getSchema());
+    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "two", 2, 200));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testIOSourceRel_selectFieldsInRandomOrder_withRename_withUnsupportedFilter() {
+    String selectTableStatement =
+        "SELECT name as new_name, id as new_id, unused1 as new_unused1 FROM TEST where id+unused1=202";
+
+    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+    // Make sure project push-down was done
+    List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
+    assertThat(a, containsInAnyOrder("name", "id", "unused1"));
+    assertEquals(
+        Schema.builder()
+            .addStringField("new_name")
+            .addInt32Field("new_id")
+            .addInt32Field("new_unused1")
+            .build(),
+        result.getSchema());
+    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "two", 2, 200));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void
+      testIOSourceRel_selectFieldsInRandomOrder_withRename_withSupportedAndUnsupportedFilters() {
+    String selectTableStatement =
+        "SELECT name as new_name, id as new_id, unused1 as new_unused1 FROM TEST where 1<id and id+unused1=202";
+
+    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+    // Make sure project push-down was done
+    List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
+    assertThat(a, containsInAnyOrder("name", "id", "unused1"));
+    assertEquals(
+        "BeamPushDownIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[name, id, unused1],TestTableFilter=[supported{<(1, $1)}, unsupported{=(+($1, $0), 202)}])",
+        beamRelNode.getInput(0).getDigest());
+    assertEquals(
+        Schema.builder()
+            .addStringField("new_name")
+            .addInt32Field("new_id")
+            .addInt32Field("new_unused1")
+            .build(),
+        result.getSchema());
+    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "two", 2, 200));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testIOSourceRel_selectAllField() {
+    String selectTableStatement = "SELECT * FROM TEST where id<>2";
+
+    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    assertThat(beamRelNode, instanceOf(BeamIOSourceRel.class));
+    assertEquals(
+        "BeamPushDownIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[unused1, id, name, unused2, b],TestTableFilter=[supported{<>($1, 2)}, unsupported{}])",
+        beamRelNode.getDigest());
+    assertEquals(BASIC_SCHEMA, result.getSchema());
+    PAssert.that(result)
+        .containsInAnyOrder(row(result.getSchema(), 100, 1, "one", (short) 100, true));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  private static Row row(Schema schema, Object... objects) {
+    return Row.withSchema(schema).addValues(objects).build();
+  }
+
+  @Test
+  public void testIOSourceRel_withUnsupportedPredicate() {
+    String selectTableStatement = "SELECT name FROM TEST where id+unused1=101";
+
+    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+    assertEquals(
+        "BeamPushDownIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[name, id, unused1],TestTableFilter=[supported{}, unsupported{=(+($1, $0), 101)}])",
+        beamRelNode.getInput(0).getDigest());
+    // Make sure project push-down was done
+    List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
+    assertThat(a, containsInAnyOrder("name", "id", "unused1"));
+
+    assertEquals(Schema.builder().addStringField("name").build(), result.getSchema());
+    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "one"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testIOSourceRel_selectAll_withUnsupportedPredicate() {
+    String selectTableStatement = "SELECT * FROM TEST where id+unused1=101";
+
+    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+    assertEquals(
+        "BeamIOSourceRel.BEAM_LOGICAL(table=[beam, TEST])", beamRelNode.getInput(0).getDigest());
+    // Make sure project push-down was done (all fields since 'select *')
+    List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
+    assertThat(a, containsInAnyOrder("name", "id", "unused1", "unused2", "b"));
+
+    assertEquals(BASIC_SCHEMA, result.getSchema());
+    PAssert.that(result)
+        .containsInAnyOrder(row(result.getSchema(), 100, 1, "one", (short) 100, true));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testIOSourceRel_withSupportedAndUnsupportedPredicate() {
+    String selectTableStatement = "SELECT name FROM TEST where id+unused1=101 and id=1";
+
+    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+    assertEquals(
+        "BeamPushDownIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[name, id, unused1],TestTableFilter=[supported{=($1, 1)}, unsupported{=(+($1, $0), 101)}])",
+        beamRelNode.getInput(0).getDigest());
+    // Make sure project push-down was done
+    List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
+    assertThat(a, containsInAnyOrder("name", "id", "unused1"));
+
+    assertEquals(Schema.builder().addStringField("name").build(), result.getSchema());
+    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "one"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testIOSourceRel_selectAll_withSupportedAndUnsupportedPredicate() {
+    String selectTableStatement = "SELECT * FROM TEST where id+unused1=101 and id=1";
+
+    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+    assertEquals(
+        "BeamPushDownIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[unused1, id, name, unused2, b],TestTableFilter=[supported{=($1, 1)}, unsupported{=(+($1, $0), 101)}])",
+        beamRelNode.getInput(0).getDigest());
+    // Make sure project push-down was done (all fields since 'select *')
+    List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
+    assertThat(a, containsInAnyOrder("unused1", "name", "id", "unused2", "b"));
+
+    assertEquals(BASIC_SCHEMA, result.getSchema());
+    PAssert.that(result)
+        .containsInAnyOrder(row(result.getSchema(), 100, 1, "one", (short) 100, true));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testIOSourceRel_selectOneFieldsMoreThanOnce() {
+    String selectTableStatement = "SELECT b, b, b, b, b FROM TEST";
+
+    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    // Calc must not be dropped
+    assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+    // Make sure project push-down was done
+    List<String> pushedFields = beamRelNode.getInput(0).getRowType().getFieldNames();
+    assertThat(pushedFields, containsInAnyOrder("b"));
+
+    assertEquals(
+        Schema.builder()
+            .addBooleanField("b")
+            .addBooleanField("b0")
+            .addBooleanField("b1")
+            .addBooleanField("b2")
+            .addBooleanField("b3")
+            .build(),
+        result.getSchema());
+    PAssert.that(result)
+        .containsInAnyOrder(
+            row(result.getSchema(), true, true, true, true, true),
+            row(result.getSchema(), false, false, false, false, false));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testIOSourceRel_selectOneFieldsMoreThanOnce_withSupportedPredicate() {
+    String selectTableStatement = "SELECT b, b, b, b, b FROM TEST where b";
+
+    // Calc must not be dropped
+    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+    // Supported predicate should be pushed down, leaving no condition in the Calc
+    assertNull(((BeamCalcRel) beamRelNode).getProgram().getCondition());
+    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+    // Make sure project push-down was done
+    List<String> pushedFields = beamRelNode.getInput(0).getRowType().getFieldNames();
+    assertThat(pushedFields, containsInAnyOrder("b"));
+
+    assertEquals(
+        Schema.builder()
+            .addBooleanField("b")
+            .addBooleanField("b0")
+            .addBooleanField("b1")
+            .addBooleanField("b2")
+            .addBooleanField("b3")
+            .build(),
+        result.getSchema());
+    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), true, true, true, true, true));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  private static Table getTable(String name, PushDownOptions options) {
+    return Table.builder()
+        .name(name)
+        .comment(name + " table")
+        .schema(BASIC_SCHEMA)
+        .properties(
+            JSON.parseObject("{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }"))
+        .type("test")
+        .build();
+  }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java
index 0b6ead6..e64a103 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java
@@ -19,9 +19,11 @@
 
 import static org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.PUSH_DOWN_OPTION;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 
 import com.alibaba.fastjson.JSON;
 import java.util.List;
@@ -42,6 +44,7 @@
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Calc;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.CalcMergeRule;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.FilterCalcMergeRule;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.FilterToCalcRule;
@@ -49,6 +52,7 @@
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectToCalcRule;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
 import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSets;
+import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.joda.time.Duration;
 import org.junit.Before;
 import org.junit.Rule;
@@ -82,7 +86,7 @@
   @Before
   public void buildUp() {
     TestTableProvider tableProvider = new TestTableProvider();
-    Table table = getTable("TEST", PushDownOptions.BOTH);
+    Table table = getTable("TEST", PushDownOptions.FILTER);
     tableProvider.createTable(table);
     tableProvider.addRows(
         table.getName(),
@@ -97,13 +101,21 @@
   }
 
   @Test
-  public void testIOSourceRel_predicateSimple() {
-    String selectTableStatement = "SELECT name FROM TEST where id=2";
+  public void testIOSourceRel_withFilter_shouldProjectAllFields() {
+    String selectTableStatement = "SELECT name FROM TEST where name='two'";
 
     BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
     PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
 
-    assertThat(beamRelNode, instanceOf(BeamIOSourceRel.class));
+    assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+    // Condition should be pushed down to the IO level
+    assertNull(((Calc) beamRelNode).getProgram().getCondition());
+
+    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+    List<String> projects = beamRelNode.getInput(0).getRowType().getFieldNames();
+    // When performing standalone filter push-down, the IO should project all fields.
+    assertThat(projects, containsInAnyOrder("unused1", "id", "name", "unused2", "b"));
+
     assertEquals(Schema.builder().addStringField("name").build(), result.getSchema());
     PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "two"));
 
@@ -111,169 +123,69 @@
   }
 
   @Test
-  public void testIOSourceRel_predicateSimple_Boolean() {
-    String selectTableStatement = "SELECT name FROM TEST where b";
+  public void testIOSourceRel_selectAll_withSupportedFilter_shouldDropCalc() {
+    String selectTableStatement = "SELECT * FROM TEST where name='two'";
 
     BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
     PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
 
+    // Calc is dropped because all fields are projected in the same order and the filter is
+    // pushed down.
     assertThat(beamRelNode, instanceOf(BeamIOSourceRel.class));
-    assertEquals(Schema.builder().addStringField("name").build(), result.getSchema());
-    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "one"));
 
-    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
-  }
+    List<String> projects = beamRelNode.getRowType().getFieldNames();
+    assertThat(projects, containsInAnyOrder("unused1", "id", "name", "unused2", "b"));
 
-  @Test
-  public void testIOSourceRel_predicateWithAnd() {
-    String selectTableStatement = "SELECT name FROM TEST where id>=2 and unused1<=200";
-
-    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
-    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
-
-    assertThat(beamRelNode, instanceOf(BeamIOSourceRel.class));
-    assertEquals(Schema.builder().addStringField("name").build(), result.getSchema());
-    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "two"));
-
-    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
-  }
-
-  @Test
-  public void testIOSourceRel_withComplexProjects_withSupportedFilter() {
-    String selectTableStatement =
-        "SELECT name as new_name, unused1+10-id as new_id FROM TEST where 1<id";
-
-    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
-    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
-
-    assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
-    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
-    // Make sure project push-down was done
-    List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
-    assertThat(a, containsInAnyOrder("name", "unused1", "id"));
-    assertEquals(
-        Schema.builder().addStringField("new_name").addInt32Field("new_id").build(),
-        result.getSchema());
-    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "two", 208));
-
-    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
-  }
-
-  @Test
-  public void testIOSourceRel_selectFieldsInRandomOrder_withRename_withSupportedFilter() {
-    String selectTableStatement =
-        "SELECT name as new_name, id as new_id, unused1 as new_unused1 FROM TEST where 1<id";
-
-    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
-    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
-
-    assertThat(beamRelNode, instanceOf(BeamIOSourceRel.class));
-    // Make sure project push-down was done
-    List<String> a = beamRelNode.getRowType().getFieldNames();
-    assertThat(a, containsInAnyOrder("new_name", "new_id", "new_unused1"));
-    assertEquals(
-        Schema.builder()
-            .addStringField("new_name")
-            .addInt32Field("new_id")
-            .addInt32Field("new_unused1")
-            .build(),
-        result.getSchema());
-    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "two", 2, 200));
-
-    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
-  }
-
-  @Test
-  public void testIOSourceRel_selectFieldsInRandomOrder_withRename_withUnsupportedFilter() {
-    String selectTableStatement =
-        "SELECT name as new_name, id as new_id, unused1 as new_unused1 FROM TEST where id+unused1=202";
-
-    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
-    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
-
-    assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
-    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
-    // Make sure project push-down was done
-    List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
-    assertThat(a, containsInAnyOrder("name", "id", "unused1"));
-    assertEquals(
-        Schema.builder()
-            .addStringField("new_name")
-            .addInt32Field("new_id")
-            .addInt32Field("new_unused1")
-            .build(),
-        result.getSchema());
-    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "two", 2, 200));
-
-    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
-  }
-
-  @Test
-  public void
-      testIOSourceRel_selectFieldsInRandomOrder_withRename_withSupportedAndUnsupportedFilters() {
-    String selectTableStatement =
-        "SELECT name as new_name, id as new_id, unused1 as new_unused1 FROM TEST where 1<id and id+unused1=202";
-
-    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
-    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
-
-    assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
-    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
-    // Make sure project push-down was done
-    List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
-    assertThat(a, containsInAnyOrder("name", "id", "unused1"));
-    assertEquals(
-        "BeamIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[name, id, unused1],TestTableFilter=[supported{<(1, $1)}, unsupported{=(+($1, $0), 202)}])",
-        beamRelNode.getInput(0).getDigest());
-    assertEquals(
-        Schema.builder()
-            .addStringField("new_name")
-            .addInt32Field("new_id")
-            .addInt32Field("new_unused1")
-            .build(),
-        result.getSchema());
-    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "two", 2, 200));
-
-    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
-  }
-
-  @Test
-  public void testIOSourceRel_selectAllField() {
-    String selectTableStatement = "SELECT * FROM TEST where id<>2";
-
-    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
-    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
-
-    assertThat(beamRelNode, instanceOf(BeamIOSourceRel.class));
-    assertEquals(
-        "BeamIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[unused1, id, name, unused2, b],TestTableFilter=[supported{<>($1, 2)}, unsupported{}])",
-        beamRelNode.getDigest());
     assertEquals(BASIC_SCHEMA, result.getSchema());
     PAssert.that(result)
-        .containsInAnyOrder(row(result.getSchema(), 100, 1, "one", (short) 100, true));
+        .containsInAnyOrder(row(result.getSchema(), 200, 2, "two", (short) 200, false));
 
     pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
   }
 
-  private static Row row(Schema schema, Object... objects) {
-    return Row.withSchema(schema).addValues(objects).build();
-  }
-
   @Test
-  public void testIOSourceRel_withUnsupportedPredicate() {
-    String selectTableStatement = "SELECT name FROM TEST where id+unused1=101";
+  public void testIOSourceRel_withSupportedFilter_selectInRandomOrder() {
+    String selectTableStatement = "SELECT unused2, id, name FROM TEST where b";
 
     BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
     PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
 
     assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+    // Condition should be pushed down to the IO level
+    assertNull(((Calc) beamRelNode).getProgram().getCondition());
+
     assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+    List<String> projects = beamRelNode.getInput(0).getRowType().getFieldNames();
+    // When performing standalone filter push-down, the IO should project all fields.
+    assertThat(projects, containsInAnyOrder("unused1", "id", "name", "unused2", "b"));
+
     assertEquals(
-        "BeamIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[name, id, unused1],TestTableFilter=[supported{}, unsupported{=(+($1, $0), 101)}])",
-        beamRelNode.getInput(0).getDigest());
-    // Make sure project push-down was done
-    List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
-    assertThat(a, containsInAnyOrder("name", "id", "unused1"));
+        Schema.builder()
+            .addInt16Field("unused2")
+            .addInt32Field("id")
+            .addStringField("name")
+            .build(),
+        result.getSchema());
+    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), (short) 100, 1, "one"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testIOSourceRel_withUnsupportedFilter_calcPreservesCondition() {
+    String selectTableStatement = "SELECT name FROM TEST where id+1=2";
+
+    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+    // Unsupported condition should be preserved in a Calc
+    assertNotNull(((Calc) beamRelNode).getProgram().getCondition());
+
+    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+    List<String> projects = beamRelNode.getInput(0).getRowType().getFieldNames();
+    // When performing standalone filter push-down, the IO should project all fields.
+    assertThat(projects, containsInAnyOrder("unused1", "id", "name", "unused2", "b"));
 
     assertEquals(Schema.builder().addStringField("name").build(), result.getSchema());
     PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "one"));
@@ -282,69 +194,99 @@
   }
 
   @Test
-  public void testIOSourceRel_selectAll_withUnsupportedPredicate() {
-    String selectTableStatement = "SELECT * FROM TEST where id+unused1=101";
+  public void testIOSourceRel_selectAllFieldsInRandomOrder_shouldPushDownSupportedFilter() {
+    String selectTableStatement = "SELECT unused2, name, id, b, unused1 FROM TEST where name='two'";
 
     BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
     PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
 
+    // Calc should not be dropped because fields are selected in a different order, even though
+    // all filters are supported and all fields are projected.
     assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
-    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
-    assertEquals(
-        "BeamIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],TestTableFilter=[supported{}, unsupported{}])",
-        beamRelNode.getInput(0).getDigest());
-    // Make sure project push-down was done (all fields since 'select *')
-    List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
-    assertThat(a, containsInAnyOrder("name", "id", "unused1", "unused2", "b"));
+    assertNull(((BeamCalcRel) beamRelNode).getProgram().getCondition());
 
-    assertEquals(BASIC_SCHEMA, result.getSchema());
+    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+    List<String> projects = beamRelNode.getInput(0).getRowType().getFieldNames();
+    // When performing standalone filter push-down, the IO should project all fields.
+    assertThat(projects, containsInAnyOrder("unused1", "id", "name", "unused2", "b"));
+
+    assertEquals(
+        Schema.builder()
+            .addInt16Field("unused2")
+            .addStringField("name")
+            .addInt32Field("id")
+            .addBooleanField("b")
+            .addInt32Field("unused1")
+            .build(),
+        result.getSchema());
     PAssert.that(result)
-        .containsInAnyOrder(row(result.getSchema(), 100, 1, "one", (short) 100, true));
+        .containsInAnyOrder(row(result.getSchema(), (short) 200, "two", 2, false, 200));
 
     pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
   }
 
   @Test
-  public void testIOSourceRel_withSupportedAndUnsupportedPredicate() {
-    String selectTableStatement = "SELECT name FROM TEST where id+unused1=101 and id=1";
+  public void testIOSourceRel_selectOneFieldsMoreThanOnce() {
+    String selectTableStatement = "SELECT b, b, b, b, b FROM TEST";
 
     BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
     PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
 
+    // Calc must not be dropped
     assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
     assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
-    assertEquals(
-        "BeamIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[name, id, unused1],TestTableFilter=[supported{=($1, 1)}, unsupported{=(+($1, $0), 101)}])",
-        beamRelNode.getInput(0).getDigest());
     // Make sure project push-down was done
-    List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
-    assertThat(a, containsInAnyOrder("name", "id", "unused1"));
+    List<String> pushedFields = beamRelNode.getInput(0).getRowType().getFieldNames();
+    // When performing standalone filter push-down, the IO should project all fields.
+    assertThat(
+        pushedFields,
+        IsIterableContainingInAnyOrder.containsInAnyOrder("unused1", "id", "name", "unused2", "b"));
 
-    assertEquals(Schema.builder().addStringField("name").build(), result.getSchema());
-    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "one"));
+    assertEquals(
+        Schema.builder()
+            .addBooleanField("b")
+            .addBooleanField("b0")
+            .addBooleanField("b1")
+            .addBooleanField("b2")
+            .addBooleanField("b3")
+            .build(),
+        result.getSchema());
+    PAssert.that(result)
+        .containsInAnyOrder(
+            row(result.getSchema(), true, true, true, true, true),
+            row(result.getSchema(), false, false, false, false, false));
 
     pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
   }
 
   @Test
-  public void testIOSourceRel_selectAll_withSupportedAndUnsupportedPredicate() {
-    String selectTableStatement = "SELECT * FROM TEST where id+unused1=101 and id=1";
+  public void testIOSourceRel_selectOneFieldsMoreThanOnce_withSupportedPredicate() {
+    String selectTableStatement = "SELECT b, b, b, b, b FROM TEST where b";
 
+    // Calc must not be dropped
     BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
     PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
 
     assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+    // Supported predicate should be pushed down, leaving no condition in the Calc
+    assertNull(((BeamCalcRel) beamRelNode).getProgram().getCondition());
     assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
-    assertEquals(
-        "BeamIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[unused1, id, name, unused2, b],TestTableFilter=[supported{=($1, 1)}, unsupported{=(+($1, $0), 101)}])",
-        beamRelNode.getInput(0).getDigest());
-    // Make sure project push-down was done (all fields since 'select *')
-    List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
-    assertThat(a, containsInAnyOrder("unused1", "name", "id", "unused2", "b"));
+    // Make sure project push-down was done
+    List<String> pushedFields = beamRelNode.getInput(0).getRowType().getFieldNames();
+    assertThat(
+        pushedFields,
+        IsIterableContainingInAnyOrder.containsInAnyOrder("unused1", "id", "name", "unused2", "b"));
 
-    assertEquals(BASIC_SCHEMA, result.getSchema());
-    PAssert.that(result)
-        .containsInAnyOrder(row(result.getSchema(), 100, 1, "one", (short) 100, true));
+    assertEquals(
+        Schema.builder()
+            .addBooleanField("b")
+            .addBooleanField("b0")
+            .addBooleanField("b1")
+            .addBooleanField("b2")
+            .addBooleanField("b3")
+            .build(),
+        result.getSchema());
+    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), true, true, true, true, true));
 
     pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
   }
@@ -359,4 +301,8 @@
         .type("test")
         .build();
   }
+
+  private static Row row(Schema schema, Object... objects) {
+    return Row.withSchema(schema).addValues(objects).build();
+  }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java
index d8b6141..363c0f2 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java
@@ -22,10 +22,12 @@
 import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 import com.alibaba.fastjson.JSON;
 import java.util.List;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
@@ -180,15 +182,76 @@
         .containsInAnyOrder(
             row(result.getSchema(), 100, 1, "one", 100),
             row(result.getSchema(), 200, 2, "two", 200));
-    assertThat(beamRelNode, instanceOf(BeamIOSourceRel.class));
+    assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
     // If project push-down succeeds new BeamIOSourceRel should not output unused fields
     assertThat(
-        beamRelNode.getRowType().getFieldNames(),
+        beamRelNode.getInput(0).getRowType().getFieldNames(),
         containsInAnyOrder("unused1", "id", "name", "unused2"));
 
     pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
   }
 
+  @Test
+  public void testIOSourceRel_selectOneFieldsMoreThanOnce() {
+    String selectTableStatement = "SELECT id, id, id, id, id FROM TEST";
+
+    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    // Calc must not be dropped
+    assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+    // Make sure project push-down was done
+    List<String> pushedFields = beamRelNode.getInput(0).getRowType().getFieldNames();
+    assertThat(pushedFields, containsInAnyOrder("id"));
+
+    assertEquals(
+        Schema.builder()
+            .addInt32Field("id")
+            .addInt32Field("id0")
+            .addInt32Field("id1")
+            .addInt32Field("id2")
+            .addInt32Field("id3")
+            .build(),
+        result.getSchema());
+    PAssert.that(result)
+        .containsInAnyOrder(
+            row(result.getSchema(), 1, 1, 1, 1, 1), row(result.getSchema(), 2, 2, 2, 2, 2));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  @Test
+  public void testIOSourceRel_selectOneFieldsMoreThanOnce_withSupportedPredicate() {
+    String selectTableStatement = "SELECT id, id, id, id, id FROM TEST where id=1";
+
+    // Calc must not be dropped
+    BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+    // Project push-down alone should leave the predicate in a Calc
+    assertNotNull(((BeamCalcRel) beamRelNode).getProgram().getCondition());
+    assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+    // Make sure project push-down was done
+    List<String> pushedFields = beamRelNode.getInput(0).getRowType().getFieldNames();
+    assertThat(pushedFields, containsInAnyOrder("id"));
+
+    assertEquals(
+        Schema.builder()
+            .addInt32Field("id")
+            .addInt32Field("id0")
+            .addInt32Field("id1")
+            .addInt32Field("id2")
+            .addInt32Field("id3")
+            .build(),
+        result.getSchema());
+    PAssert.that(result).containsInAnyOrder(row(result.getSchema(), 1, 1, 1, 1, 1));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
   private static Row row(Schema schema, Object... objects) {
     return Row.withSchema(schema).addValues(objects).build();
   }
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index 9ea3ff5..668fa3c 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -239,6 +239,7 @@
         .setMaxNumRecords(Long.MAX_VALUE)
         .setUpToDateThreshold(Duration.ZERO)
         .setWatermarkPolicyFactory(WatermarkPolicyFactory.withArrivalTimePolicy())
+        .setMaxCapacityPerShard(ShardReadersPool.DEFAULT_CAPACITY_PER_SHARD)
         .build();
   }
 
@@ -272,6 +273,8 @@
 
     abstract WatermarkPolicyFactory getWatermarkPolicyFactory();
 
+    abstract Integer getMaxCapacityPerShard();
+
     abstract Builder toBuilder();
 
     @AutoValue.Builder
@@ -293,6 +296,8 @@
 
       abstract Builder setWatermarkPolicyFactory(WatermarkPolicyFactory watermarkPolicyFactory);
 
+      abstract Builder setMaxCapacityPerShard(Integer maxCapacity);
+
       abstract Read build();
     }
 
@@ -420,6 +425,12 @@
       return toBuilder().setWatermarkPolicyFactory(watermarkPolicyFactory).build();
     }
 
+    /** Specifies the maximum number of records that may be buffered per shard. */
+    public Read withMaxCapacityPerShard(Integer maxCapacity) {
+      checkArgument(maxCapacity > 0, "maxCapacity must be positive, but was: %s", maxCapacity);
+      return toBuilder().setMaxCapacityPerShard(maxCapacity).build();
+    }
+
     @Override
     public PCollection<KinesisRecord> expand(PBegin input) {
       Unbounded<KinesisRecord> unbounded =
@@ -430,7 +441,8 @@
                   getInitialPosition(),
                   getUpToDateThreshold(),
                   getWatermarkPolicyFactory(),
-                  getRequestRecordsLimit()));
+                  getRequestRecordsLimit(),
+                  getMaxCapacityPerShard()));
 
       PTransform<PBegin, PCollection<KinesisRecord>> transform = unbounded;
 
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
index db73b99..9e869f5 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
@@ -46,20 +46,23 @@
   private long lastBacklogBytes;
   private Instant backlogBytesLastCheckTime = new Instant(0L);
   private ShardReadersPool shardReadersPool;
+  private final Integer maxCapacityPerShard;
 
   KinesisReader(
       SimplifiedKinesisClient kinesis,
       CheckpointGenerator initialCheckpointGenerator,
       KinesisSource source,
       WatermarkPolicyFactory watermarkPolicyFactory,
-      Duration upToDateThreshold) {
+      Duration upToDateThreshold,
+      Integer maxCapacityPerShard) {
     this(
         kinesis,
         initialCheckpointGenerator,
         source,
         watermarkPolicyFactory,
         upToDateThreshold,
-        Duration.standardSeconds(30));
+        Duration.standardSeconds(30),
+        maxCapacityPerShard);
   }
 
   KinesisReader(
@@ -68,7 +71,8 @@
       KinesisSource source,
       WatermarkPolicyFactory watermarkPolicyFactory,
       Duration upToDateThreshold,
-      Duration backlogBytesCheckThreshold) {
+      Duration backlogBytesCheckThreshold,
+      Integer maxCapacityPerShard) {
     this.kinesis = checkNotNull(kinesis, "kinesis");
     this.initialCheckpointGenerator =
         checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
@@ -76,6 +80,7 @@
     this.source = source;
     this.upToDateThreshold = upToDateThreshold;
     this.backlogBytesCheckThreshold = backlogBytesCheckThreshold;
+    this.maxCapacityPerShard = maxCapacityPerShard;
   }
 
   /** Generates initial checkpoint and instantiates iterators for shards. */
@@ -177,6 +182,9 @@
 
   ShardReadersPool createShardReadersPool() throws TransientKinesisException {
     return new ShardReadersPool(
-        kinesis, initialCheckpointGenerator.generate(kinesis), watermarkPolicyFactory);
+        kinesis,
+        initialCheckpointGenerator.generate(kinesis),
+        watermarkPolicyFactory,
+        maxCapacityPerShard);
   }
 }
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
index 7785cb7..a9d05f3 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -40,6 +40,7 @@
   private final WatermarkPolicyFactory watermarkPolicyFactory;
   private CheckpointGenerator initialCheckpointGenerator;
   private final Integer limit;
+  private final Integer maxCapacityPerShard;
 
   KinesisSource(
       AWSClientsProvider awsClientsProvider,
@@ -47,14 +48,16 @@
       StartingPoint startingPoint,
       Duration upToDateThreshold,
       WatermarkPolicyFactory watermarkPolicyFactory,
-      Integer limit) {
+      Integer limit,
+      Integer maxCapacityPerShard) {
     this(
         awsClientsProvider,
         new DynamicCheckpointGenerator(streamName, startingPoint),
         streamName,
         upToDateThreshold,
         watermarkPolicyFactory,
-        limit);
+        limit,
+        maxCapacityPerShard);
   }
 
   private KinesisSource(
@@ -63,13 +66,15 @@
       String streamName,
       Duration upToDateThreshold,
       WatermarkPolicyFactory watermarkPolicyFactory,
-      Integer limit) {
+      Integer limit,
+      Integer maxCapacityPerShard) {
     this.awsClientsProvider = awsClientsProvider;
     this.initialCheckpointGenerator = initialCheckpoint;
     this.streamName = streamName;
     this.upToDateThreshold = upToDateThreshold;
     this.watermarkPolicyFactory = watermarkPolicyFactory;
     this.limit = limit;
+    this.maxCapacityPerShard = maxCapacityPerShard;
     validate();
   }
 
@@ -93,7 +98,8 @@
               streamName,
               upToDateThreshold,
               watermarkPolicyFactory,
-              limit));
+              limit,
+              maxCapacityPerShard));
     }
     return sources;
   }
@@ -120,7 +126,8 @@
         checkpointGenerator,
         this,
         watermarkPolicyFactory,
-        upToDateThreshold);
+        upToDateThreshold,
+        maxCapacityPerShard);
   }
 
   @Override
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
index 71a12fc..195101c 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
@@ -33,6 +33,7 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.joda.time.Instant;
@@ -46,7 +47,7 @@
 class ShardReadersPool {
 
   private static final Logger LOG = LoggerFactory.getLogger(ShardReadersPool.class);
-  private static final int DEFAULT_CAPACITY_PER_SHARD = 10_000;
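+  // Default number of records that may be buffered per shard; KinesisIO.Read uses this as its default.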
+  public static final int DEFAULT_CAPACITY_PER_SHARD = 10_000;
   private static final int ATTEMPTS_TO_SHUTDOWN = 3;
 
   /**
@@ -81,13 +82,6 @@
   ShardReadersPool(
       SimplifiedKinesisClient kinesis,
       KinesisReaderCheckpoint initialCheckpoint,
-      WatermarkPolicyFactory watermarkPolicyFactory) {
-    this(kinesis, initialCheckpoint, watermarkPolicyFactory, DEFAULT_CAPACITY_PER_SHARD);
-  }
-
-  ShardReadersPool(
-      SimplifiedKinesisClient kinesis,
-      KinesisReaderCheckpoint initialCheckpoint,
       WatermarkPolicyFactory watermarkPolicyFactory,
       int queueCapacityPerShard) {
     this.kinesis = kinesis;
@@ -309,4 +303,9 @@
     }
     return shardsMap.build();
   }
+
+  @VisibleForTesting
+  BlockingQueue<KinesisRecord> getRecordsQueue() {
+    return recordsQueue;
+  }
 }
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
index 37528ef..060af47 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -70,7 +70,8 @@
         kinesisSource,
         WatermarkPolicyFactory.withArrivalTimePolicy(),
         Duration.ZERO,
-        backlogBytesCheckThreshold) {
+        backlogBytesCheckThreshold,
+        ShardReadersPool.DEFAULT_CAPACITY_PER_SHARD) {
       @Override
       ShardReadersPool createShardReadersPool() {
         return shardReadersPool;
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
index 125ff8c..0d9e9a3 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
@@ -78,7 +78,7 @@
     WatermarkPolicy policy = WatermarkPolicyFactory.withArrivalTimePolicy().createWatermarkPolicy();
 
     checkpoint = new KinesisReaderCheckpoint(ImmutableList.of(firstCheckpoint, secondCheckpoint));
-    shardReadersPool = Mockito.spy(new ShardReadersPool(kinesis, checkpoint, factory));
+    shardReadersPool = Mockito.spy(new ShardReadersPool(kinesis, checkpoint, factory, 100));
 
     when(factory.createWatermarkPolicy()).thenReturn(policy);
 
@@ -112,6 +112,7 @@
       }
     }
     assertThat(fetchedRecords).containsExactlyInAnyOrder(a, b, c, d);
+    assertThat(shardReadersPool.getRecordsQueue().remainingCapacity()).isEqualTo(100 * 2);
   }
 
   @Test
@@ -237,7 +238,12 @@
     KinesisReaderCheckpoint checkpoint = new KinesisReaderCheckpoint(Collections.emptyList());
     WatermarkPolicyFactory watermarkPolicyFactory = WatermarkPolicyFactory.withArrivalTimePolicy();
     shardReadersPool =
-        Mockito.spy(new ShardReadersPool(kinesis, checkpoint, watermarkPolicyFactory));
+        Mockito.spy(
+            new ShardReadersPool(
+                kinesis,
+                checkpoint,
+                watermarkPolicyFactory,
+                ShardReadersPool.DEFAULT_CAPACITY_PER_SHARD));
     doReturn(firstIterator)
         .when(shardReadersPool)
         .createShardIterator(eq(kinesis), any(ShardCheckpoint.class));
diff --git a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
index 3d28981..486bbe6 100644
--- a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
+++ b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
@@ -32,6 +32,7 @@
 import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.TimeoutException;
@@ -415,16 +416,28 @@
   private static class RabbitMQCheckpointMark
       implements UnboundedSource.CheckpointMark, Serializable {
     transient Channel channel;
-    Instant oldestTimestamp = Instant.now();
+    Instant latestTimestamp = Instant.now();
     final List<Long> sessionIds = new ArrayList<>();
 
+    /**
+     * Advances the watermark to the provided time if that time is after the
+     * current watermark; otherwise this method is a no-op.
+     *
+     * @param time The time to advance the watermark to
+     */
+    public void advanceWatermark(Instant time) {
+      if (time.isAfter(latestTimestamp)) {
+        latestTimestamp = time;
+      }
+    }
+
     @Override
     public void finalizeCheckpoint() throws IOException {
       for (Long sessionId : sessionIds) {
         channel.basicAck(sessionId, false);
       }
       channel.txCommit();
-      oldestTimestamp = Instant.now();
+      latestTimestamp = Instant.now();
       sessionIds.clear();
     }
   }
@@ -449,7 +462,7 @@
 
     @Override
     public Instant getWatermark() {
-      return checkpointMark.oldestTimestamp;
+      return checkpointMark.latestTimestamp;
     }
 
     @Override
@@ -530,6 +543,10 @@
         // we consume message without autoAck (we want to do the ack ourselves)
         GetResponse delivery = channel.basicGet(queueName, false);
         if (delivery == null) {
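+          // No message available: clear the current record and move the watermark up to now.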
+          current = null;
+          currentRecordId = null;
+          currentTimestamp = null;
+          checkpointMark.advanceWatermark(Instant.now());
           return false;
         }
         if (source.spec.useCorrelationId()) {
@@ -545,10 +562,10 @@
         checkpointMark.sessionIds.add(deliveryTag);
 
         current = new RabbitMqMessage(source.spec.routingKey(), delivery);
-        currentTimestamp = new Instant(delivery.getProps().getTimestamp());
-        if (currentTimestamp.isBefore(checkpointMark.oldestTimestamp)) {
-          checkpointMark.oldestTimestamp = currentTimestamp;
-        }
+        Date deliveryTimestamp = delivery.getProps().getTimestamp();
+        currentTimestamp =
+            (deliveryTimestamp != null) ? new Instant(deliveryTimestamp) : Instant.now();
+        checkpointMark.advanceWatermark(currentTimestamp);
       } catch (IOException e) {
         throw e;
       } catch (Exception e) {
diff --git a/sdks/python/apache_beam/coders/__init__.py b/sdks/python/apache_beam/coders/__init__.py
index 3192494..680f1c7 100644
--- a/sdks/python/apache_beam/coders/__init__.py
+++ b/sdks/python/apache_beam/coders/__init__.py
@@ -17,4 +17,5 @@
 from __future__ import absolute_import
 
 from apache_beam.coders.coders import *
+from apache_beam.coders.row_coder import *
 from apache_beam.coders.typecoders import registry
diff --git a/sdks/python/apache_beam/coders/row_coder.py b/sdks/python/apache_beam/coders/row_coder.py
new file mode 100644
index 0000000..a259f36
--- /dev/null
+++ b/sdks/python/apache_beam/coders/row_coder.py
@@ -0,0 +1,174 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import itertools
+from array import array
+
+from apache_beam.coders.coder_impl import StreamCoderImpl
+from apache_beam.coders.coders import BytesCoder
+from apache_beam.coders.coders import Coder
+from apache_beam.coders.coders import FastCoder
+from apache_beam.coders.coders import FloatCoder
+from apache_beam.coders.coders import IterableCoder
+from apache_beam.coders.coders import StrUtf8Coder
+from apache_beam.coders.coders import TupleCoder
+from apache_beam.coders.coders import VarIntCoder
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints.schemas import named_tuple_from_schema
+from apache_beam.typehints.schemas import named_tuple_to_schema
+
+__all__ = ["RowCoder"]
+
+
+class RowCoder(FastCoder):
+  """ Coder for `typing.NamedTuple` instances.
+
+  Implements the beam:coder:row:v1 standard coder spec.
+  """
+
+  def __init__(self, schema):
+    """Initializes a :class:`RowCoder`.
+
+    Args:
+      schema (apache_beam.portability.api.schema_pb2.Schema): The protobuf
+        representation of the schema of the data that the RowCoder will be used
+        to encode/decode.
+    """
+    self.schema = schema
+    self.components = [
+        RowCoder.coder_from_type(field.type) for field in self.schema.fields
+    ]
+
+  def _create_impl(self):
+    return RowCoderImpl(self.schema, self.components)
+
+  def is_deterministic(self):
+    return all(c.is_deterministic() for c in self.components)
+
+  def to_type_hint(self):
+    return named_tuple_from_schema(self.schema)
+
+  def as_cloud_object(self, coders_context=None):
+    raise NotImplementedError("as_cloud_object not supported for RowCoder")
+
+  __hash__ = None
+
+  def __eq__(self, other):
+    return type(self) == type(other) and self.schema == other.schema
+
+  def to_runner_api_parameter(self, unused_context):
+    return (common_urns.coders.ROW.urn, self.schema, [])
+
+  @Coder.register_urn(common_urns.coders.ROW.urn, schema_pb2.Schema)
+  def from_runner_api_parameter(payload, components, unused_context):
+    return RowCoder(payload)
+
+  @staticmethod
+  def from_type_hint(named_tuple_type, registry):
+    return RowCoder(named_tuple_to_schema(named_tuple_type))
+
+  @staticmethod
+  def coder_from_type(field_type):
+    type_info = field_type.WhichOneof("type_info")
+    if type_info == "atomic_type":
+      if field_type.atomic_type in (schema_pb2.INT32,
+                                    schema_pb2.INT64):
+        return VarIntCoder()
+      elif field_type.atomic_type == schema_pb2.DOUBLE:
+        return FloatCoder()
+      elif field_type.atomic_type == schema_pb2.STRING:
+        return StrUtf8Coder()
+    elif type_info == "array_type":
+      return IterableCoder(
+          RowCoder.coder_from_type(field_type.array_type.element_type))
+
+    # The Java SDK supports several more types, but the coders are not yet
+    # standard, and are not implemented in Python.
+    raise ValueError(
+        "Encountered a type that is not currently supported by RowCoder: %s" %
+        field_type)
+
+
+class RowCoderImpl(StreamCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees."""
+  SIZE_CODER = VarIntCoder().get_impl()
+  NULL_MARKER_CODER = BytesCoder().get_impl()
+
+  def __init__(self, schema, components):
+    self.schema = schema
+    self.constructor = named_tuple_from_schema(schema)
+    self.components = list(c.get_impl() for c in components)
+    self.has_nullable_fields = any(
+        field.type.nullable for field in self.schema.fields)
+
+  def encode_to_stream(self, value, out, nested):
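+    # Layout (beam:coder:row:v1): varint field count, a byte-array null
+    # bitmap, then each non-null field encoded with its component coder.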
+    nvals = len(self.schema.fields)
+    self.SIZE_CODER.encode_to_stream(nvals, out, True)
+    attrs = [getattr(value, f.name) for f in self.schema.fields]
+
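+    # One null bit per field, LSB-first within each byte; the bitmap stays
+    # empty when no field is null.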
+    words = array('B')
+    if self.has_nullable_fields:
+      nulls = list(attr is None for attr in attrs)
+      if any(nulls):
+        words = array('B', itertools.repeat(0, (nvals+7)//8))
+        for i, is_null in enumerate(nulls):
+          words[i//8] |= is_null << (i % 8)
+
+    self.NULL_MARKER_CODER.encode_to_stream(words.tostring(), out, True)
+
+    for c, field, attr in zip(self.components, self.schema.fields, attrs):
+      if attr is None:
+        if not field.type.nullable:
+          raise ValueError(
+              "Attempted to encode null for non-nullable field \"{}\".".format(
+                  field.name))
+        continue
+      c.encode_to_stream(attr, out, True)
+
+  def decode_from_stream(self, in_stream, nested):
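+    # Mirror of encode_to_stream: read the field count and null bitmap,
+    # then decode one value per component coder, skipping null fields.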
+    nvals = self.SIZE_CODER.decode_from_stream(in_stream, True)
+    words = array('B')
+    words.fromstring(self.NULL_MARKER_CODER.decode_from_stream(in_stream, True))
+
+    if words:
+      nulls = ((words[i // 8] >> (i % 8)) & 0x01 for i in range(nvals))
+    else:
+      nulls = itertools.repeat(False, nvals)
+
+    # If this coder's schema has more attributes than the encoded value, then
+    # the schema must have changed. Populate the unencoded fields with nulls.
+    if len(self.components) > nvals:
+      nulls = itertools.chain(
+          nulls,
+          itertools.repeat(True, len(self.components) - nvals))
+
+    # Note that if this coder's schema has *fewer* attributes than the encoded
+    # value, we just need to ignore the additional values, which will occur
+    # here because we only decode as many values as we have coders for.
+    return self.constructor(*(
+        None if is_null else c.decode_from_stream(in_stream, True)
+        for c, is_null in zip(self.components, nulls)))
+
+  def _make_value_coder(self, nulls=itertools.repeat(False)):
+    components = [
+        component for component, is_null in zip(self.components, nulls)
+        if not is_null
+    ] if self.has_nullable_fields else self.components
+    return TupleCoder(components).get_impl()
diff --git a/sdks/python/apache_beam/coders/row_coder_test.py b/sdks/python/apache_beam/coders/row_coder_test.py
new file mode 100644
index 0000000..dbdc5fc
--- /dev/null
+++ b/sdks/python/apache_beam/coders/row_coder_test.py
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+
+import logging
+import typing
+import unittest
+from itertools import chain
+
+import numpy as np
+from past.builtins import unicode
+
+from apache_beam.coders import RowCoder
+from apache_beam.coders.typecoders import registry as coders_registry
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints.schemas import typing_to_runner_api
+
+Person = typing.NamedTuple("Person", [
+    ("name", unicode),
+    ("age", np.int32),
+    ("address", typing.Optional[unicode]),
+    ("aliases", typing.List[unicode]),
+])
+
+coders_registry.register_coder(Person, RowCoder)
+
+
+class RowCoderTest(unittest.TestCase):
+  TEST_CASES = [
+      Person("Jon Snow", 23, None, ["crow", "wildling"]),
+      Person("Daenerys Targaryen", 25, "Westeros", ["Mother of Dragons"]),
+      Person("Michael Bluth", 30, None, [])
+  ]
+
+  def test_create_row_coder_from_named_tuple(self):
+    expected_coder = RowCoder(typing_to_runner_api(Person).row_type.schema)
+    real_coder = coders_registry.get_coder(Person)
+
+    for test_case in self.TEST_CASES:
+      self.assertEqual(
+          expected_coder.encode(test_case), real_coder.encode(test_case))
+
+      self.assertEqual(test_case,
+                       real_coder.decode(real_coder.encode(test_case)))
+
+  def test_create_row_coder_from_schema(self):
+    schema = schema_pb2.Schema(
+        id="person",
+        fields=[
+            schema_pb2.Field(
+                name="name",
+                type=schema_pb2.FieldType(
+                    atomic_type=schema_pb2.STRING)),
+            schema_pb2.Field(
+                name="age",
+                type=schema_pb2.FieldType(
+                    atomic_type=schema_pb2.INT32)),
+            schema_pb2.Field(
+                name="address",
+                type=schema_pb2.FieldType(
+                    atomic_type=schema_pb2.STRING, nullable=True)),
+            schema_pb2.Field(
+                name="aliases",
+                type=schema_pb2.FieldType(
+                    array_type=schema_pb2.ArrayType(
+                        element_type=schema_pb2.FieldType(
+                            atomic_type=schema_pb2.STRING)))),
+        ])
+    coder = RowCoder(schema)
+
+    for test_case in self.TEST_CASES:
+      self.assertEqual(test_case, coder.decode(coder.encode(test_case)))
+
+  @unittest.skip(
+      "BEAM-8030 - Overflow behavior in VarIntCoder is currently inconsistent"
+  )
+  def test_overflows(self):
+    IntTester = typing.NamedTuple('IntTester', [
+        # TODO(BEAM-7996): Test int8 and int16 here as well when those types are
+        # supported
+        # ('i8', typing.Optional[np.int8]),
+        # ('i16', typing.Optional[np.int16]),
+        ('i32', typing.Optional[np.int32]),
+        ('i64', typing.Optional[np.int64]),
+    ])
+
+    c = RowCoder.from_type_hint(IntTester, None)
+
+    no_overflow = chain(
+        (IntTester(i32=i, i64=None) for i in (-2**31, 2**31-1)),
+        (IntTester(i32=None, i64=i) for i in (-2**63, 2**63-1)),
+    )
+
+    # Encode max/min ints to make sure they don't throw any error
+    for case in no_overflow:
+      c.encode(case)
+
+    overflow = chain(
+        (IntTester(i32=i, i64=None) for i in (-2**31-1, 2**31)),
+        (IntTester(i32=None, i64=i) for i in (-2**63-1, 2**63)),
+    )
+
+    # Encode max+1/min-1 ints to make sure they DO throw an error
+    for case in overflow:
+      self.assertRaises(OverflowError, lambda: c.encode(case))
+
+  def test_none_in_non_nullable_field_throws(self):
+    Test = typing.NamedTuple('Test', [('foo', unicode)])
+
+    c = RowCoder.from_type_hint(Test, None)
+    self.assertRaises(ValueError, lambda: c.encode(Test(foo=None)))
+
+  def test_schema_remove_column(self):
+    fields = [("field1", unicode), ("field2", unicode)]
+    # new schema is missing one field that was in the old schema
+    Old = typing.NamedTuple('Old', fields)
+    New = typing.NamedTuple('New', fields[:-1])
+
+    old_coder = RowCoder.from_type_hint(Old, None)
+    new_coder = RowCoder.from_type_hint(New, None)
+
+    self.assertEqual(
+        New("foo"), new_coder.decode(old_coder.encode(Old("foo", "bar"))))
+
+  def test_schema_add_column(self):
+    fields = [("field1", unicode), ("field2", typing.Optional[unicode])]
+    # new schema has one (optional) field that didn't exist in the old schema
+    Old = typing.NamedTuple('Old', fields[:-1])
+    New = typing.NamedTuple('New', fields)
+
+    old_coder = RowCoder.from_type_hint(Old, None)
+    new_coder = RowCoder.from_type_hint(New, None)
+
+    self.assertEqual(
+        New("bar", None), new_coder.decode(old_coder.encode(Old("bar"))))
+
+  def test_schema_add_column_with_null_value(self):
+    fields = [("field1", typing.Optional[unicode]), ("field2", unicode),
+              ("field3", typing.Optional[unicode])]
+    # new schema has one (optional) field that didn't exist in the old schema
+    Old = typing.NamedTuple('Old', fields[:-1])
+    New = typing.NamedTuple('New', fields)
+
+    old_coder = RowCoder.from_type_hint(Old, None)
+    new_coder = RowCoder.from_type_hint(New, None)
+
+    self.assertEqual(
+        New(None, "baz", None),
+        new_coder.decode(old_coder.encode(Old(None, "baz"))))
+
+
+if __name__ == "__main__":
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index 606ca81..5ffbeea 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -32,9 +32,11 @@
 
 from apache_beam.coders import coder_impl
 from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.portability.api import schema_pb2
 from apache_beam.runners import pipeline_context
 from apache_beam.transforms import window
 from apache_beam.transforms.window import IntervalWindow
+from apache_beam.typehints import schemas
 from apache_beam.utils import windowed_value
 from apache_beam.utils.timestamp import Timestamp
 
@@ -65,6 +67,42 @@
   return x
 
 
+def value_parser_from_schema(schema):
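+  # Builds a parser that turns a dict of raw test values into an instance
+  # of the NamedTuple generated from the given schema.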
+  def attribute_parser_from_type(type_):
+    # TODO: This should be exhaustive
+    type_info = type_.WhichOneof("type_info")
+    if type_info == "atomic_type":
+      return schemas.ATOMIC_TYPE_TO_PRIMITIVE[type_.atomic_type]
+    elif type_info == "array_type":
+      element_parser = attribute_parser_from_type(type_.array_type.element_type)
+      return lambda x: list(map(element_parser, x))
+    elif type_info == "map_type":
+      key_parser = attribute_parser_from_type(type_.map_type.key_type)
+      value_parser = attribute_parser_from_type(type_.map_type.value_type)
+      return lambda x: dict((key_parser(k), value_parser(v))
+                            for k, v in x.items())
+
+  parsers = [(field.name, attribute_parser_from_type(field.type))
+             for field in schema.fields]
+
+  constructor = schemas.named_tuple_from_schema(schema)
+
+  def value_parser(x):
+    result = []
+    for name, parser in parsers:
+      value = x.pop(name)
+      result.append(None if value is None else parser(value))
+
+    if len(x):
+      raise ValueError(
+          "Test data contains attributes that don't exist in the schema: {}"
+          .format(', '.join(x.keys())))
+
+    return constructor(*result)
+
+  return value_parser
+
+
 class StandardCodersTest(unittest.TestCase):
 
   _urn_to_json_value_parser = {
@@ -134,11 +172,17 @@
                      for c in spec.get('components', ())]
     context.coders.put_proto(coder_id, beam_runner_api_pb2.Coder(
         spec=beam_runner_api_pb2.FunctionSpec(
-            urn=spec['urn'], payload=spec.get('payload')),
+            urn=spec['urn'], payload=spec.get('payload', '').encode('latin1')),
         component_coder_ids=component_ids))
     return context.coders.get_by_id(coder_id)
 
   def json_value_parser(self, coder_spec):
+    # TODO: integrate this with the logic for the other parsers
+    if coder_spec['urn'] == 'beam:coder:row:v1':
+      schema = schema_pb2.Schema.FromString(
+          coder_spec['payload'].encode('latin1'))
+      return value_parser_from_schema(schema)
+
     component_parsers = [
         self.json_value_parser(c) for c in coder_spec.get('components', ())]
     return lambda x: self._urn_to_json_value_parser[coder_spec['urn']](
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 039eaf0..4a5fef4 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -385,12 +385,9 @@
       pipeline.replace_all(DataflowRunner._SDF_PTRANSFORM_OVERRIDES)
 
     use_fnapi = apiclient._use_fnapi(options)
-    from apache_beam.portability.api import beam_runner_api_pb2
-    default_environment = beam_runner_api_pb2.Environment(
-        urn=common_urns.environments.DOCKER.urn,
-        payload=beam_runner_api_pb2.DockerPayload(
-            container_image=apiclient.get_container_image_from_options(options)
-            ).SerializeToString())
+    from apache_beam.transforms import environments
+    default_environment = environments.DockerEnvironment(
+        container_image=apiclient.get_container_image_from_options(options))
 
     # Snapshot the pipeline in a portable proto.
     self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
diff --git a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
index 3709deb..1f0e925 100644
--- a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
+++ b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
@@ -73,8 +73,8 @@
               document.querySelector("#{display_id}").protoInput = "{protostr}";
             </script>"""
 _DATAFRAME_PAGINATION_TEMPLATE = """
-            <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.2/jquery.min.js"></script> 
-            <script src="https://cdn.datatables.net/1.10.16/js/jquery.dataTables.js"></script> 
+            <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.2/jquery.min.js"></script>
+            <script src="https://cdn.datatables.net/1.10.16/js/jquery.dataTables.js"></script>
             <link rel="stylesheet" href="https://cdn.datatables.net/1.10.16/css/jquery.dataTables.css">
             {dataframe_html}
             <script>
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
index 913dac6..e4328df 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -31,25 +31,10 @@
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.transforms import core
+from apache_beam.transforms import environments
 from apache_beam.typehints import native_type_compatibility
 
 
-class Environment(object):
-  """A wrapper around the environment proto.
-
-  Provides consistency with how the other componentes are accessed.
-  """
-  def __init__(self, proto):
-    self.proto = proto
-
-  def to_runner_api(self, context):
-    return self.proto
-
-  @staticmethod
-  def from_runner_api(proto, context):
-    return Environment(proto)
-
-
 class _PipelineContextMap(object):
   """This is a bi-directional map between objects and ids.
 
@@ -128,7 +113,7 @@
       'pcollections': pvalue.PCollection,
       'coders': coders.Coder,
       'windowing_strategies': core.Windowing,
-      'environments': Environment,
+      'environments': environments.Environment,
   }
 
   def __init__(
@@ -146,7 +131,7 @@
               self, cls, namespace, getattr(proto, name, None)))
     if default_environment:
       self._default_environment_id = self.environments.get_id(
-          Environment(default_environment), label='default_environment')
+          default_environment, label='default_environment')
     else:
       self._default_environment_id = None
     self.use_fake_coders = use_fake_coders
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index dd0c1e2..9f97e2f 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -72,6 +72,7 @@
 from apache_beam.runners.worker.channel_factory import GRPCChannelFactory
 from apache_beam.runners.worker.sdk_worker import _Future
 from apache_beam.runners.worker.statecache import StateCache
+from apache_beam.transforms import environments
 from apache_beam.transforms import trigger
 from apache_beam.transforms.window import GlobalWindow
 from apache_beam.transforms.window import GlobalWindows
@@ -344,7 +345,7 @@
     self._last_uid = -1
     self._default_environment = (
         default_environment
-        or beam_runner_api_pb2.Environment(urn=python_urns.EMBEDDED_PYTHON))
+        or environments.EmbeddedPythonEnvironment())
     self._bundle_repeat = bundle_repeat
     self._num_workers = 1
     self._progress_frequency = progress_request_frequency
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 1f368c0..2204a24 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -47,8 +47,6 @@
 from apache_beam.metrics.metricbase import MetricName
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.portability import python_urns
-from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners.portability import fn_api_runner
 from apache_beam.runners.worker import data_plane
 from apache_beam.runners.worker import sdk_worker
@@ -58,6 +56,7 @@
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
 from apache_beam.tools import utils
+from apache_beam.transforms import environments
 from apache_beam.transforms import userstate
 from apache_beam.transforms import window
 
@@ -1085,8 +1084,7 @@
   def create_pipeline(self):
     return beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(
-            default_environment=beam_runner_api_pb2.Environment(
-                urn=python_urns.EMBEDDED_PYTHON_GRPC)))
+            default_environment=environments.EmbeddedPythonGrpcEnvironment()))
 
 
 class FnApiRunnerTestWithGrpcMultiThreaded(FnApiRunnerTest):
@@ -1094,9 +1092,9 @@
   def create_pipeline(self):
     return beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(
-            default_environment=beam_runner_api_pb2.Environment(
-                urn=python_urns.EMBEDDED_PYTHON_GRPC,
-                payload=b'2,%d' % fn_api_runner.STATE_CACHE_SIZE)))
+            default_environment=environments.EmbeddedPythonGrpcEnvironment(
+                num_workers=2,
+                state_cache_size=fn_api_runner.STATE_CACHE_SIZE)))
 
 
 class FnApiRunnerTestWithDisabledCaching(FnApiRunnerTest):
@@ -1104,10 +1102,8 @@
   def create_pipeline(self):
     return beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(
-            default_environment=beam_runner_api_pb2.Environment(
-                urn=python_urns.EMBEDDED_PYTHON_GRPC,
-                # number of workers, state cache size
-                payload=b'2,0')))
+            default_environment=environments.EmbeddedPythonGrpcEnvironment(
+                num_workers=2, state_cache_size=0)))
 
 
 class FnApiRunnerTestWithMultiWorkers(FnApiRunnerTest):
@@ -1134,8 +1130,7 @@
     pipeline_options = PipelineOptions(direct_num_workers=2)
     p = beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(
-            default_environment=beam_runner_api_pb2.Environment(
-                urn=python_urns.EMBEDDED_PYTHON_GRPC)),
+            default_environment=environments.EmbeddedPythonGrpcEnvironment()),
         options=pipeline_options)
     #TODO(BEAM-8444): Fix these tests..
     p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
@@ -1185,8 +1180,7 @@
     # to the bundle process request.
     return beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(
-            default_environment=beam_runner_api_pb2.Environment(
-                urn=python_urns.EMBEDDED_PYTHON_GRPC)))
+            default_environment=environments.EmbeddedPythonGrpcEnvironment()))
 
   def test_checkpoint(self):
     # This split manager will get re-invoked on each smaller split,
@@ -1490,8 +1484,7 @@
     pipeline_options = PipelineOptions(direct_num_workers=2)
     p = beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(
-            default_environment=beam_runner_api_pb2.Environment(
-                urn=python_urns.EMBEDDED_PYTHON_GRPC)),
+            default_environment=environments.EmbeddedPythonGrpcEnvironment()),
         options=pipeline_options)
     #TODO(BEAM-8444): Fix these tests..
     p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
@@ -1508,8 +1501,7 @@
   def create_pipeline(self):
     return beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(
-            default_environment=beam_runner_api_pb2.Environment(
-                urn=python_urns.EMBEDDED_PYTHON_GRPC),
+            default_environment=environments.EmbeddedPythonGrpcEnvironment(),
             progress_request_frequency=0.5))
 
   def test_lull_logging(self):
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index c7fb76c..2cffd47 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -19,7 +19,6 @@
 
 import functools
 import itertools
-import json
 import logging
 import sys
 import threading
@@ -36,8 +35,6 @@
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_job_api_pb2
-from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.portability.api import endpoints_pb2
 from apache_beam.runners import runner
 from apache_beam.runners.job import utils as job_utils
 from apache_beam.runners.portability import fn_api_runner_transforms
@@ -46,6 +43,7 @@
 from apache_beam.runners.portability import portable_stager
 from apache_beam.runners.worker import sdk_worker_main
 from apache_beam.runners.worker import worker_pool_main
+from apache_beam.transforms import environments
 
 __all__ = ['PortableRunner']
 
@@ -65,6 +63,8 @@
     beam_job_api_pb2.JobState.CANCELLED,
 ]
 
+ENV_TYPE_ALIASES = {'LOOPBACK': 'EXTERNAL'}
+
 
 class PortableRunner(runner.PipelineRunner):
   """
@@ -102,65 +102,24 @@
     # does not exist in the Java SDK. In portability, the entry point is clearly
     # defined via the JobService.
     portable_options.view_as(StandardOptions).runner = None
-    environment_urn = common_urns.environments.DOCKER.urn
-    if portable_options.environment_type == 'DOCKER':
+    environment_type = portable_options.environment_type
+    if not environment_type:
       environment_urn = common_urns.environments.DOCKER.urn
-    elif portable_options.environment_type == 'PROCESS':
-      environment_urn = common_urns.environments.PROCESS.urn
-    elif portable_options.environment_type in ('EXTERNAL', 'LOOPBACK'):
-      environment_urn = common_urns.environments.EXTERNAL.urn
-    elif portable_options.environment_type:
-      if portable_options.environment_type.startswith('beam:env:'):
-        environment_urn = portable_options.environment_type
-      else:
-        raise ValueError(
-            'Unknown environment type: %s' % portable_options.environment_type)
-
-    if environment_urn == common_urns.environments.DOCKER.urn:
-      docker_image = (
-          portable_options.environment_config
-          or PortableRunner.default_docker_image())
-      return beam_runner_api_pb2.Environment(
-          urn=common_urns.environments.DOCKER.urn,
-          payload=beam_runner_api_pb2.DockerPayload(
-              container_image=docker_image
-          ).SerializeToString())
-    elif environment_urn == common_urns.environments.PROCESS.urn:
-      config = json.loads(portable_options.environment_config)
-      return beam_runner_api_pb2.Environment(
-          urn=common_urns.environments.PROCESS.urn,
-          payload=beam_runner_api_pb2.ProcessPayload(
-              os=(config.get('os') or ''),
-              arch=(config.get('arch') or ''),
-              command=config.get('command'),
-              env=(config.get('env') or '')
-          ).SerializeToString())
-    elif environment_urn == common_urns.environments.EXTERNAL.urn:
-      def looks_like_json(environment_config):
-        import re
-        return re.match(r'\s*\{.*\}\s*$', environment_config)
-
-      if looks_like_json(portable_options.environment_config):
-        config = json.loads(portable_options.environment_config)
-        url = config.get('url')
-        if not url:
-          raise ValueError('External environment endpoint must be set.')
-        params = config.get('params')
-      else:
-        url = portable_options.environment_config
-        params = None
-
-      return beam_runner_api_pb2.Environment(
-          urn=common_urns.environments.EXTERNAL.urn,
-          payload=beam_runner_api_pb2.ExternalPayload(
-              endpoint=endpoints_pb2.ApiServiceDescriptor(url=url),
-              params=params
-          ).SerializeToString())
+    elif environment_type.startswith('beam:env:'):
+      environment_urn = environment_type
     else:
-      return beam_runner_api_pb2.Environment(
-          urn=environment_urn,
-          payload=(portable_options.environment_config.encode('ascii')
-                   if portable_options.environment_config else None))
+      # e.g. handle LOOPBACK -> EXTERNAL
+      environment_type = ENV_TYPE_ALIASES.get(environment_type,
+                                              environment_type)
+      try:
+        environment_urn = getattr(common_urns.environments,
+                                  environment_type).urn
+      except AttributeError:
+        raise ValueError(
+            'Unknown environment type: %s' % environment_type)
+
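+    # Delegate payload construction to the Environment subclass registered
+    # for this URN.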
+    env_class = environments.Environment.get_env_cls_from_urn(environment_urn)
+    return env_class.from_options(portable_options)
 
   def default_job_server(self, portable_options):
     # TODO Provide a way to specify a container Docker URL
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner_test.py b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
index 3658c21..24c6b87 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
@@ -36,18 +36,16 @@
 from apache_beam.options.pipeline_options import DirectOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import PortableOptions
-from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
 from apache_beam.portability.api import beam_job_api_pb2
 from apache_beam.portability.api import beam_job_api_pb2_grpc
-from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.portability.api import endpoints_pb2
 from apache_beam.runners.portability import fn_api_runner_test
 from apache_beam.runners.portability import portable_runner
 from apache_beam.runners.portability.local_job_service import LocalJobServicer
 from apache_beam.runners.portability.portable_runner import PortableRunner
 from apache_beam.runners.worker import worker_pool_main
 from apache_beam.runners.worker.channel_factory import GRPCChannelFactory
+from apache_beam.transforms import environments
 
 
 class PortableRunnerTest(fn_api_runner_test.FnApiRunnerTest):
@@ -261,11 +259,7 @@
     docker_image = PortableRunner.default_docker_image()
     self.assertEqual(
         PortableRunner._create_environment(PipelineOptions.from_dictionary({})),
-        beam_runner_api_pb2.Environment(
-            urn=common_urns.environments.DOCKER.urn,
-            payload=beam_runner_api_pb2.DockerPayload(
-                container_image=docker_image
-            ).SerializeToString()))
+        environments.DockerEnvironment(container_image=docker_image))
 
   def test__create_docker_environment(self):
     docker_image = 'py-docker'
@@ -273,11 +267,7 @@
         PortableRunner._create_environment(PipelineOptions.from_dictionary({
             'environment_type': 'DOCKER',
             'environment_config': docker_image,
-        })), beam_runner_api_pb2.Environment(
-            urn=common_urns.environments.DOCKER.urn,
-            payload=beam_runner_api_pb2.DockerPayload(
-                container_image=docker_image
-            ).SerializeToString()))
+        })), environments.DockerEnvironment(container_image=docker_image))
 
   def test__create_process_environment(self):
     self.assertEqual(
@@ -286,48 +276,28 @@
             'environment_config': '{"os": "linux", "arch": "amd64", '
                                   '"command": "run.sh", '
                                   '"env":{"k1": "v1"} }',
-        })), beam_runner_api_pb2.Environment(
-            urn=common_urns.environments.PROCESS.urn,
-            payload=beam_runner_api_pb2.ProcessPayload(
-                os='linux',
-                arch='amd64',
-                command='run.sh',
-                env={'k1': 'v1'},
-            ).SerializeToString()))
+        })), environments.ProcessEnvironment('run.sh', os='linux', arch='amd64',
+                                             env={'k1': 'v1'}))
     self.assertEqual(
         PortableRunner._create_environment(PipelineOptions.from_dictionary({
             'environment_type': 'PROCESS',
             'environment_config': '{"command": "run.sh"}',
-        })), beam_runner_api_pb2.Environment(
-            urn=common_urns.environments.PROCESS.urn,
-            payload=beam_runner_api_pb2.ProcessPayload(
-                command='run.sh',
-            ).SerializeToString()))
+        })), environments.ProcessEnvironment('run.sh'))
 
   def test__create_external_environment(self):
     self.assertEqual(
         PortableRunner._create_environment(PipelineOptions.from_dictionary({
             'environment_type': "EXTERNAL",
             'environment_config': 'localhost:50000',
-        })), beam_runner_api_pb2.Environment(
-            urn=common_urns.environments.EXTERNAL.urn,
-            payload=beam_runner_api_pb2.ExternalPayload(
-                endpoint=endpoints_pb2.ApiServiceDescriptor(
-                    url='localhost:50000')
-            ).SerializeToString()))
-    raw_config = ' {"url":"localhost:50000", "params":{"test":"test"}} '
+        })), environments.ExternalEnvironment('localhost:50000'))
+    raw_config = ' {"url":"localhost:50000", "params":{"k1":"v1"}} '
     for env_config in (raw_config, raw_config.lstrip(), raw_config.strip()):
       self.assertEqual(
           PortableRunner._create_environment(PipelineOptions.from_dictionary({
               'environment_type': "EXTERNAL",
               'environment_config': env_config,
-          })), beam_runner_api_pb2.Environment(
-              urn=common_urns.environments.EXTERNAL.urn,
-              payload=beam_runner_api_pb2.ExternalPayload(
-                  endpoint=endpoints_pb2.ApiServiceDescriptor(
-                      url='localhost:50000'),
-                  params={"test": "test"}
-              ).SerializeToString()))
+          })), environments.ExternalEnvironment('localhost:50000',
+                                                params={"k1": "v1"}))
     with self.assertRaises(ValueError):
       PortableRunner._create_environment(PipelineOptions.from_dictionary({
           'environment_type': "EXTERNAL",
@@ -336,7 +306,7 @@
     with self.assertRaises(ValueError) as ctx:
       PortableRunner._create_environment(PipelineOptions.from_dictionary({
           'environment_type': "EXTERNAL",
-          'environment_config': '{"params":{"test":"test"}}',
+          'environment_config': '{"params":{"k1":"v1"}}',
       }))
     self.assertIn(
         'External environment endpoint must be set.', ctx.exception.args)
diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py
new file mode 100644
index 0000000..8758ab8
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/environments.py
@@ -0,0 +1,396 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Environments concepts.
+
+For internal use only. No backwards compatibility guarantees."""
+
+from __future__ import absolute_import
+
+import json
+
+from google.protobuf import message
+
+from apache_beam.portability import common_urns
+from apache_beam.portability import python_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.portability.api import endpoints_pb2
+from apache_beam.utils import proto_utils
+
+__all__ = ['Environment',
+           'DockerEnvironment', 'ProcessEnvironment', 'ExternalEnvironment',
+           'EmbeddedPythonEnvironment', 'EmbeddedPythonGrpcEnvironment',
+           'SubprocessSDKEnvironment', 'RunnerAPIEnvironmentHolder']
+
+
+class Environment(object):
+  """Abstract base class for environments.
+
+  Represents a type and configuration of environment.
+  Each type of Environment should have a unique urn.
+
+  For internal use only. No backwards compatibility guarantees.
+  """
+
+  _known_urns = {}
+  _urn_to_env_cls = {}
+
+  def to_runner_api_parameter(self, context):
+    raise NotImplementedError
+
+  @classmethod
+  def register_urn(cls, urn, parameter_type, constructor=None):
+
+    def register(constructor):
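+      # A class argument registers the URN-to-class mapping and wraps its
+      # from_runner_api_parameter; a plain function is stored as the
+      # constructor used by from_runner_api.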
+      if isinstance(constructor, type):
+        constructor.from_runner_api_parameter = register(
+            constructor.from_runner_api_parameter)
+        # register environment urn to environment class
+        cls._urn_to_env_cls[urn] = constructor
+        return constructor
+
+      else:
+        cls._known_urns[urn] = parameter_type, constructor
+        return staticmethod(constructor)
+
+    if constructor:
+      # Used as a statement.
+      register(constructor)
+    else:
+      # Used as a decorator.
+      return register
+
+  @classmethod
+  def get_env_cls_from_urn(cls, urn):
+    return cls._urn_to_env_cls[urn]
+
+  def to_runner_api(self, context):
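+    # The typed parameter may be a protobuf message (serialized), raw bytes
+    # or None (passed through), or a string (UTF-8 encoded).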
+    urn, typed_param = self.to_runner_api_parameter(context)
+    return beam_runner_api_pb2.Environment(
+        urn=urn,
+        payload=typed_param.SerializeToString()
+        if isinstance(typed_param, message.Message)
+        else typed_param if (isinstance(typed_param, bytes) or
+                             typed_param is None)
+        else typed_param.encode('utf-8')
+    )
+
+  @classmethod
+  def from_runner_api(cls, proto, context):
+    if proto is None or not proto.urn:
+      return None
+    parameter_type, constructor = cls._known_urns[proto.urn]
+
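+    # If construction fails and the context allows proto holders, keep the
+    # raw proto so it round-trips unchanged.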
+    try:
+      return constructor(
+          proto_utils.parse_Bytes(proto.payload, parameter_type),
+          context)
+    except Exception:
+      if context.allow_proto_holders:
+        return RunnerAPIEnvironmentHolder(proto)
+      raise
+
+  @classmethod
+  def from_options(cls, options):
+    """Creates an Environment object from PipelineOptions.
+
+    Args:
+      options: The PipelineOptions object.
+    """
+    raise NotImplementedError
+
+
+@Environment.register_urn(common_urns.environments.DOCKER.urn,
+                          beam_runner_api_pb2.DockerPayload)
+class DockerEnvironment(Environment):
+
+  def __init__(self, container_image=None):
+    from apache_beam.runners.portability.portable_runner import PortableRunner
+
+    if container_image:
+      self.container_image = container_image
+    else:
+      self.container_image = PortableRunner.default_docker_image()
+
+  def __eq__(self, other):
+    return self.__class__ == other.__class__ \
+           and self.container_image == other.container_image
+
+  def __ne__(self, other):
+    # TODO(BEAM-5949): Needed for Python 2 compatibility.
+    return not self == other
+
+  def __hash__(self):
+    return hash((self.__class__, self.container_image))
+
+  def __repr__(self):
+    return 'DockerEnvironment(container_image=%s)' % self.container_image
+
+  def to_runner_api_parameter(self, context):
+    return (common_urns.environments.DOCKER.urn,
+            beam_runner_api_pb2.DockerPayload(
+                container_image=self.container_image))
+
+  @staticmethod
+  def from_runner_api_parameter(payload, context):
+    return DockerEnvironment(container_image=payload.container_image)
+
+  @classmethod
+  def from_options(cls, options):
+    return cls(container_image=options.environment_config)
+
+
+@Environment.register_urn(common_urns.environments.PROCESS.urn,
+                          beam_runner_api_pb2.ProcessPayload)
+class ProcessEnvironment(Environment):
+
+  def __init__(self, command, os='', arch='', env=None):
+    self.command = command
+    self.os = os
+    self.arch = arch
+    self.env = env or {}
+
+  def __eq__(self, other):
+    return self.__class__ == other.__class__ \
+      and self.command == other.command and self.os == other.os \
+      and self.arch == other.arch and self.env == other.env
+
+  def __ne__(self, other):
+    # TODO(BEAM-5949): Needed for Python 2 compatibility.
+    return not self == other
+
+  def __hash__(self):
+    return hash((self.__class__, self.command, self.os, self.arch,
+                 frozenset(self.env.items())))
+
+  def __repr__(self):
+    repr_parts = ['command=%s' % self.command]
+    if self.os:
+      repr_parts.append('os=%s' % self.os)
+    if self.arch:
+      repr_parts.append('arch=%s' % self.arch)
+    repr_parts.append('env=%s' % self.env)
+    return 'ProcessEnvironment(%s)' % ','.join(repr_parts)
+
+  def to_runner_api_parameter(self, context):
+    return (common_urns.environments.PROCESS.urn,
+            beam_runner_api_pb2.ProcessPayload(
+                os=self.os,
+                arch=self.arch,
+                command=self.command,
+                env=self.env))
+
+  @staticmethod
+  def from_runner_api_parameter(payload, context):
+    return ProcessEnvironment(command=payload.command, os=payload.os,
+                              arch=payload.arch, env=payload.env)
+
+  @classmethod
+  def from_options(cls, options):
+    config = json.loads(options.environment_config)
+    return cls(config.get('command'), os=config.get('os', ''),
+               arch=config.get('arch', ''), env=config.get('env', ''))
+
+
+@Environment.register_urn(common_urns.environments.EXTERNAL.urn,
+                          beam_runner_api_pb2.ExternalPayload)
+class ExternalEnvironment(Environment):
+
+  def __init__(self, url, params=None):
+    self.url = url
+    self.params = params
+
+  def __eq__(self, other):
+    return self.__class__ == other.__class__ and self.url == other.url \
+      and self.params == other.params
+
+  def __ne__(self, other):
+    # TODO(BEAM-5949): Needed for Python 2 compatibility.
+    return not self == other
+
+  def __hash__(self):
+    params = self.params
+    if params is not None:
+      params = frozenset(self.params.items())
+    return hash((self.__class__, self.url, params))
+
+  def __repr__(self):
+    return 'ExternalEnvironment(url=%s,params=%s)' % (self.url, self.params)
+
+  def to_runner_api_parameter(self, context):
+    return (common_urns.environments.EXTERNAL.urn,
+            beam_runner_api_pb2.ExternalPayload(
+                endpoint=endpoints_pb2.ApiServiceDescriptor(url=self.url),
+                params=self.params
+            ))
+
+  @staticmethod
+  def from_runner_api_parameter(payload, context):
+    return ExternalEnvironment(payload.endpoint.url,
+                               params=payload.params or None)
+
+  @classmethod
+  def from_options(cls, options):
+    def looks_like_json(environment_config):
+      import re
+      return re.match(r'\s*\{.*\}\s*$', environment_config)
+
+    if looks_like_json(options.environment_config):
+      config = json.loads(options.environment_config)
+      url = config.get('url')
+      if not url:
+        raise ValueError('External environment endpoint must be set.')
+      params = config.get('params')
+    else:
+      url = options.environment_config
+      params = None
+
+    return cls(url, params=params)
+
+
+@Environment.register_urn(python_urns.EMBEDDED_PYTHON, None)
+class EmbeddedPythonEnvironment(Environment):
+
+  def __eq__(self, other):
+    return self.__class__ == other.__class__
+
+  def __ne__(self, other):
+    # TODO(BEAM-5949): Needed for Python 2 compatibility.
+    return not self == other
+
+  def __hash__(self):
+    return hash(self.__class__)
+
+  def to_runner_api_parameter(self, context):
+    return python_urns.EMBEDDED_PYTHON, None
+
+  @staticmethod
+  def from_runner_api_parameter(unused_payload, context):
+    return EmbeddedPythonEnvironment()
+
+  @classmethod
+  def from_options(cls, options):
+    return cls()
+
+
+@Environment.register_urn(python_urns.EMBEDDED_PYTHON_GRPC, bytes)
+class EmbeddedPythonGrpcEnvironment(Environment):
+
+  def __init__(self, num_workers=None, state_cache_size=None):
+    self.num_workers = num_workers
+    self.state_cache_size = state_cache_size
+
+  def __eq__(self, other):
+    return self.__class__ == other.__class__ \
+           and self.num_workers == other.num_workers \
+           and self.state_cache_size == other.state_cache_size
+
+  def __ne__(self, other):
+    # TODO(BEAM-5949): Needed for Python 2 compatibility.
+    return not self == other
+
+  def __hash__(self):
+    return hash((self.__class__, self.num_workers, self.state_cache_size))
+
+  def __repr__(self):
+    repr_parts = []
+    if self.num_workers is not None:
+      repr_parts.append('num_workers=%d' % self.num_workers)
+    if self.state_cache_size is not None:
+      repr_parts.append('state_cache_size=%d' % self.state_cache_size)
+    return 'EmbeddedPythonGrpcEnvironment(%s)' % ','.join(repr_parts)
+
+  def to_runner_api_parameter(self, context):
+    if self.num_workers is None and self.state_cache_size is None:
+      payload = b''
+    elif self.num_workers is not None and self.state_cache_size is not None:
+      payload = b'%d,%d' % (self.num_workers, self.state_cache_size)
+    else:
+      # We want to make sure that the environment stays the same through the
+      # roundtrip to runner api, so here we don't want to set default for the
+      # other if only one of num workers or state cache size is set
+      raise ValueError('Must provide worker num and state cache size.')
+    return python_urns.EMBEDDED_PYTHON_GRPC, payload
+
+  @staticmethod
+  def from_runner_api_parameter(payload, context):
+    if payload:
+      num_workers, state_cache_size = payload.decode('utf-8').split(',')
+      return EmbeddedPythonGrpcEnvironment(
+          num_workers=int(num_workers),
+          state_cache_size=int(state_cache_size))
+    else:
+      return EmbeddedPythonGrpcEnvironment()
+
+  @classmethod
+  def from_options(cls, options):
+    if options.environment_config:
+      num_workers, state_cache_size = options.environment_config.split(',')
+      return cls(num_workers=int(num_workers),
+                 state_cache_size=int(state_cache_size))
+    else:
+      return cls()
+
+
+@Environment.register_urn(python_urns.SUBPROCESS_SDK, bytes)
+class SubprocessSDKEnvironment(Environment):
+
+  def __init__(self, command_string):
+    self.command_string = command_string
+
+  def __eq__(self, other):
+    return self.__class__ == other.__class__ \
+           and self.command_string == other.command_string
+
+  def __ne__(self, other):
+    # TODO(BEAM-5949): Needed for Python 2 compatibility.
+    return not self == other
+
+  def __hash__(self):
+    return hash((self.__class__, self.command_string))
+
+  def __repr__(self):
+    return 'SubprocessSDKEnvironment(command_string=%s)' % self.command_string
+
+  def to_runner_api_parameter(self, context):
+    return python_urns.SUBPROCESS_SDK, self.command_string.encode('utf-8')
+
+  @staticmethod
+  def from_runner_api_parameter(payload, context):
+    return SubprocessSDKEnvironment(payload.decode('utf-8'))
+
+  @classmethod
+  def from_options(cls, options):
+    return cls(options.environment_config)
+
+
+class RunnerAPIEnvironmentHolder(Environment):
+
+  def __init__(self, proto):
+    self.proto = proto
+
+  def to_runner_api(self, context):
+    return self.proto
+
+  def __eq__(self, other):
+    return self.__class__ == other.__class__ and self.proto == other.proto
+
+  def __ne__(self, other):
+    # TODO(BEAM-5949): Needed for Python 2 compatibility.
+    return not self == other
+
+  def __hash__(self):
+    return hash((self.__class__, self.proto))
diff --git a/sdks/python/apache_beam/transforms/environments_test.py b/sdks/python/apache_beam/transforms/environments_test.py
new file mode 100644
index 0000000..0fd568c
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/environments_test.py
@@ -0,0 +1,68 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the transform.environments classes."""
+
+from __future__ import absolute_import
+
+import logging
+import unittest
+
+from apache_beam.runners import pipeline_context
+from apache_beam.transforms.environments import DockerEnvironment
+from apache_beam.transforms.environments import EmbeddedPythonEnvironment
+from apache_beam.transforms.environments import EmbeddedPythonGrpcEnvironment
+from apache_beam.transforms.environments import Environment
+from apache_beam.transforms.environments import ExternalEnvironment
+from apache_beam.transforms.environments import ProcessEnvironment
+from apache_beam.transforms.environments import SubprocessSDKEnvironment
+
+
+class RunnerApiTest(unittest.TestCase):
+
+  def test_environment_encoding(self):
+    for environment in (
+        DockerEnvironment(),
+        DockerEnvironment(container_image='img'),
+        ProcessEnvironment('run.sh'),
+        ProcessEnvironment('run.sh', os='linux', arch='amd64',
+                           env={'k1': 'v1'}),
+        ExternalEnvironment('localhost:8080'),
+        ExternalEnvironment('localhost:8080', params={'k1': 'v1'}),
+        EmbeddedPythonEnvironment(),
+        EmbeddedPythonGrpcEnvironment(),
+        EmbeddedPythonGrpcEnvironment(num_workers=2, state_cache_size=0),
+        SubprocessSDKEnvironment(command_string=u'foö')):
+      context = pipeline_context.PipelineContext()
+      self.assertEqual(
+          environment,
+          Environment.from_runner_api(
+              environment.to_runner_api(context), context)
+      )
+
+    with self.assertRaises(ValueError) as ctx:
+      EmbeddedPythonGrpcEnvironment(num_workers=2).to_runner_api(
+          pipeline_context.PipelineContext()
+      )
+    self.assertIn('Must provide worker num and state cache size.',
+                  ctx.exception.args)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
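The round-trip assertion above leans on Environment.register_urn associating each subclass's from_runner_api_parameter with its URN. A rough standalone sketch of that dispatch pattern (illustrative only, not Beam's actual implementation; the URN and class below are hypothetical):

# Toy URN registry showing the dispatch pattern the test exercises.
_PARSERS = {}

def register_urn(urn):
  def register(cls):
    _PARSERS[urn] = cls.from_runner_api_parameter
    return cls
  return register

@register_urn('beam:env:example:v1')
class ExampleEnvironment(object):
  def __init__(self, config):
    self.config = config

  @staticmethod
  def from_runner_api_parameter(payload, context):
    return ExampleEnvironment(payload.decode('utf-8'))

def environment_from_urn(urn, payload, context=None):
  # Look up the parser registered for this URN and let it decode the payload.
  return _PARSERS[urn](payload, context)

assert environment_from_urn('beam:env:example:v1', b'cfg').config == 'cfg'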
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index ad201d1..0f04d43 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -224,7 +224,7 @@
     metric_results = res.metrics().query(MetricsFilter()
                                          .with_name('recordsRead'))
     outputs_counter = metric_results['counters'][0]
-    self.assertEqual(outputs_counter.key.step, 'Read')
+    self.assertStartswith(outputs_counter.key.step, 'Read')
     self.assertEqual(outputs_counter.key.metric.name, 'recordsRead')
     self.assertEqual(outputs_counter.committed, 100)
     self.assertEqual(outputs_counter.attempted, 100)
diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py
index 43cdedc..d73a1cf 100644
--- a/sdks/python/apache_beam/typehints/native_type_compatibility.py
+++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py
@@ -50,6 +50,17 @@
   return None
 
 
+def _get_args(typ):
+  """Returns the index-th argument to the given type."""
+  try:
+    return typ.__args__
+  except AttributeError:
+    compatible_args = _get_compatible_args(typ)
+    if compatible_args is None:
+      raise
+    return compatible_args
+
+
 def _get_arg(typ, index):
   """Returns the index-th argument to the given type."""
   try:
@@ -105,6 +116,15 @@
   return lambda user_type: type(user_type) == type(match_against)
 
 
+def _match_is_exactly_mapping(user_type):
+  # Only match Mapping itself, to avoid unintentionally catching its subtypes.
+  if sys.version_info < (3, 7):
+    expected_origin = typing.Mapping
+  else:
+    expected_origin = collections.abc.Mapping
+  return getattr(user_type, '__origin__', None) is expected_origin
+
+
 def _match_is_exactly_iterable(user_type):
   # Avoid unintentionally catching all subtypes (e.g. strings and mappings).
   if sys.version_info < (3, 7):
@@ -119,6 +139,22 @@
           hasattr(user_type, '_field_types'))
 
 
+def _match_is_optional(user_type):
+  return _match_is_union(user_type) and sum(
+      tp is type(None) for tp in _get_args(user_type)) == 1
+
+
+def extract_optional_type(user_type):
+  """Extracts the non-None type from Optional type user_type.
+
+  If user_type is not Optional, returns None.
+  """
+  if not _match_is_optional(user_type):
+    return None
+  else:
+    return next(tp for tp in _get_args(user_type) if tp is not type(None))
+
+
 def _match_is_union(user_type):
   # For non-subscripted unions (Python 2.7.14+ with typing 3.64)
   if user_type is typing.Union:
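For reference, the behaviour the new Optional helpers are expected to have, as a quick sketch using only the public function added above:

# Illustrative expectations for the Optional helpers added in this hunk.
from typing import Optional

from apache_beam.typehints.native_type_compatibility import extract_optional_type

assert extract_optional_type(Optional[int]) is int  # unwraps Optional[T] to T
assert extract_optional_type(int) is None           # non-Optional types yield None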
diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py
new file mode 100644
index 0000000..812cbe1
--- /dev/null
+++ b/sdks/python/apache_beam/typehints/schemas.py
@@ -0,0 +1,218 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Support for mapping python types to proto Schemas and back again.
+
+Python              Schema
+np.int8     <-----> BYTE
+np.int16    <-----> INT16
+np.int32    <-----> INT32
+np.int64    <-----> INT64
+int         ---/
+np.float32  <-----> FLOAT
+np.float64  <-----> DOUBLE
+float       ---/
+bool        <-----> BOOLEAN
+
+The mappings for STRING and BYTES are different between python 2 and python 3,
+because of the changes to str:
+py3:
+str/unicode <-----> STRING
+bytes       <-----> BYTES
+ByteString  ---/
+
+py2:
+str will be rejected since it is ambiguous.
+unicode     <-----> STRING
+ByteString  <-----> BYTES
+"""
+
+from __future__ import absolute_import
+
+import sys
+from typing import ByteString
+from typing import Mapping
+from typing import NamedTuple
+from typing import Optional
+from typing import Sequence
+from uuid import uuid4
+
+import numpy as np
+from past.builtins import unicode
+
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints.native_type_compatibility import _get_args
+from apache_beam.typehints.native_type_compatibility import _match_is_exactly_mapping
+from apache_beam.typehints.native_type_compatibility import _match_is_named_tuple
+from apache_beam.typehints.native_type_compatibility import _match_is_optional
+from apache_beam.typehints.native_type_compatibility import _safe_issubclass
+from apache_beam.typehints.native_type_compatibility import extract_optional_type
+
+
+# Registry of typings for a schema by UUID
+class SchemaTypeRegistry(object):
+  def __init__(self):
+    self.by_id = {}
+    self.by_typing = {}
+
+  def add(self, typing, schema):
+    self.by_id[schema.id] = (typing, schema)
+
+  def get_typing_by_id(self, unique_id):
+    result = self.by_id.get(unique_id, None)
+    return result[0] if result is not None else None
+
+  def get_schema_by_id(self, unique_id):
+    result = self.by_id.get(unique_id, None)
+    return result[1] if result is not None else None
+
+
+SCHEMA_REGISTRY = SchemaTypeRegistry()
+
+
+# Bi-directional mappings
+_PRIMITIVES = (
+    (np.int8, schema_pb2.BYTE),
+    (np.int16, schema_pb2.INT16),
+    (np.int32, schema_pb2.INT32),
+    (np.int64, schema_pb2.INT64),
+    (np.float32, schema_pb2.FLOAT),
+    (np.float64, schema_pb2.DOUBLE),
+    (unicode, schema_pb2.STRING),
+    (bool, schema_pb2.BOOLEAN),
+    (bytes if sys.version_info.major >= 3 else ByteString,
+     schema_pb2.BYTES),
+)
+
+PRIMITIVE_TO_ATOMIC_TYPE = dict((typ, atomic) for typ, atomic in _PRIMITIVES)
+ATOMIC_TYPE_TO_PRIMITIVE = dict((atomic, typ) for typ, atomic in _PRIMITIVES)
+
+# One-way mappings
+PRIMITIVE_TO_ATOMIC_TYPE.update({
+    # In python 2, this is a no-op because we define it as the bi-directional
+    # mapping above. This just ensures the one-way mapping is defined in python
+    # 3.
+    ByteString: schema_pb2.BYTES,
+    # Allow users to specify a native int, and use INT64 as the cross-language
+    # representation. Technically ints have unlimited precision, but RowCoder
+    # should throw an error if it sees one with a bit width > 64 when encoding.
+    int: schema_pb2.INT64,
+    float: schema_pb2.DOUBLE,
+})
+
+
+def typing_to_runner_api(type_):
+  if _match_is_named_tuple(type_):
+    schema = None
+    if hasattr(type_, 'id'):
+      schema = SCHEMA_REGISTRY.get_schema_by_id(type_.id)
+    if schema is None:
+      fields = [
+          schema_pb2.Field(
+              name=name, type=typing_to_runner_api(type_._field_types[name]))
+          for name in type_._fields
+      ]
+      type_id = str(uuid4())
+      schema = schema_pb2.Schema(fields=fields, id=type_id)
+      SCHEMA_REGISTRY.add(type_, schema)
+
+    return schema_pb2.FieldType(
+        row_type=schema_pb2.RowType(
+            schema=schema))
+
+  # All concrete types (other than NamedTuple sub-classes) should map to
+  # a supported primitive type.
+  elif type_ in PRIMITIVE_TO_ATOMIC_TYPE:
+    return schema_pb2.FieldType(atomic_type=PRIMITIVE_TO_ATOMIC_TYPE[type_])
+
+  elif sys.version_info.major == 2 and type_ == str:
+    raise ValueError(
+        "type 'str' is not supported in python 2. Please use 'unicode' or "
+        "'typing.ByteString' instead to unambiguously indicate if this is a "
+        "UTF-8 string or a byte array."
+    )
+
+  elif _match_is_exactly_mapping(type_):
+    key_type, value_type = map(typing_to_runner_api, _get_args(type_))
+    return schema_pb2.FieldType(
+        map_type=schema_pb2.MapType(key_type=key_type, value_type=value_type))
+
+  elif _match_is_optional(type_):
+    # It's possible that a user passes us Optional[Optional[T]], but in python
+    # typing this is indistinguishable from Optional[T] - both resolve to
+    # Union[T, None] - so there's no need to check for that case here.
+    result = typing_to_runner_api(extract_optional_type(type_))
+    result.nullable = True
+    return result
+
+  elif _safe_issubclass(type_, Sequence):
+    element_type = typing_to_runner_api(_get_args(type_)[0])
+    return schema_pb2.FieldType(
+        array_type=schema_pb2.ArrayType(element_type=element_type))
+
+  raise ValueError("Unsupported type: %s" % type_)
+
+
+def typing_from_runner_api(fieldtype_proto):
+  if fieldtype_proto.nullable:
+    # In order to determine the inner type, create a copy of fieldtype_proto
+    # with nullable=False and pass back to typing_from_runner_api
+    base_type = schema_pb2.FieldType()
+    base_type.CopyFrom(fieldtype_proto)
+    base_type.nullable = False
+    return Optional[typing_from_runner_api(base_type)]
+
+  type_info = fieldtype_proto.WhichOneof("type_info")
+  if type_info == "atomic_type":
+    try:
+      return ATOMIC_TYPE_TO_PRIMITIVE[fieldtype_proto.atomic_type]
+    except KeyError:
+      raise ValueError("Unsupported atomic type: {0}".format(
+          fieldtype_proto.atomic_type))
+  elif type_info == "array_type":
+    return Sequence[typing_from_runner_api(
+        fieldtype_proto.array_type.element_type)]
+  elif type_info == "map_type":
+    return Mapping[
+        typing_from_runner_api(fieldtype_proto.map_type.key_type),
+        typing_from_runner_api(fieldtype_proto.map_type.value_type)
+    ]
+  elif type_info == "row_type":
+    schema = fieldtype_proto.row_type.schema
+    user_type = SCHEMA_REGISTRY.get_typing_by_id(schema.id)
+    if user_type is None:
+      from apache_beam import coders
+      type_name = 'BeamSchema_{}'.format(schema.id.replace('-', '_'))
+      user_type = NamedTuple(type_name,
+                             [(field.name, typing_from_runner_api(field.type))
+                              for field in schema.fields])
+      user_type.id = schema.id
+      SCHEMA_REGISTRY.add(user_type, schema)
+      coders.registry.register_coder(user_type, coders.RowCoder)
+    return user_type
+
+  elif type_info == "logical_type":
+    pass  # TODO
+
+
+def named_tuple_from_schema(schema):
+  return typing_from_runner_api(
+      schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=schema)))
+
+
+def named_tuple_to_schema(named_tuple):
+  return typing_to_runner_api(named_tuple).row_type.schema
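A short usage sketch of the conversions defined above; the NamedTuple and its fields are invented for illustration:

# Hypothetical user type round-tripped through the schema conversions above.
from typing import NamedTuple
from typing import Optional
from typing import Sequence

import numpy as np
from past.builtins import unicode

from apache_beam.typehints.schemas import named_tuple_from_schema
from apache_beam.typehints.schemas import named_tuple_to_schema

Movie = NamedTuple('Movie', [('title', unicode),
                             ('year', np.int64),
                             ('ratings', Sequence[Optional[np.float64]])])

schema = named_tuple_to_schema(Movie)            # Schema proto with a generated id
assert named_tuple_from_schema(schema) is Movie  # resolved via SCHEMA_REGISTRY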
diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py
new file mode 100644
index 0000000..9dd1bc2
--- /dev/null
+++ b/sdks/python/apache_beam/typehints/schemas_test.py
@@ -0,0 +1,270 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Tests for schemas."""
+
+from __future__ import absolute_import
+
+import itertools
+import sys
+import unittest
+from typing import ByteString
+from typing import List
+from typing import Mapping
+from typing import NamedTuple
+from typing import Optional
+from typing import Sequence
+
+import numpy as np
+from past.builtins import unicode
+
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints.schemas import typing_from_runner_api
+from apache_beam.typehints.schemas import typing_to_runner_api
+
+IS_PYTHON_3 = sys.version_info.major > 2
+
+
+class SchemaTest(unittest.TestCase):
+  """ Tests for Runner API Schema proto to/from typing conversions
+
+  There are two main tests: test_typing_survives_proto_roundtrip, and
+  test_proto_survives_typing_roundtrip. These are both necessary because Schemas
+  are cached by ID, so performing just one of them wouldn't necessarily exercise
+  all code paths.
+  """
+
+  def test_typing_survives_proto_roundtrip(self):
+    all_nonoptional_primitives = [
+        np.int8,
+        np.int16,
+        np.int32,
+        np.int64,
+        np.float32,
+        np.float64,
+        unicode,
+        bool,
+    ]
+
+    # The bytes type cannot survive a roundtrip to/from proto in Python 2.
+    # In order to use BYTES a user type has to use typing.ByteString (because
+    # bytes == str, and str is rejected as ambiguous).
+    if IS_PYTHON_3:
+      all_nonoptional_primitives.extend([bytes])
+
+    all_optional_primitives = [
+        Optional[typ] for typ in all_nonoptional_primitives
+    ]
+
+    all_primitives = all_nonoptional_primitives + all_optional_primitives
+
+    basic_array_types = [Sequence[typ] for typ in all_primitives]
+
+    basic_map_types = [
+        Mapping[key_type,
+                value_type] for key_type, value_type in itertools.product(
+                    all_primitives, all_primitives)
+    ]
+
+    selected_schemas = [
+        NamedTuple(
+            'AllPrimitives',
+            [('field%d' % i, typ) for i, typ in enumerate(all_primitives)]),
+        NamedTuple('ComplexSchema', [
+            ('id', np.int64),
+            ('name', unicode),
+            ('optional_map', Optional[Mapping[unicode,
+                                              Optional[np.float64]]]),
+            ('optional_array', Optional[Sequence[np.float32]]),
+            ('array_optional', Sequence[Optional[bool]]),
+        ])
+    ]
+
+    test_cases = all_primitives + \
+                 basic_array_types + \
+                 basic_map_types + \
+                 selected_schemas
+
+    for test_case in test_cases:
+      self.assertEqual(test_case,
+                       typing_from_runner_api(typing_to_runner_api(test_case)))
+
+  def test_proto_survives_typing_roundtrip(self):
+    all_nonoptional_primitives = [
+        schema_pb2.FieldType(atomic_type=typ)
+        for typ in schema_pb2.AtomicType.values()
+        if typ is not schema_pb2.UNSPECIFIED
+    ]
+
+    # The bytes type cannot survive a roundtrip to/from proto in Python 2.
+    # In order to use BYTES a user type has to use typing.ByteString (because
+    # bytes == str, and str is rejected as ambiguous).
+    if not IS_PYTHON_3:
+      all_nonoptional_primitives.remove(
+          schema_pb2.FieldType(atomic_type=schema_pb2.BYTES))
+
+    all_optional_primitives = [
+        schema_pb2.FieldType(nullable=True, atomic_type=typ)
+        for typ in schema_pb2.AtomicType.values()
+        if typ is not schema_pb2.UNSPECIFIED
+    ]
+
+    all_primitives = all_nonoptional_primitives + all_optional_primitives
+
+    basic_array_types = [
+        schema_pb2.FieldType(array_type=schema_pb2.ArrayType(element_type=typ))
+        for typ in all_primitives
+    ]
+
+    basic_map_types = [
+        schema_pb2.FieldType(
+            map_type=schema_pb2.MapType(
+                key_type=key_type, value_type=value_type)) for key_type,
+        value_type in itertools.product(all_primitives, all_primitives)
+    ]
+
+    selected_schemas = [
+        schema_pb2.FieldType(
+            row_type=schema_pb2.RowType(
+                schema=schema_pb2.Schema(
+                    id='32497414-85e8-46b7-9c90-9a9cc62fe390',
+                    fields=[
+                        schema_pb2.Field(name='field%d' % i, type=typ)
+                        for i, typ in enumerate(all_primitives)
+                    ]))),
+        schema_pb2.FieldType(
+            row_type=schema_pb2.RowType(
+                schema=schema_pb2.Schema(
+                    id='dead1637-3204-4bcb-acf8-99675f338600',
+                    fields=[
+                        schema_pb2.Field(
+                            name='id',
+                            type=schema_pb2.FieldType(
+                                atomic_type=schema_pb2.INT64)),
+                        schema_pb2.Field(
+                            name='name',
+                            type=schema_pb2.FieldType(
+                                atomic_type=schema_pb2.STRING)),
+                        schema_pb2.Field(
+                            name='optional_map',
+                            type=schema_pb2.FieldType(
+                                nullable=True,
+                                map_type=schema_pb2.MapType(
+                                    key_type=schema_pb2.FieldType(
+                                        atomic_type=schema_pb2.STRING
+                                    ),
+                                    value_type=schema_pb2.FieldType(
+                                        atomic_type=schema_pb2.DOUBLE
+                                    )))),
+                        schema_pb2.Field(
+                            name='optional_array',
+                            type=schema_pb2.FieldType(
+                                nullable=True,
+                                array_type=schema_pb2.ArrayType(
+                                    element_type=schema_pb2.FieldType(
+                                        atomic_type=schema_pb2.FLOAT)
+                                ))),
+                        schema_pb2.Field(
+                            name='array_optional',
+                            type=schema_pb2.FieldType(
+                                array_type=schema_pb2.ArrayType(
+                                    element_type=schema_pb2.FieldType(
+                                        nullable=True,
+                                        atomic_type=schema_pb2.BYTES)
+                                ))),
+                    ]))),
+    ]
+
+    test_cases = all_primitives + \
+                 basic_array_types + \
+                 basic_map_types + \
+                 selected_schemas
+
+    for test_case in test_cases:
+      self.assertEqual(test_case,
+                       typing_to_runner_api(typing_from_runner_api(test_case)))
+
+  def test_unknown_primitive_raise_valueerror(self):
+    self.assertRaises(ValueError, lambda: typing_to_runner_api(np.uint32))
+
+  def test_unknown_atomic_raise_valueerror(self):
+    self.assertRaises(
+        ValueError, lambda: typing_from_runner_api(
+            schema_pb2.FieldType(atomic_type=schema_pb2.UNSPECIFIED))
+    )
+
+  @unittest.skipIf(IS_PYTHON_3, 'str is acceptable in python 3')
+  def test_str_raises_error_py2(self):
+    self.assertRaises(ValueError, lambda: typing_to_runner_api(str))
+    self.assertRaises(ValueError, lambda: typing_to_runner_api(
+        NamedTuple('Test', [('int', int), ('str', str)])))
+
+  def test_int_maps_to_int64(self):
+    self.assertEqual(
+        schema_pb2.FieldType(atomic_type=schema_pb2.INT64),
+        typing_to_runner_api(int))
+
+  def test_float_maps_to_float64(self):
+    self.assertEqual(
+        schema_pb2.FieldType(atomic_type=schema_pb2.DOUBLE),
+        typing_to_runner_api(float))
+
+  def test_trivial_example(self):
+    MyCuteClass = NamedTuple('MyCuteClass', [
+        ('name', unicode),
+        ('age', Optional[int]),
+        ('interests', List[unicode]),
+        ('height', float),
+        ('blob', ByteString),
+    ])
+
+    expected = schema_pb2.FieldType(
+        row_type=schema_pb2.RowType(
+            schema=schema_pb2.Schema(fields=[
+                schema_pb2.Field(
+                    name='name',
+                    type=schema_pb2.FieldType(
+                        atomic_type=schema_pb2.STRING),
+                ),
+                schema_pb2.Field(
+                    name='age',
+                    type=schema_pb2.FieldType(
+                        nullable=True,
+                        atomic_type=schema_pb2.INT64)),
+                schema_pb2.Field(
+                    name='interests',
+                    type=schema_pb2.FieldType(
+                        array_type=schema_pb2.ArrayType(
+                            element_type=schema_pb2.FieldType(
+                                atomic_type=schema_pb2.STRING)))),
+                schema_pb2.Field(
+                    name='height',
+                    type=schema_pb2.FieldType(
+                        atomic_type=schema_pb2.DOUBLE)),
+                schema_pb2.Field(
+                    name='blob',
+                    type=schema_pb2.FieldType(
+                        atomic_type=schema_pb2.BYTES)),
+            ])))
+
+    # Only test that the fields are equal. If we attempt to test the entire type
+    # or the entire schema, the generated id will break equality.
+    self.assertEqual(expected.row_type.schema.fields,
+                     typing_to_runner_api(MyCuteClass).row_type.schema.fields)
+
+
+if __name__ == '__main__':
+  unittest.main()
diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh
index eab8aad..e3794ba 100755
--- a/sdks/python/scripts/generate_pydoc.sh
+++ b/sdks/python/scripts/generate_pydoc.sh
@@ -157,6 +157,7 @@
   'apache_beam.metrics.metric.MetricResults',
   'apache_beam.pipeline.PipelineVisitor',
   'apache_beam.pipeline.PTransformOverride',
+  'apache_beam.portability.api.schema_pb2.Schema',
   'apache_beam.pvalue.AsSideInput',
   'apache_beam.pvalue.DoOutputsTuple',
   'apache_beam.pvalue.PValue',
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 1cbc27f..d8d5c7d 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -116,6 +116,7 @@
     'hdfs>=2.1.0,<3.0.0',
     'httplib2>=0.8,<=0.12.0',
     'mock>=1.0.1,<3.0.0',
+    'numpy>=1.14.3,<2',
     'pymongo>=3.8.0,<4.0.0',
     'oauth2client>=2.0.1,<4',
     'protobuf>=3.5.0.post1,<4',
@@ -139,7 +140,6 @@
 REQUIRED_TEST_PACKAGES = [
     'nose>=1.3.7',
     'nose_xunitmp>=0.4.1',
-    'numpy>=1.14.3,<2',
     'pandas>=0.23.4,<0.25',
     'parameterized>=0.6.0,<0.7.0',
     'pyhamcrest>=1.9,<2.0',