Merge pull request #9999 from ibzib/spark-parallel-test
[BEAM-8294] run Spark portable validates runner tests in parallel
diff --git a/.test-infra/jenkins/README.md b/.test-infra/jenkins/README.md
index 61fd817..bd876d3 100644
--- a/.test-infra/jenkins/README.md
+++ b/.test-infra/jenkins/README.md
@@ -27,17 +27,16 @@
| Name | Link | PR Trigger Phrase | Cron Status |
|------|------|-------------------|-------------|
-| beam_PreCommit_Go | [commit](https://builds.apache.org/job/beam_PreCommit_Go_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Go_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Go_Phrase/) | `Run Go PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron) |
-| beam_PreCommit_JavaPortabilityApi | [commit](), [cron](), [phrase]() | `Run JavaPortabilityApi PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go) |
+| beam_PreCommit_BeamSQL_ZetaSQL | [commit](https://builds.apache.org/job/beam_PreCommit_JavaBeamZetaSQL_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_JavaBeamZetaSQL_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_JavaBeamZetaSQL_Phrase/) | `Run BeamSQL_ZetaSQL PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_JavaBeamZetaSQL_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_JavaBeamZetaSQL_Cron) |
| beam_PreCommit_CommunityMetrics | [commit](https://builds.apache.org/job/beam_PreCommit_CommunityMetrics_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_CommunityMetrics_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_CommunityMetrics_Phrase/) | `Run CommunityMetrics PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_CommunityMetrics_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_CommunityMetrics_Cron) |
| beam_PreCommit_Go | [commit](https://builds.apache.org/job/beam_PreCommit_Go_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Go_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Go_Phrase/) | `Run Go PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron) |
| beam_PreCommit_Java | [commit](https://builds.apache.org/job/beam_PreCommit_Java_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Java_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Java_Phrase/) | `Run Java PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron) |
| beam_PreCommit_JavaPortabilityApi | [commit](https://builds.apache.org/job/beam_PreCommit_JavaPortabilityApi_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_JavaPortabilityApi_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_JavaPortabilityApi_Phrase/) | `Run JavaPortabilityApi PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_JavaPortabilityApi_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_JavaPortabilityApi_Cron) |
-| beam_PreCommit_Java_Examples_Dataflow | [commit](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Phrase/) | `Run Java PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron) |
-| beam_PreCommit_Portable_Python | [commit](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Phrase/) | `Run Portable PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron) |
+| beam_PreCommit_Java_Examples_Dataflow | [commit](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Phrase/) | `Run Java_Examples_Dataflow PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Examples_Dataflow_Cron) |
+| beam_PreCommit_Portable_Python | [commit](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Phrase/) | `Run Portable_Python PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron) |
| beam_PreCommit_PythonLint | [commit](https://builds.apache.org/job/beam_PreCommit_PythonLint_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_PythonLint_Phrase/) | `Run PythonLint PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_PythonLint_Cron) |
| beam_PreCommit_Python | [commit](https://builds.apache.org/job/beam_PreCommit_Python_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Python_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Python_Phrase/) | `Run Python PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron) |
-| beam_PreCommit_Python2_PVR_Flink | [commit](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Phrase/) | `Run Python PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron) |
+| beam_PreCommit_Python2_PVR_Flink | [commit](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Phrase/) | `Run Python2_PVR_Flink PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron) |
| beam_PreCommit_RAT | [commit](https://builds.apache.org/job/beam_PreCommit_RAT_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_RAT_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_RAT_Phrase/) | `Run RAT PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_RAT_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_RAT_Cron) |
| beam_PreCommit_Spotless | [commit](https://builds.apache.org/job/beam_PreCommit_Spotless_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Spotless_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Spotless_Phrase/) | `Run Spotless PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Spotless_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Spotless_Cron) |
| beam_PreCommit_Website | [commit](https://builds.apache.org/job/beam_PreCommit_Website_Commit/), [cron](https://builds.apache.org/job/beam_PreCommit_Website_Cron/), [phrase](https://builds.apache.org/job/beam_PreCommit_Website_Phrase/) | `Run Website PreCommit` | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron) |
@@ -47,6 +46,7 @@
| Name | Link | PR Trigger Phrase | Cron Status |
|------|------|-------------------|-------------|
+| beam_PostCommit_CrossLanguageValidatesRunner | [cron](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/), [phrase](https://builds.apache.org/job/beam_PostCommit_XVR_Flink_PR/) | `Run XVR_Flink PostCommit` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Flink) |
| beam_PostCommit_Go | [cron](https://builds.apache.org/job/beam_PostCommit_Go/), [phrase](https://builds.apache.org/job/beam_PostCommit_Go_PR/) | `Run Go PostCommit` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go) |
| beam_PostCommit_Go_VR_Flink | [cron](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/), [phrase](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink_PR/) | `Run Go Flink ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/) |
| beam_PostCommit_Go_VR_Spark | [cron](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/), [phrase](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark_PR/) | `Run Go Spark ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/) |
@@ -59,19 +59,23 @@
| beam_PostCommit_Java_PVR_Flink_Streaming | [cron](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming_PR/) | `Run Java Flink PortableValidatesRunner Streaming` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming) |
| beam_PostCommit_Java_PVR_Spark_Batch | [cron](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch_PR/) | `Run Java Spark PortableValidatesRunner Batch` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch) |
| beam_PostCommit_Java_PortabilityApi | [cron](https://builds.apache.org/job/beam_PostCommit_Java_PortabilityApi/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_PortabilityApi_PR/) | `Run Java PortabilityApi PostCommit` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PortabilityApi/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PortabilityApi) |
-| beam_PostCommit_Java_ValidatesRunner_Apex | [cron](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_PR/) | `Run Apex ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex) |
-| beam_PostCommit_Java_ValidatesRunner_Dataflow | [cron](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/) | `Run Dataflow ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow) |
+| beam_PostCommit_Java11_Dataflow_Examples | [cron](https://builds.apache.org/job/beam_PostCommit_Java11_Examples_Dataflow/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java11_Examples_Dataflow_PR/) | `Run Java examples on Dataflow with Java 11` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java11_Examples_Dataflow/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java11_Examples_Dataflow) |
+| beam_PostCommit_Java11_Dataflow_Portability_Examples | [cron](https://builds.apache.org/job/beam_PostCommit_Java11_Examples_Dataflow_Portability/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java11_Examples_Dataflow_Portability_PR/) | `Run Java Portability examples on Dataflow with Java 11` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java11_Examples_Dataflow_Portability/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java11_Examples_Dataflow_Portability) |
| beam_PostCommit_Java11_ValidatesRunner_Dataflow | [cron](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Dataflow/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Dataflow_PR/) | `Run Dataflow ValidatesRunner Java 11` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Dataflow/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Dataflow) |
| beam_PostCommit_Java11_ValidatesRunner_PortabilityApi_Dataflow | [cron](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_PortabilityApi_Dataflow/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_PortabilityApi_Dataflow_PR/) | `Run Dataflow PortabilityApi ValidatesRunner with Java 11` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_PortabilityApi_Dataflow/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_PortabilityApi_Dataflow) |
| beam_PostCommit_Java11_ValidatesRunner_Direct | [cron](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Direct), [phrase](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Direct_PR) | `Run Direct ValidatesRunner in Java 11` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Direct/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java11_ValidatesRunner_Direct) |
+| beam_PostCommit_Java_ValidatesRunner_Apex | [cron](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex_PR/) | `Run Apex ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex) |
+| beam_PostCommit_Java_ValidatesRunner_Dataflow | [cron](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_PR/) | `Run Dataflow ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow) |
| beam_PostCommit_Java_ValidatesRunner_Flink | [cron](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_PR/) | `Run Flink ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink) |
| beam_PostCommit_Java_ValidatesRunner_Gearpump | [cron](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump_PR/) | `Run Gearpump ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump) |
| beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow | [cron](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow_PR/) | `Run Dataflow PortabilityApi ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_PortabilityApi_Dataflow) |
| beam_PostCommit_Java_ValidatesRunner_Samza | [cron](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza_PR/) | `Run Samza ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza) |
| beam_PostCommit_Java_ValidatesRunner_Spark | [cron](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/), [phrase](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark_PR/) | `Run Spark ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark) |
+| beam_PostCommit_PortableJar_Flink | [cron](https://builds.apache.org/job/beam_PostCommit_PortableJar_Flink/), [phrase](https://builds.apache.org/job/beam_PostCommit_PortableJar_Flink_PR/) | `Run PortableJar_Flink PostCommit` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_PortableJar_Flink/badge/icon)](https://builds.apache.org/job/beam_PostCommit_PortableJar_Flink) |
| beam_PostCommit_Py_VR_Dataflow | [cron](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/), [phrase](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow_PR/) | `Run Python Dataflow ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow) |
| beam_PostCommit_Py_ValCont | [cron](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/), [phrase](https://builds.apache.org/job/beam_PostCommit_Py_ValCont_PR/) | `Run Python Dataflow ValidatesContainer` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont) |
| beam_PostCommit_Python35_VR_Flink | [cron](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/), [phrase](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/) | `Run Python 3.5 Flink ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink) |
+| beam_PostCommit_Python_MongoDBIO_IT | [cron](https://builds.apache.org/job/beam_PostCommit_Python_MongoDBIO_IT), [phrase](https://builds.apache.org/job/beam_PostCommit_Python_MongoDBIO_IT_PR/) | `Run Python MongoDBIO_IT` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_MongoDBIO_IT/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_MongoDBIO_IT) |
| beam_PostCommit_Python_VR_Spark | [cron](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/), [phrase](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/) | `Run Python Spark ValidatesRunner` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark) |
| beam_PostCommit_Python2 | [cron](https://builds.apache.org/job/beam_PostCommit_Python2), [phrase](https://builds.apache.org/job/beam_PostCommit_Python2_PR/) | `Run Python 2 PostCommit` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2) |
| beam_PostCommit_Python35 | [cron](https://builds.apache.org/job/beam_PostCommit_Python35), [phrase](https://builds.apache.org/job/beam_PostCommit_Python35_PR/) | `Run Python 3.5 PostCommit` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35) |
@@ -79,6 +83,7 @@
| beam_PostCommit_Python37 | [cron](https://builds.apache.org/job/beam_PostCommit_Python37), [phrase](https://builds.apache.org/job/beam_PostCommit_Python37_PR/) | `Run Python 3.7 PostCommit` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python37/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37) |
| beam_PostCommit_SQL | [cron](https://builds.apache.org/job/beam_PostCommit_SQL/), [phrase](https://builds.apache.org/job/beam_PostCommit_SQL_PR/) | `Run SQL PostCommit` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_SQL/badge/icon)](https://builds.apache.org/job/beam_PostCommit_SQL) |
| beam_PostCommit_Website_Publish | [cron](https://builds.apache.org/job/beam_PostCommit_Website_Publish/) | N/A | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Website_Publish/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Website_Publish) |
+| beam_PostCommit_Website_Test | [cron](https://builds.apache.org/job/beam_PostCommit_Website_Test/) | `Run Full Website Test` | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Website_Test/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Website_Test) |
### Performance Jobs
@@ -89,10 +94,12 @@
| beam_PerformanceTests_Compressed_TextIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT/), [hdfs_cron](https://builds.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT_HDFS/) | `Run Java CompressedTextIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT) [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT_HDFS/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_Compressed_TextIOIT_HDFS) |
| beam_PerformanceTests_HadoopFormat | [cron](https://builds.apache.org/job/beam_PerformanceTests_HadoopFormat/) | `Run Java HadoopFormatIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_HadoopFormat/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_HadoopFormat) |
| beam_PerformanceTests_JDBC | [cron](https://builds.apache.org/job/beam_PerformanceTests_JDBC/) | `Run Java JdbcIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_JDBC/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_JDBC) |
+| beam_PerformanceTests_KafkaIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_Kafka_IO/) | `Run Java KafkaIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_Kafka_IO/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_Kafka_IO) |
| beam_PerformanceTests_ManyFiles_TextIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_ManyFiles_TextIOIT/), [hdfs_cron](https://builds.apache.org/job/beam_PerformanceTests_ManyFiles_TextIOIT_HDFS/) | `Run Java ManyFilesTextIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_ManyFiles_TextIOIT/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_ManyFiles_TextIOIT) [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_ManyFiles_TextIOIT_HDFS/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_ManyFiles_TextIOIT_HDFS) |
+| beam_PerformanceTests_MongoDBIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_MongoDBIO_IT/) | `Run Java MongoDBIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_MongoDBIO_IT/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_MongoDBIO_IT) |
| beam_PerformanceTests_ParquetIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_ParquetIOIT/), [hdfs_cron](https://builds.apache.org/job/beam_PerformanceTests_ParquetIOIT_HDFS/) | `Run Java ParquetIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_ParquetIOIT/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_ParquetIOIT) [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_ParquetIOIT_HDFS/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_ParquetIOIT_HDFS) |
| beam_PerformanceTests_Spark | [cron](https://builds.apache.org/job/beam_PerformanceTests_Spark/) | `Run Spark Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_Spark/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_Spark) |
-| beam_PerformanceTests_TFRecordIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_TFRecordIOIT/) | `Run Java JdbcIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_TFRecordIOIT/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_TFRecordIOIT) |
+| beam_PerformanceTests_TFRecordIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_TFRecordIOIT/) | `Run Java TFRecordIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_TFRecordIOIT/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_TFRecordIOIT) |
| beam_PerformanceTests_TextIOIT | [cron](https://builds.apache.org/job/beam_PerformanceTests_TextIOIT/), [hdfs_cron](https://builds.apache.org/job/beam_PerformanceTests_TextIOIT_HDFS/) | `Run Java TextIO Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_TextIOIT/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_TextIOIT) [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_TextIOIT_HDFS/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_TextIOIT_HDFS) |
| beam_PerformanceTests_WordCountIT_Py27 | [cron](https://builds.apache.org/job/beam_PerformanceTests_WordCountIT_Py27/) | `Run Python27 WordCountIT Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_WordCountIT_Py27/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_WordCountIT_Py27) |
| beam_PerformanceTests_WordCountIT_Py35 | [cron](https://builds.apache.org/job/beam_PerformanceTests_WordCountIT_Py35/) | `Run Python35 WordCountIT Performance Test` | [![Build Status](https://builds.apache.org/job/beam_PerformanceTests_WordCountIT_Py35/badge/icon)](https://builds.apache.org/job/beam_PerformanceTests_WordCountIT_Py35) |
@@ -104,24 +111,25 @@
| Name | Link | PR Trigger Phrase | Cron Status |
|------|------|-------------------|-------------|
-| beam_LoadTests_Python_coGBK_Flink_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_coGBK_Flink_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_coGBK_Flink_Batch_PR/) | Run Load Tests Python CoGBK Flink Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_coGBK_Flink_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_coGBK_Flink_Batch/) |
-| beam_LoadTests_Java_CoGBK_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch_PR/) | Run Load Tests Java CoGBK Dataflow Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/) |
-| beam_LoadTests_Java_CoGBK_Dataflow_Streaming | [cron](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Streaming/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Streaming_PR/) | Run Load Tests Java CoGBK Dataflow Streaming | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Streaming/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Streaming/) |
-| beam_LoadTests_Python_CoGBK_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_CoGBK_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_CoGBK_Dataflow_Batch_PR/) | Run Load Tests Python CoGBK Dataflow Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_CoGBK_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_CoGBK_Dataflow_Batch/) |
-| beam_LoadTests_Python_Combine_Flink_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Flink_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Flink_Batch_PR/) | Run Load Tests Python Combine Flink Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Flink_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Flink_Batch/) |
-| beam_LoadTests_Java_Combine_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Batch_PR/) | Run Load Tests Java Combine Dataflow Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Batch/) |
-| beam_LoadTests_Java_Combine_Dataflow_Streaming | [cron](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Streaming/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Streaming_PR/) | Run Load Tests Java Combine Dataflow Streaming | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Streaming/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Streaming/) |
-| beam_LoadTests_Python_Combine_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Dataflow_Batch_PR/) | Run Python Load Tests Combine Dataflow Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Dataflow_Batch/) |
-| beam_LoadTests_Python_GBK_Flink_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Flink_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Flink_Batch_PR/) | Run Load Tests Python GBK Flink Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Flink_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Flink_Batch/) |
-| beam_LoadTests_Java_GBK_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Batch_PR/) | Run Load Tests Java GBK Dataflow Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Batch/) |
-| beam_LoadTests_Java_GBK_Dataflow_Streaming | [cron](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Streaming/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Streaming_PR/) | Run Load Tests Java GBK Dataflow Streaming | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Streaming/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Streaming/) |
-| beam_LoadTests_Python_GBK_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Dataflow_Batch_PR/) | Run Load Tests Python GBK Dataflow Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Dataflow_Batch/) |
-| beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch_PR/) | Run Load Tests Python GBK reiterate Dataflow Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch/) |
-| beam_Java_LoadTests_Smoke | [phrase](https://builds.apache.org/job/beam_Java_LoadTests_Smoke_PR/) | Run Java Load Tests Smoke | |
-| beam_Python_LoadTests_Smoke | [phrase](https://builds.apache.org/job/beam_Python_LoadTests_Smoke_PR/) | Run Python Load Tests Smoke | |
-| beam_LoadTests_Java_ParDo_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Batch_PR/) | Run Load Tests Java ParDo Dataflow Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Batch/) |
-| beam_LoadTests_Java_ParDo_Dataflow_Streaming | [cron](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Streaming/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Streaming_PR/) | Run Load Tests Java ParDo Dataflow Streaming | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Streaming/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Streaming/) |
-| beam_LoadTests_Python_ParDo_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Dataflow_Batch_PR/) | Run Python Load Tests ParDo Dataflow Batch | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Dataflow_Batch/) |
+| beam_LoadTests_Python_coGBK_Flink_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_coGBK_Flink_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_coGBK_Flink_Batch_PR/) | `Run Load Tests Python CoGBK Flink Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_coGBK_Flink_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_coGBK_Flink_Batch/) |
+| beam_LoadTests_Java_CoGBK_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch_PR/) | `Run Load Tests Java CoGBK Dataflow Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Batch/) |
+| beam_LoadTests_Java_CoGBK_Dataflow_Streaming | [cron](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Streaming/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Streaming_PR/) | `Run Load Tests Java CoGBK Dataflow Streaming` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Streaming/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_CoGBK_Dataflow_Streaming/) |
+| beam_LoadTests_Python_CoGBK_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_CoGBK_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_CoGBK_Dataflow_Batch_PR/) | `Run Load Tests Python CoGBK Dataflow Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_CoGBK_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_CoGBK_Dataflow_Batch/) |
+| beam_LoadTests_Python_Combine_Flink_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Flink_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Flink_Batch_PR/) | `Run Load Tests Python Combine Flink Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Flink_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Flink_Batch/) |
+| beam_LoadTests_Java_Combine_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Batch_PR/) | `Run Load Tests Java Combine Dataflow Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Batch/) |
+| beam_LoadTests_Java_Combine_Dataflow_Streaming | [cron](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Streaming/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Streaming_PR/) | `Run Load Tests Java Combine Dataflow Streaming` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Streaming/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_Combine_Dataflow_Streaming/) |
+| beam_LoadTests_Python_Combine_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Dataflow_Batch_PR/) | `Run Python Load Tests Combine Dataflow Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_Combine_Dataflow_Batch/) |
+| beam_LoadTests_Python_GBK_Flink_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Flink_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Flink_Batch_PR/) | `Run Load Tests Python GBK Flink Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Flink_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Flink_Batch/) |
+| beam_LoadTests_Java_GBK_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Batch_PR/) | `Run Load Tests Java GBK Dataflow Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Batch/) |
+| beam_LoadTests_Java_GBK_Dataflow_Streaming | [cron](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Streaming/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Streaming_PR/) | `Run Load Tests Java GBK Dataflow Streaming` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Streaming/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_GBK_Dataflow_Streaming/) |
+| beam_LoadTests_Python_GBK_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Dataflow_Batch_PR/) | `Run Load Tests Python GBK Dataflow Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_GBK_Dataflow_Batch/) |
+| beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch_PR/) | `Run Load Tests Python GBK reiterate Dataflow Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_GBK_reiterate_Dataflow_Batch/) |
+| beam_Java_LoadTests_Smoke | [phrase](https://builds.apache.org/job/beam_Java_LoadTests_Smoke_PR/) | `Run Java Load Tests Smoke` | |
+| beam_Python_LoadTests_Smoke | [phrase](https://builds.apache.org/job/beam_Python_LoadTests_Smoke_PR/) | `Run Python Load Tests Smoke` | |
+| beam_LoadTests_Java_ParDo_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Batch_PR/) | `Run Load Tests Java ParDo Dataflow Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Batch/) |
+| beam_LoadTests_Java_ParDo_Dataflow_Streaming | [cron](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Streaming/), [phrase](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Streaming_PR/) | `Run Load Tests Java ParDo Dataflow Streaming` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Streaming/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Java_ParDo_Dataflow_Streaming/) |
+| beam_LoadTests_Python_ParDo_Dataflow_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Dataflow_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Dataflow_Batch_PR/) | `Run Python Load Tests ParDo Dataflow Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Dataflow_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Dataflow_Batch/) |
+| beam_LoadTests_Python_ParDo_Flink_Batch | [cron](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Flink_Batch/), [phrase](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Flink_Batch_PR/) | `Run Python Load Tests ParDo Flink Batch` | [![Build Status](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Flink_Batch/badge/icon)](https://builds.apache.org/job/beam_LoadTests_Python_ParDo_Flink_Batch/) |
### Inventory Jobs
@@ -164,4 +172,4 @@
retest this please
```
-* Last update (mm/dd/yyyy): 02/12/2019
+* Last update (mm/dd/yyyy): 11/06/2019
diff --git a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
index ecae656..9de15ac 100644
--- a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
+++ b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml
@@ -97,6 +97,7 @@
"\u000A": 10
"\u00c8\u0001": 200
"\u00e8\u0007": 1000
+ "\u00a9\u0046": 9001
"\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u00ff\u0001": -1
---
@@ -275,3 +276,25 @@
"\u007f\u00f0\0\0\0\0\0\0": "Infinity"
"\u00ff\u00f0\0\0\0\0\0\0": "-Infinity"
"\u007f\u00f8\0\0\0\0\0\0": "NaN"
+
+---
+
+coder:
+ urn: "beam:coder:row:v1"
+ # str: string, i32: int32, f64: float64, arr: array[string]
+ payload: "\n\t\n\x03str\x1a\x02\x10\x07\n\t\n\x03i32\x1a\x02\x10\x03\n\t\n\x03f64\x1a\x02\x10\x06\n\r\n\x03arr\x1a\x06\x1a\x04\n\x02\x10\x07\x12$4e5e554c-d4c1-4a5d-b5e1-f3293a6b9f05"
+nested: false
+examples:
+ "\u0004\u0000\u0003foo\u00a9\u0046\u003f\u00b9\u0099\u0099\u0099\u0099\u0099\u009a\0\0\0\u0003\u0003foo\u0003bar\u0003baz": {str: "foo", i32: 9001, f64: "0.1", arr: ["foo", "bar", "baz"]}
+
+---
+
+coder:
+ urn: "beam:coder:row:v1"
+ # str: nullable string, i32: nullable int32, f64: nullable float64
+ payload: "\n\x0b\n\x03str\x1a\x04\x08\x01\x10\x07\n\x0b\n\x03i32\x1a\x04\x08\x01\x10\x03\n\x0b\n\x03f64\x1a\x04\x08\x01\x10\x06\x12$b20c6545-57af-4bc8-b2a9-51ace21c7393"
+nested: false
+examples:
+ "\u0003\u0001\u0007": {str: null, i32: null, f64: null}
+ "\u0003\u0001\u0004\u0003foo\u00a9\u0046": {str: "foo", i32: 9001, f64: null}
+ "\u0003\u0000\u0003foo\u00a9\u0046\u003f\u00b9\u0099\u0099\u0099\u0099\u0099\u009a": {str: "foo", i32: 9001, f64: "0.1"}
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index ec05ef0..90f52fc 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -645,6 +645,50 @@
// Components: Coder for a single element.
// Experimental.
STATE_BACKED_ITERABLE = 9 [(beam_urn) = "beam:coder:state_backed_iterable:v1"];
+
+ // Additional Standard Coders
+ // --------------------------
+ // The following coders are not required to be implemented for an SDK or
+ // runner to support the Beam model, but enable users to take advantage of
+ // schema-aware transforms.
+
+ // Encodes a "row", an element with a known schema, defined by an
+ // instance of Schema from schema.proto.
+ //
+ // A row is encoded as the concatenation of:
+ // - The number of attributes in the schema, encoded with
+ // beam:coder:varint:v1. This makes it possible to detect certain
+ // allowed schema changes (appending or removing columns) in
+ // long-running streaming pipelines.
+ // - A byte array representing a packed bitset indicating null fields (a
+ // 1 indicating a null) encoded with beam:coder:bytes:v1. The unused
+ // bits in the last byte must be set to 0. If there are no nulls an
+ // empty byte array is encoded.
+ // The two-byte bitset (not including the length-prefix) for the row
+ // [NULL, 0, 0, 0, NULL, 0, 0, NULL, 0, NULL] would be
+ // [0b10010001, 0b00000010]
+ // - An encoding for each non-null field, concatenated together.
+ //
+ // Schema types are mapped to coders as follows:
+ // AtomicType:
+ // BYTE: not yet a standard coder (BEAM-7996)
+ // INT16: not yet a standard coder (BEAM-7996)
+ // INT32: beam:coder:varint:v1
+ // INT64: beam:coder:varint:v1
+ // FLOAT: not yet a standard coder (BEAM-7996)
+ // DOUBLE: beam:coder:double:v1
+ // STRING: beam:coder:string_utf8:v1
+ // BOOLEAN: beam:coder:bool:v1
+ // BYTES: beam:coder:bytes:v1
+ // ArrayType: beam:coder:iterable:v1 (always has a known length)
+ // MapType: not yet a standard coder (BEAM-7996)
+ // RowType: beam:coder:row:v1
+ // LogicalType: Uses the coder for its representation.
+ //
+ // The payload for RowCoder is an instance of Schema.
+ // Components: None
+ // Experimental.
+ ROW = 13 [(beam_urn) = "beam:coder:row:v1"];
}
}
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
index 9c4e232..f2cc8fa 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/CoderTranslators.java
@@ -17,16 +17,22 @@
*/
package org.apache.beam.runners.core.construction;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+
import java.util.Collections;
import java.util.List;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
+import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
/** {@link CoderTranslator} implementations for known coder types. */
@@ -118,6 +124,33 @@
};
}
+ static CoderTranslator<RowCoder> row() {
+ return new CoderTranslator<RowCoder>() {
+ @Override
+ public List<? extends Coder<?>> getComponents(RowCoder from) {
+ return ImmutableList.of();
+ }
+
+ @Override
+ public byte[] getPayload(RowCoder from) {
+ return SchemaTranslation.schemaToProto(from.getSchema()).toByteArray();
+ }
+
+ @Override
+ public RowCoder fromComponents(List<Coder<?>> components, byte[] payload) {
+ checkArgument(
+ components.isEmpty(), "Expected empty component list, but received: " + components);
+ Schema schema;
+ try {
+ schema = SchemaTranslation.fromProto(SchemaApi.Schema.parseFrom(payload));
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException("Unable to parse schema for RowCoder: ", e);
+ }
+ return RowCoder.of(schema);
+ }
+ };
+ }
+
public abstract static class SimpleStructuredCoderTranslator<T extends Coder<?>>
implements CoderTranslator<T> {
@Override
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
index 8294fe0..854f523 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoderRegistrar.java
@@ -29,6 +29,7 @@
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -60,6 +61,7 @@
.put(GlobalWindow.Coder.class, ModelCoders.GLOBAL_WINDOW_CODER_URN)
.put(FullWindowedValueCoder.class, ModelCoders.WINDOWED_VALUE_CODER_URN)
.put(DoubleCoder.class, ModelCoders.DOUBLE_CODER_URN)
+ .put(RowCoder.class, ModelCoders.ROW_CODER_URN)
.build();
public static final Set<String> WELL_KNOWN_CODER_URNS = BEAM_MODEL_CODER_URNS.values();
@@ -79,6 +81,7 @@
.put(LengthPrefixCoder.class, CoderTranslators.lengthPrefix())
.put(FullWindowedValueCoder.class, CoderTranslators.fullWindowedValue())
.put(DoubleCoder.class, CoderTranslators.atomic(DoubleCoder.class))
+ .put(RowCoder.class, CoderTranslators.row())
.build();
static {
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
index 8d1265c..486e39c 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ModelCoders.java
@@ -54,6 +54,8 @@
public static final String WINDOWED_VALUE_CODER_URN = getUrn(StandardCoders.Enum.WINDOWED_VALUE);
+ public static final String ROW_CODER_URN = getUrn(StandardCoders.Enum.ROW);
+
private static final Set<String> MODEL_CODER_URNS =
ImmutableSet.of(
BYTES_CODER_URN,
@@ -67,7 +69,8 @@
GLOBAL_WINDOW_CODER_URN,
INTERVAL_WINDOW_CODER_URN,
WINDOWED_VALUE_CODER_URN,
- DOUBLE_CODER_URN);
+ DOUBLE_CODER_URN,
+ ROW_CODER_URN);
public static Set<String> urns() {
return MODEL_CODER_URNS;
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
index dc28b79..a6368aa 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CoderTranslationTest.java
@@ -41,10 +41,14 @@
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.LengthPrefixCoder;
+import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.schemas.LogicalTypes;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
@@ -60,8 +64,8 @@
/** Tests for {@link CoderTranslation}. */
public class CoderTranslationTest {
- private static final Set<StructuredCoder<?>> KNOWN_CODERS =
- ImmutableSet.<StructuredCoder<?>>builder()
+ private static final Set<Coder<?>> KNOWN_CODERS =
+ ImmutableSet.<Coder<?>>builder()
.add(ByteArrayCoder.of())
.add(BooleanCoder.of())
.add(KvCoder.of(VarLongCoder.of(), VarLongCoder.of()))
@@ -76,6 +80,13 @@
FullWindowedValueCoder.of(
IterableCoder.of(VarLongCoder.of()), IntervalWindowCoder.of()))
.add(DoubleCoder.of())
+ .add(
+ RowCoder.of(
+ Schema.of(
+ Field.of("i16", FieldType.INT16),
+ Field.of("array", FieldType.array(FieldType.STRING)),
+ Field.of("map", FieldType.map(FieldType.STRING, FieldType.INT32)),
+ Field.of("bar", FieldType.logicalType(LogicalTypes.FixedBytes.of(123))))))
.build();
/**
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
index 1cedd5d..52dddcc 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CommonCoderTest.java
@@ -20,6 +20,8 @@
import static org.apache.beam.runners.core.construction.BeamUrns.getUrn;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList.toImmutableList;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap.toImmutableMap;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
@@ -46,8 +48,8 @@
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi.StandardCoders;
+import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.sdk.coders.BooleanCoder;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.ByteCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.Context;
@@ -55,8 +57,10 @@
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -65,6 +69,8 @@
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
@@ -99,6 +105,7 @@
.put(
getUrn(StandardCoders.Enum.WINDOWED_VALUE),
WindowedValue.FullWindowedValueCoder.class)
+ .put(getUrn(StandardCoders.Enum.ROW), RowCoder.class)
.build();
@AutoValue
@@ -107,16 +114,21 @@
abstract List<CommonCoder> getComponents();
+ @SuppressWarnings("mutable")
+ abstract byte[] getPayload();
+
abstract Boolean getNonDeterministic();
@JsonCreator
static CommonCoder create(
@JsonProperty("urn") String urn,
@JsonProperty("components") @Nullable List<CommonCoder> components,
+ @JsonProperty("payload") @Nullable String payload,
@JsonProperty("non_deterministic") @Nullable Boolean nonDeterministic) {
return new AutoValue_CommonCoderTest_CommonCoder(
checkNotNull(urn, "urn"),
firstNonNull(components, Collections.emptyList()),
+ firstNonNull(payload, "").getBytes(StandardCharsets.ISO_8859_1),
firstNonNull(nonDeterministic, Boolean.FALSE));
}
}
@@ -282,43 +294,90 @@
return WindowedValue.of(windowValue, timestamp, windows, paneInfo);
} else if (s.equals(getUrn(StandardCoders.Enum.DOUBLE))) {
return Double.parseDouble((String) value);
+ } else if (s.equals(getUrn(StandardCoders.Enum.ROW))) {
+ Schema schema;
+ try {
+ schema = SchemaTranslation.fromProto(SchemaApi.Schema.parseFrom(coderSpec.getPayload()));
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException("Failed to parse schema payload for row coder", e);
+ }
+
+ return parseField(value, Schema.FieldType.row(schema));
} else {
throw new IllegalStateException("Unknown coder URN: " + coderSpec.getUrn());
}
}
+ private static Object parseField(Object value, Schema.FieldType fieldType) {
+ switch (fieldType.getTypeName()) {
+ case BYTE:
+ return ((Number) value).byteValue();
+ case INT16:
+ return ((Number) value).shortValue();
+ case INT32:
+ return ((Number) value).intValue();
+ case INT64:
+ return ((Number) value).longValue();
+ case FLOAT:
+ return Float.parseFloat((String) value);
+ case DOUBLE:
+ return Double.parseDouble((String) value);
+ case STRING:
+ return (String) value;
+ case BOOLEAN:
+ return (Boolean) value;
+ case BYTES:
+ // extract String as byte[]
+ return ((String) value).getBytes(StandardCharsets.ISO_8859_1);
+ case ARRAY:
+ return ((List<Object>) value)
+ .stream()
+ .map((element) -> parseField(element, fieldType.getCollectionElementType()))
+ .collect(toImmutableList());
+ case MAP:
+ Map<Object, Object> kvMap = (Map<Object, Object>) value;
+ return kvMap.entrySet().stream()
+ .collect(
+ toImmutableMap(
+ (pair) -> parseField(pair.getKey(), fieldType.getMapKeyType()),
+ (pair) -> parseField(pair.getValue(), fieldType.getMapValueType())));
+ case ROW:
+ Map<String, Object> rowMap = (Map<String, Object>) value;
+ Schema schema = fieldType.getRowSchema();
+ Row.Builder row = Row.withSchema(schema);
+ for (Schema.Field field : schema.getFields()) {
+ Object element = rowMap.remove(field.getName());
+ if (element != null) {
+ element = parseField(element, field.getType());
+ }
+ row.addValue(element);
+ }
+
+ if (!rowMap.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Value contains keys that are not in the schema: " + rowMap.keySet());
+ }
+
+ return row.build();
+ default: // DECIMAL, DATETIME, LOGICAL_TYPE
+ throw new IllegalArgumentException("Unsupported type name: " + fieldType.getTypeName());
+ }
+ }
+
private static Coder<?> instantiateCoder(CommonCoder coder) {
List<Coder<?>> components = new ArrayList<>();
for (CommonCoder innerCoder : coder.getComponents()) {
components.add(instantiateCoder(innerCoder));
}
- String s = coder.getUrn();
- if (s.equals(getUrn(StandardCoders.Enum.BYTES))) {
- return ByteArrayCoder.of();
- } else if (s.equals(getUrn(StandardCoders.Enum.BOOL))) {
- return BooleanCoder.of();
- } else if (s.equals(getUrn(StandardCoders.Enum.STRING_UTF8))) {
- return StringUtf8Coder.of();
- } else if (s.equals(getUrn(StandardCoders.Enum.KV))) {
- return KvCoder.of(components.get(0), components.get(1));
- } else if (s.equals(getUrn(StandardCoders.Enum.VARINT))) {
- return VarLongCoder.of();
- } else if (s.equals(getUrn(StandardCoders.Enum.INTERVAL_WINDOW))) {
- return IntervalWindowCoder.of();
- } else if (s.equals(getUrn(StandardCoders.Enum.ITERABLE))) {
- return IterableCoder.of(components.get(0));
- } else if (s.equals(getUrn(StandardCoders.Enum.TIMER))) {
- return Timer.Coder.of(components.get(0));
- } else if (s.equals(getUrn(StandardCoders.Enum.GLOBAL_WINDOW))) {
- return GlobalWindow.Coder.INSTANCE;
- } else if (s.equals(getUrn(StandardCoders.Enum.WINDOWED_VALUE))) {
- return WindowedValue.FullWindowedValueCoder.of(
- components.get(0), (Coder<BoundedWindow>) components.get(1));
- } else if (s.equals(getUrn(StandardCoders.Enum.DOUBLE))) {
- return DoubleCoder.of();
- } else {
- throw new IllegalStateException("Unknown coder URN: " + coder.getUrn());
- }
+ Class<? extends Coder> coderType =
+ ModelCoderRegistrar.BEAM_MODEL_CODER_URNS.inverse().get(coder.getUrn());
+ checkNotNull(coderType, "Unknown coder URN: " + coder.getUrn());
+
+ CoderTranslator<?> translator = ModelCoderRegistrar.BEAM_MODEL_CODERS.get(coderType);
+ checkNotNull(
+ translator, "No translator found for common coder class: " + coderType.getSimpleName());
+
+ return translator.fromComponents(components, coder.getPayload());
}
@Test
@@ -381,6 +440,8 @@
} else if (s.equals(getUrn(StandardCoders.Enum.DOUBLE))) {
assertEquals(expectedValue, actualValue);
+ } else if (s.equals(getUrn(StandardCoders.Enum.ROW))) {
+ assertEquals(expectedValue, actualValue);
} else {
throw new IllegalStateException("Unknown coder URN: " + coder.getUrn());
}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index 0ef7260..1224774 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -181,6 +181,9 @@
// Encoding: false = 0, true = 1
b := make([]byte, 1, 1)
if err := ioutilx.ReadNBufUnsafe(r, b); err != nil {
+ if err == io.EOF {
+ return nil, err
+ }
return nil, fmt.Errorf("error decoding bool: %v", err)
}
switch b[0] {
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go b/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go
index 4f5d0f1..f366f4d 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/clone_test.go
@@ -16,10 +16,11 @@
package pipelinex
import (
- "reflect"
"testing"
pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+ "github.com/golang/protobuf/proto"
+ "github.com/google/go-cmp/cmp"
)
func TestShallowClonePTransform(t *testing.T) {
@@ -34,7 +35,7 @@
for _, test := range tests {
actual := ShallowClonePTransform(test)
- if !reflect.DeepEqual(actual, test) {
+ if !cmp.Equal(actual, test, cmp.Comparer(proto.Equal)) {
t.Errorf("ShallowClonePCollection(%v) = %v, want id", test, actual)
}
}
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go b/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
index bb814cd..ae32ffc 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/replace_test.go
@@ -16,10 +16,11 @@
package pipelinex
import (
- "reflect"
"testing"
pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+ "github.com/golang/protobuf/proto"
+ "github.com/google/go-cmp/cmp"
)
func TestEnsureUniqueName(t *testing.T) {
@@ -54,7 +55,7 @@
for _, test := range tests {
actual := ensureUniqueNames(test.in)
- if !reflect.DeepEqual(actual, test.exp) {
+ if !cmp.Equal(actual, test.exp, cmp.Comparer(proto.Equal)) {
t.Errorf("ensureUniqueName(%v) = %v, want %v", test.in, actual, test.exp)
}
}
@@ -112,7 +113,7 @@
for _, test := range tests {
actual := computeCompositeInputOutput(test.in)
- if !reflect.DeepEqual(actual, test.exp) {
+ if !cmp.Equal(actual, test.exp, cmp.Comparer(proto.Equal)) {
t.Errorf("coimputeInputOutput(%v) = %v, want %v", test.in, actual, test.exp)
}
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
index 4a3f11d..16e427f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java
@@ -276,7 +276,7 @@
String specNonWildcardPrefix = getNonWildcardPrefix(spec);
File file = new File(specNonWildcardPrefix);
return specNonWildcardPrefix.endsWith(File.separator)
- ? file
+ ? file.getAbsoluteFile()
: file.getAbsoluteFile().getParentFile();
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 4aa8dbf..03b6263 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -824,11 +824,11 @@
* href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} into multiple parts to
* be processed in parallel.
*
- * <p>Signature: {@code List<RestrictionT> splitRestriction( InputT element, RestrictionT
- * restriction);}
+ * <p>Signature: {@code void splitRestriction(InputT element, RestrictionT restriction,
+ * OutputReceiver<RestrictionT> receiver);}
*
* <p>Optional: if this method is omitted, the restriction will not be split (equivalent to
- * defining the method and returning {@code Collections.singletonList(restriction)}).
+ * defining the method and outputting the {@code restriction} unchanged).
*/
// TODO: Make the InputT parameter optional.
@Documented
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
index 4100bff..de41818 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/LocalFileSystemTest.java
@@ -210,6 +210,22 @@
}
@Test
+ public void testMatchRelativeWildcardPath() throws Exception {
+ File baseFolder = temporaryFolder.newFolder("A");
+ File expectedFile1 = new File(baseFolder, "file1");
+
+ expectedFile1.createNewFile();
+
+ List<String> expected = ImmutableList.of(expectedFile1.getAbsolutePath());
+
+ System.setProperty("user.dir", temporaryFolder.getRoot().toString());
+ List<MatchResult> matchResults = localFileSystem.match(ImmutableList.of("A/*"));
+ assertThat(
+ toFilenames(matchResults),
+ containsInAnyOrder(expected.toArray(new String[expected.size()])));
+ }
+
+ @Test
public void testMatchExact() throws Exception {
List<String> expected = ImmutableList.of(temporaryFolder.newFile("a").toString());
temporaryFolder.newFile("aa");
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
index 0685644..5f71f86 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoLifecycleTest.java
@@ -29,6 +29,10 @@
import static org.junit.Assert.fail;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
@@ -36,6 +40,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
@@ -47,6 +52,7 @@
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -172,7 +178,7 @@
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
- validate();
+ validate(CallState.SETUP, CallState.TEARDOWN);
}
}
@@ -185,7 +191,7 @@
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
- validate();
+ validate(CallState.SETUP, CallState.START_BUNDLE, CallState.TEARDOWN);
}
}
@@ -198,7 +204,8 @@
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
- validate();
+ validate(
+ CallState.SETUP, CallState.START_BUNDLE, CallState.PROCESS_ELEMENT, CallState.TEARDOWN);
}
}
@@ -211,7 +218,12 @@
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
- validate();
+ validate(
+ CallState.SETUP,
+ CallState.START_BUNDLE,
+ CallState.PROCESS_ELEMENT,
+ CallState.FINISH_BUNDLE,
+ CallState.TEARDOWN);
}
}
@@ -224,7 +236,7 @@
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
- validate();
+ validate(CallState.SETUP, CallState.TEARDOWN);
}
}
@@ -237,7 +249,7 @@
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
- validate();
+ validate(CallState.SETUP, CallState.START_BUNDLE, CallState.TEARDOWN);
}
}
@@ -250,11 +262,30 @@
p.run();
fail("Pipeline should have failed with an exception");
} catch (Exception e) {
- validate();
+ validate(
+ CallState.SETUP, CallState.START_BUNDLE, CallState.PROCESS_ELEMENT, CallState.TEARDOWN);
}
}
- private void validate() {
+ @Test
+ @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesParDoLifecycle.class})
+ public void testTeardownCalledAfterExceptionInFinishBundleStateful() {
+ ExceptionThrowingFn fn = new ExceptionThrowingStatefulFn(MethodForException.FINISH_BUNDLE);
+ p.apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3))).apply(ParDo.of(fn));
+ try {
+ p.run();
+ fail("Pipeline should have failed with an exception");
+ } catch (Exception e) {
+ validate(
+ CallState.SETUP,
+ CallState.START_BUNDLE,
+ CallState.PROCESS_ELEMENT,
+ CallState.FINISH_BUNDLE,
+ CallState.TEARDOWN);
+ }
+ }
+
+ private void validate(CallState... requiredCallStates) {
assertThat(ExceptionThrowingFn.callStateMap, is(not(anEmptyMap())));
// assert that callStateMap contains only TEARDOWN as a value. Note: We do not expect
// teardown to be called on fn itself, but on any deserialized instance on which any other
@@ -267,19 +298,15 @@
"Function should have been torn down after exception",
value.finalState(),
is(CallState.TEARDOWN)));
- }
- @Test
- @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesParDoLifecycle.class})
- public void testTeardownCalledAfterExceptionInFinishBundleStateful() {
- ExceptionThrowingFn fn = new ExceptionThrowingStatefulFn(MethodForException.FINISH_BUNDLE);
- p.apply(Create.of(KV.of("a", 1), KV.of("b", 2), KV.of("a", 3))).apply(ParDo.of(fn));
- try {
- p.run();
- fail("Pipeline should have failed with an exception");
- } catch (Exception e) {
- validate();
- }
+ List<CallState> states = Arrays.stream(requiredCallStates).collect(Collectors.toList());
+ assertThat(
+ "At least one bundle should contain "
+ + states
+ + ", got "
+ + ExceptionThrowingFn.callStateMap.values(),
+ ExceptionThrowingFn.callStateMap.values().stream()
+ .anyMatch(tracker -> tracker.callStateVisited.equals(states)));
}
@Before
@@ -289,12 +316,15 @@
}
private static class DelayedCallStateTracker {
- private CountDownLatch latch;
- private AtomicReference<CallState> callState;
+ private final CountDownLatch latch;
+ private final AtomicReference<CallState> callState;
+ private final List<CallState> callStateVisited =
+ Collections.synchronizedList(new ArrayList<>());
private DelayedCallStateTracker(CallState setup) {
latch = new CountDownLatch(1);
callState = new AtomicReference<>(setup);
+ callStateVisited.add(setup);
}
DelayedCallStateTracker update(CallState val) {
@@ -306,13 +336,21 @@
if (CallState.TEARDOWN == val) {
latch.countDown();
}
-
+ synchronized (callStateVisited) {
+ if (!callStateVisited.contains(val)) {
+ callStateVisited.add(val);
+ }
+ }
return this;
}
@Override
public String toString() {
- return "DelayedCallStateTracker{" + "latch=" + latch + ", callState=" + callState + '}';
+ return MoreObjects.toStringHelper(this)
+ .add("latch", latch)
+ .add("callState", callState)
+ .add("callStateVisited", callStateVisited)
+ .toString();
}
CallState callState() {
@@ -377,9 +415,9 @@
@FinishBundle
public void postBundle() throws Exception {
assertThat(
- "processing bundle should have been called before finish bundle",
+ "processing bundle or start bundle should have been called before finish bundle",
getCallState(),
- is(CallState.PROCESS_ELEMENT));
+ anyOf(equalTo(CallState.PROCESS_ELEMENT), equalTo(CallState.START_BUNDLE)));
updateCallState(CallState.FINISH_BUNDLE);
throwIfNecessary(MethodForException.FINISH_BUNDLE);
}
@@ -416,8 +454,8 @@
return System.identityHashCode(this);
}
- private void updateCallState(CallState processElement) {
- callStateMap.get(id()).update(processElement);
+ private void updateCallState(CallState state) {
+ callStateMap.get(id()).update(state);
}
private CallState getCallState() {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
index 94db06d..267199b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteTable.java
@@ -27,7 +27,6 @@
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.adapter.java.AbstractQueryableTable;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.QueryProvider;
@@ -103,8 +102,6 @@
context.getCluster().traitSetOf(BeamLogicalConvention.INSTANCE),
relOptTable,
beamTable,
- ImmutableList.of(),
- beamTable.constructFilter(ImmutableList.of()),
pipelineOptionsMap,
this);
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
index 3cd6b55..962cc77 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
@@ -72,6 +72,7 @@
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Calc;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexBuilder;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexCall;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexLocalRef;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexNode;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rex.RexProgram;
@@ -244,13 +245,35 @@
@Override
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
NodeStats inputStat = BeamSqlRelUtils.getNodeStats(this.input, mq);
- return BeamCostModel.FACTORY.makeCost(inputStat.getRowCount(), inputStat.getRate());
+ return BeamCostModel.FACTORY
+ .makeCost(inputStat.getRowCount(), inputStat.getRate())
+ // Increase cost by a small factor of the number of expressions involved in the predicate.
+ // Helps favor Calcs with smaller filters.
+ .plus(
+ BeamCostModel.FACTORY
+ .makeTinyCost()
+ .multiplyBy(expressionsInFilter(getProgram().split().right)));
}
public boolean isInputSortRelAndLimitOnly() {
return (input instanceof BeamSortRel) && ((BeamSortRel) input).isLimitOnly();
}
+ /**
+ * Recursively count the number of expressions involved in conditions.
+ *
+ * @param filterNodes A list of conditions in a CNF.
+ * @return Number of expressions used by conditions.
+ */
+ private int expressionsInFilter(List<RexNode> filterNodes) {
+ int childSum =
+ filterNodes.stream()
+ .filter(n -> n instanceof RexCall)
+ .mapToInt(n -> expressionsInFilter(((RexCall) n).getOperands()))
+ .sum();
+ return filterNodes.size() + childSum;
+ }
+
/** {@code CalcFn} is the executor for a {@link BeamCalcRel} step. */
private static class CalcFn extends DoFn<Row, Row> {
private final String processElementBlock;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
index b1d3f02..f672384 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamIOSourceRel.java
@@ -25,13 +25,9 @@
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
import org.apache.beam.sdk.extensions.sql.impl.planner.NodeStats;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
-import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
-import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
@@ -41,7 +37,6 @@
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptTable;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.RelOptTableImpl;
-import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelWriter;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.TableScan;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
@@ -52,33 +47,26 @@
private final BeamSqlTable beamTable;
private final BeamCalciteTable calciteTable;
private final Map<String, String> pipelineOptions;
- private final List<String> usedFields;
- private final BeamSqlTableFilter tableFilters;
public BeamIOSourceRel(
RelOptCluster cluster,
RelTraitSet traitSet,
RelOptTable table,
BeamSqlTable beamTable,
- List<String> usedFields,
- BeamSqlTableFilter tableFilters,
Map<String, String> pipelineOptions,
BeamCalciteTable calciteTable) {
super(cluster, traitSet, table);
this.beamTable = beamTable;
- this.usedFields = usedFields;
- this.tableFilters = tableFilters;
this.calciteTable = calciteTable;
this.pipelineOptions = pipelineOptions;
}
- public BeamIOSourceRel copy(
+ public BeamPushDownIOSourceRel createPushDownRel(
RelDataType newType, List<String> usedFields, BeamSqlTableFilter tableFilters) {
RelOptTable relOptTable =
newType == null ? table : ((RelOptTableImpl) getTable()).copy(newType);
- tableFilters = tableFilters == null ? this.tableFilters : tableFilters;
- return new BeamIOSourceRel(
+ return new BeamPushDownIOSourceRel(
getCluster(),
traitSet,
relOptTable,
@@ -119,22 +107,6 @@
return new Transform();
}
- @Override
- public RelWriter explainTerms(RelWriter pw) {
- super.explainTerms(pw);
-
- // This is done to tell Calcite planner that BeamIOSourceRel cannot be simply substituted by
- // another BeamIOSourceRel, except for when they carry the same content.
- if (!usedFields.isEmpty()) {
- pw.item("usedFields", usedFields.toString());
- }
- if (!(tableFilters instanceof DefaultTableFilter)) {
- pw.item(tableFilters.getClass().getSimpleName(), tableFilters.toString());
- }
-
- return pw;
- }
-
private class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
@Override
@@ -145,14 +117,7 @@
BeamIOSourceRel.class.getSimpleName(),
input);
- final PBegin begin = input.getPipeline().begin();
-
- if (usedFields.isEmpty() && tableFilters instanceof DefaultTableFilter) {
- return beamTable.buildIOReader(begin);
- }
-
- final Schema newBeamSchema = CalciteUtils.toSchema(getRowType());
- return beamTable.buildIOReader(begin, tableFilters, usedFields).setRowSchema(newBeamSchema);
+ return beamTable.buildIOReader(input.getPipeline().begin());
}
}
@@ -167,9 +132,7 @@
@Override
public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
NodeStats estimates = BeamSqlRelUtils.getNodeStats(this, mq);
- return BeamCostModel.FACTORY
- .makeCost(estimates.getRowCount(), estimates.getRate())
- .multiplyBy(getRowType().getFieldCount());
+ return BeamCostModel.FACTORY.makeCost(estimates.getRowCount(), estimates.getRate());
}
public BeamSqlTable getBeamSqlTable() {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamPushDownIOSourceRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamPushDownIOSourceRel.java
new file mode 100644
index 0000000..7c49acf
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamPushDownIOSourceRel.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.rel;
+
+import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkArgument;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable;
+import org.apache.beam.sdk.extensions.sql.impl.planner.BeamCostModel;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptCluster;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptPlanner;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptTable;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelTraitSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.RelWriter;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.metadata.RelMetadataQuery;
+
+public class BeamPushDownIOSourceRel extends BeamIOSourceRel {
+ private final List<String> usedFields;
+ private final BeamSqlTableFilter tableFilters;
+
+ public BeamPushDownIOSourceRel(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelOptTable table,
+ BeamSqlTable beamTable,
+ List<String> usedFields,
+ BeamSqlTableFilter tableFilters,
+ Map<String, String> pipelineOptions,
+ BeamCalciteTable calciteTable) {
+ super(cluster, traitSet, table, beamTable, pipelineOptions, calciteTable);
+ this.usedFields = usedFields;
+ this.tableFilters = tableFilters;
+ }
+
+ @Override
+ public RelWriter explainTerms(RelWriter pw) {
+ super.explainTerms(pw);
+
+ // This is done to tell Calcite planner that BeamIOSourceRel cannot be simply substituted by
+ // another BeamIOSourceRel, except for when they carry the same content.
+ if (!usedFields.isEmpty()) {
+ pw.item("usedFields", usedFields.toString());
+ }
+ if (!(tableFilters instanceof DefaultTableFilter)) {
+ pw.item(tableFilters.getClass().getSimpleName(), tableFilters.toString());
+ }
+
+ return pw;
+ }
+
+ @Override
+ public PTransform<PCollectionList<Row>, PCollection<Row>> buildPTransform() {
+ return new Transform();
+ }
+
+ private class Transform extends PTransform<PCollectionList<Row>, PCollection<Row>> {
+
+ @Override
+ public PCollection<Row> expand(PCollectionList<Row> input) {
+ checkArgument(
+ input.size() == 0,
+ "Should not have received input for %s: %s",
+ BeamIOSourceRel.class.getSimpleName(),
+ input);
+
+ final PBegin begin = input.getPipeline().begin();
+ final BeamSqlTable beamSqlTable = BeamPushDownIOSourceRel.this.getBeamSqlTable();
+
+ if (usedFields.isEmpty() && tableFilters instanceof DefaultTableFilter) {
+ return beamSqlTable.buildIOReader(begin);
+ }
+
+ final Schema newBeamSchema = CalciteUtils.toSchema(getRowType());
+ return beamSqlTable
+ .buildIOReader(begin, tableFilters, usedFields)
+ .setRowSchema(newBeamSchema);
+ }
+ }
+
+ @Override
+ public BeamCostModel beamComputeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+ return super.beamComputeSelfCost(planner, mq)
+ .multiplyBy((double) 1 / (getRowType().getFieldCount() + 1));
+ }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOPushDownRule.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOPushDownRule.java
index e20967e..65f2d3d 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOPushDownRule.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamIOPushDownRule.java
@@ -21,16 +21,19 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamPushDownIOSourceRel;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor.FieldDescriptor;
import org.apache.beam.sdk.schemas.Schema;
@@ -74,6 +77,10 @@
final BeamIOSourceRel ioSourceRel = call.rel(1);
final BeamSqlTable beamSqlTable = ioSourceRel.getBeamSqlTable();
+ if (ioSourceRel instanceof BeamPushDownIOSourceRel) {
+ return;
+ }
+
// Nested rows are not supported at the moment
for (RelDataTypeField field : ioSourceRel.getRowType().getFieldList()) {
if (field.getType() instanceof RelRecordType) {
@@ -88,50 +95,62 @@
// When predicate push-down is not supported - all filters are unsupported.
final BeamSqlTableFilter tableFilter = beamSqlTable.constructFilter(projectFilter.right);
- if (!beamSqlTable.supportsProjects() && tableFilter instanceof DefaultTableFilter) {
+ if (!beamSqlTable.supportsProjects().isSupported()
+ && tableFilter instanceof DefaultTableFilter) {
// Either project or filter push-down must be supported by the IO.
return;
}
- if (!(tableFilter instanceof DefaultTableFilter) && !beamSqlTable.supportsProjects()) {
- // TODO(BEAM-8508): add support for standalone filter push-down.
- // Filter push-down without project push-down is not supported for now.
- return;
- }
-
- // Find all input refs used by projects
- boolean hasComplexProjects = false;
Set<String> usedFields = new LinkedHashSet<>();
- for (RexNode project : projectFilter.left) {
- findUtilizedInputRefs(calcInputRowType, project, usedFields);
- if (!hasComplexProjects && project instanceof RexCall) {
- // Ex: 'SELECT field+10 FROM table'
- hasComplexProjects = true;
+ if (!(tableFilter instanceof DefaultTableFilter)
+ && !beamSqlTable.supportsProjects().isSupported()) {
+ // When applying standalone filter push-down all fields must be projected by an IO.
+ // With a single exception: Calc projects all fields (in the same order) and does nothing
+ // else.
+ usedFields.addAll(calcInputRowType.getFieldNames());
+ } else {
+ // Find all input refs used by projects
+ for (RexNode project : projectFilter.left) {
+ findUtilizedInputRefs(calcInputRowType, project, usedFields);
+ }
+
+ // Find all input refs used by filters
+ for (RexNode filter : tableFilter.getNotSupported()) {
+ findUtilizedInputRefs(calcInputRowType, filter, usedFields);
}
}
- // Find all input refs used by filters
- for (RexNode filter : tableFilter.getNotSupported()) {
- findUtilizedInputRefs(calcInputRowType, filter, usedFields);
+ if (usedFields.isEmpty()) {
+ // No need to do push-down for queries like this: "select UPPER('hello')".
+ return;
}
- FieldAccessDescriptor resolved =
- FieldAccessDescriptor.withFieldNames(usedFields)
- .withOrderByFieldInsertionOrder()
- .resolve(beamSqlTable.getSchema());
- Schema newSchema =
- SelectHelpers.getOutputSchema(ioSourceRel.getBeamSqlTable().getSchema(), resolved);
- RelDataType calcInputType =
- CalciteUtils.toCalciteRowType(newSchema, ioSourceRel.getCluster().getTypeFactory());
+ // Already most optimal case:
+ // Calc contains all unsupported filters.
+ // IO only projects fields utilized by a calc.
+ if (tableFilter.getNotSupported().containsAll(projectFilter.right)
+ && usedFields.containsAll(ioSourceRel.getRowType().getFieldNames())) {
+ return;
+ }
- // Check if the calc can be dropped:
- // 1. Calc only does projects and renames.
- // And
- // 2. Predicate can be completely pushed-down to IO level.
- if (isProjectRenameOnlyProgram(program) && tableFilter.getNotSupported().isEmpty()) {
+ FieldAccessDescriptor resolved = FieldAccessDescriptor.withFieldNames(usedFields);
+ if (beamSqlTable.supportsProjects().withFieldReordering()) {
+ // Only needs to be done when field reordering is supported, otherwise IO should project
+ // fields in the same order they are defined in the schema and let Calc do the reordering.
+ resolved = resolved.withOrderByFieldInsertionOrder();
+ }
+ resolved = resolved.resolve(beamSqlTable.getSchema());
+
+ if (canDropCalc(program, beamSqlTable.supportsProjects(), tableFilter)) {
// Tell the optimizer to not use old IO, since the new one is better.
call.getPlanner().setImportance(ioSourceRel, 0.0);
- call.transformTo(ioSourceRel.copy(calc.getRowType(), newSchema.getFieldNames(), tableFilter));
+ call.transformTo(
+ ioSourceRel.createPushDownRel(
+ calc.getRowType(),
+ resolved.getFieldsAccessed().stream()
+ .map(FieldDescriptor::getFieldName)
+ .collect(Collectors.toList()),
+ tableFilter));
return;
}
@@ -139,51 +158,25 @@
// Calc contains all unsupported filters.
// IO only projects fields utilised by a calc.
if (tableFilter.getNotSupported().equals(projectFilter.right)
- && usedFields.size() == ioSourceRel.getRowType().getFieldCount()) {
+ && usedFields.containsAll(ioSourceRel.getRowType().getFieldNames())) {
return;
}
- BeamIOSourceRel newIoSourceRel =
- ioSourceRel.copy(calcInputType, newSchema.getFieldNames(), tableFilter);
- RelBuilder relBuilder = call.builder();
- relBuilder.push(newIoSourceRel);
+ RelNode result =
+ constructNodesWithPushDown(
+ resolved,
+ call.builder(),
+ ioSourceRel,
+ tableFilter,
+ calc.getRowType(),
+ projectFilter.left);
- List<RexNode> newProjects = new ArrayList<>();
- List<RexNode> newFilter = new ArrayList<>();
- // Ex: let's say the original fields are (number before each element is the index):
- // {0:unused1, 1:id, 2:name, 3:unused2},
- // where only 'id' and 'name' are being used. Then the new calcInputType should be as follows:
- // {0:id, 1:name}.
- // A mapping list will contain 2 entries: {0:1, 1:2},
- // showing how used field names map to the original fields.
- List<Integer> mapping =
- resolved.getFieldsAccessed().stream()
- .map(FieldDescriptor::getFieldId)
- .collect(Collectors.toList());
-
- // Map filters to new RexInputRef.
- for (RexNode filter : tableFilter.getNotSupported()) {
- newFilter.add(reMapRexNodeToNewInputs(filter, mapping));
- }
- // Map projects to new RexInputRef.
- for (RexNode project : projectFilter.left) {
- newProjects.add(reMapRexNodeToNewInputs(project, mapping));
- }
-
- relBuilder.filter(newFilter);
- relBuilder.project(
- newProjects, calc.getRowType().getFieldNames(), true); // Always preserve named projects.
-
- RelNode result = relBuilder.build();
-
- if (newFilter.size() < projectFilter.right.size()) {
- // Smaller Calc programs are indisputably better.
+ if (tableFilter.getNotSupported().size() <= projectFilter.right.size()
+ || usedFields.size() < calcInputRowType.getFieldCount()) {
+ // Smaller Calc programs are indisputably better, as well as IOs with less projected fields.
+ // We can consider something with the same number of filters.
// Tell the optimizer not to use old Calc and IO.
- call.getPlanner().setImportance(calc, 0.0);
- call.getPlanner().setImportance(ioSourceRel, 0.0);
- call.transformTo(result);
- } else if (newFilter.size() == projectFilter.right.size()) {
- // But we can consider something with the same number of filters.
+ call.getPlanner().setImportance(ioSourceRel, 0);
call.transformTo(result);
}
}
@@ -262,19 +255,126 @@
/**
* Determine whether a program only performs renames and/or projects. RexProgram#isTrivial is not
* sufficient in this case, because number of projects does not need to be the same as inputs.
+ * Calc should NOT be dropped in the following cases:<br>
+ * 1. Projected fields are manipulated (ex: 'select field1+10').<br>
+ * 2. When the same field is projected more than once.<br>
+ * 3. When an IO does not support field reordering and fields are projected in an order different
+ * from the schema order.
*
* @param program A program to check.
+ * @param projectReorderingSupported Whether project push-down supports field reordering.
* @return True when program performs only projects (w/o any modifications), false otherwise.
*/
@VisibleForTesting
- boolean isProjectRenameOnlyProgram(RexProgram program) {
+ boolean isProjectRenameOnlyProgram(RexProgram program, boolean projectReorderingSupported) {
int fieldCount = program.getInputRowType().getFieldCount();
+ Set<Integer> projectIndex = new HashSet<>();
+ int previousIndex = -1;
for (RexLocalRef ref : program.getProjectList()) {
- if (ref.getIndex() >= fieldCount) {
+ int index = ref.getIndex();
+ if (index >= fieldCount // Projected values are InputRefs.
+ || !projectIndex.add(ref.getIndex()) // Each field projected once.
+ || (!projectReorderingSupported && index <= previousIndex)) { // In the same order.
return false;
}
+ previousIndex = index;
}
return true;
}
+
+  /**
+   * Decide whether the {@code Calc} can be removed altogether, leaving only the IO. All of the
+   * following must hold:<br>
+   * 1. The program only does simple projects, projects each field once and, when the IO cannot
+   * reorder fields, projects them in the same order as the input.<br>
+   * 2. Every predicate can be pushed down (no unsupported filters remain).<br>
+   * 3. Either the IO supports project push-down or the Calc projects all input fields as-is.
+   *
+   * @param program A {@code RexProgram} of a {@code Calc}.
+   * @param projectSupport Project push-down capabilities of the IO.
+   * @param tableFilter Holds the filters the IO does and does not support.
+   * @return True when the Calc can be dropped, false otherwise.
+   */
+  private boolean canDropCalc(
+      RexProgram program, ProjectSupport projectSupport, BeamSqlTableFilter tableFilter) {
+    // Conditions 1 and 2: simple in-order projects only, and nothing left for the Calc to filter.
+    boolean simpleProjectsOnly =
+        isProjectRenameOnlyProgram(program, projectSupport.withFieldReordering());
+    if (!simpleProjectsOnly || !tableFilter.getNotSupported().isEmpty()) {
+      return false;
+    }
+
+    // Condition 3, first alternative: the IO can do the projection by itself.
+    if (projectSupport.isSupported()) {
+      return true;
+    }
+
+    // Condition 3, second alternative: the projected names equal the full input field list.
+    RelDataType inputRowType = program.getInputRowType();
+    List<String> projectedNames = new ArrayList<>();
+    for (RexLocalRef ref : program.getProjectList()) {
+      projectedNames.add(inputRowType.getFieldList().get(ref.getIndex()).getName());
+    }
+    return projectedNames.equals(inputRowType.getFieldNames());
+  }
+
+  /**
+   * Build the replacement plan: a new {@link BeamIOSourceRel} carrying the pushed-down projects
+   * and filters, topped by a {@code Filter} and {@code Project} for whatever is left (unsupported
+   * filters, field reordering/duplication, complex projects).
+   *
+   * @param resolved A descriptor of fields used by a {@code Calc}.
+   * @param relBuilder A {@code RelBuilder} used to stack the {@code Filter} and {@code Project}
+   *     nodes on top of the new IO Rel.
+   * @param ioSourceRel Original {@code BeamIOSourceRel} push-down is being attempted for.
+   * @param tableFilter Holds the filters the IO does and does not support.
+   * @param calcDataType A Calcite output schema of the original {@code Calc}.
+   * @param calcProjects A list of {@code RexNode}s projected by the original {@code Calc}.
+   * @return An alternative {@code RelNode} with supported filters/projects pushed-down to IO Rel.
+   */
+  private RelNode constructNodesWithPushDown(
+      FieldAccessDescriptor resolved,
+      RelBuilder relBuilder,
+      BeamIOSourceRel ioSourceRel,
+      BeamSqlTableFilter tableFilter,
+      RelDataType calcDataType,
+      List<RexNode> calcProjects) {
+    // The new IO only outputs the fields the Calc actually uses.
+    Schema pushedDownSchema =
+        SelectHelpers.getOutputSchema(ioSourceRel.getBeamSqlTable().getSchema(), resolved);
+    RelDataType pushedDownRowType =
+        CalciteUtils.toCalciteRowType(pushedDownSchema, ioSourceRel.getCluster().getTypeFactory());
+    relBuilder.push(
+        ioSourceRel.createPushDownRel(
+            pushedDownRowType, pushedDownSchema.getFieldNames(), tableFilter));
+
+    // Position i of this list holds the original index of the i-th field output by the new IO.
+    // Ex: with original fields {0:unused1, 1:id, 2:name, 3:unused2} where only 'id' and 'name'
+    // are used, the new IO outputs {0:id, 1:name} and the list is [1, 2].
+    List<Integer> originalFieldIds = new ArrayList<>();
+    for (FieldDescriptor field : resolved.getFieldsAccessed()) {
+      originalFieldIds.add(field.getFieldId());
+    }
+
+    // Re-point the input refs of the remaining filters, then of the projects, at the new IO.
+    List<RexNode> remainingFilters =
+        tableFilter.getNotSupported().stream()
+            .map(filter -> reMapRexNodeToNewInputs(filter, originalFieldIds))
+            .collect(Collectors.toList());
+    List<RexNode> remappedProjects =
+        calcProjects.stream()
+            .map(project -> reMapRexNodeToNewInputs(project, originalFieldIds))
+            .collect(Collectors.toList());
+
+    relBuilder.filter(remainingFilters);
+    // The 'true' flag forces the named projects to be preserved.
+    relBuilder.project(remappedProjects, calcDataType.getFieldNames(), true);
+
+    return relBuilder.build();
+  }
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/BaseBeamTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/BaseBeamTable.java
index fd16ca6..bc276f7 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/BaseBeamTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/BaseBeamTable.java
@@ -45,7 +45,7 @@
}
@Override
- public boolean supportsProjects() {
- return false;
+ public ProjectSupport supportsProjects() {
+ return ProjectSupport.NONE;
}
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/BeamSqlTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/BeamSqlTable.java
index 125bdd0..be2c205 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/BeamSqlTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/BeamSqlTable.java
@@ -42,7 +42,7 @@
BeamSqlTableFilter constructFilter(List<RexNode> filter);
/** Whether project push-down is supported by the IO API. */
- boolean supportsProjects();
+ ProjectSupport supportsProjects();
/** Whether this table is bounded (known to be finite) or unbounded (may or may not be finite). */
PCollection.IsBounded isBounded();
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/ProjectSupport.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/ProjectSupport.java
new file mode 100644
index 0000000..5b83d4c
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/ProjectSupport.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta;
+
+/**
+ * Level of project push-down offered by an IO, as reported by {@code
+ * BeamSqlTable#supportsProjects()}.
+ */
+public enum ProjectSupport {
+  /** Project push-down is not supported. */
+  NONE,
+  /**
+   * Project push-down is supported, but the IO projects fields in the order they are defined in
+   * the schema; any reordering has to be left to a Calc.
+   */
+  WITHOUT_FIELD_REORDERING,
+  /** Project push-down is supported and the IO can project fields in an arbitrary order. */
+  WITH_FIELD_REORDERING;
+
+  /**
+   * Whether any form of project push-down is available.
+   *
+   * @return True for every constant except {@link #NONE}.
+   */
+  public boolean isSupported() {
+    // Enum constants are singletons, so identity comparison is the idiomatic check.
+    return this != NONE;
+  }
+
+  /**
+   * Whether pushed-down projects may be returned in an order different from the schema order.
+   *
+   * @return True only for {@link #WITH_FIELD_REORDERING}.
+   */
+  public boolean withFieldReordering() {
+    return this == WITH_FIELD_REORDERING;
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
index 711f1bf..121eab4 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryTable.java
@@ -27,6 +27,7 @@
import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
@@ -40,7 +41,6 @@
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
-import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.schemas.utils.SelectHelpers;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -138,12 +138,7 @@
builder.withSelectedFields(fieldNames);
}
- return begin
- .apply("Read Input BQ Rows with push-down", builder)
- .apply(
- "ReorderRowFields",
- Select.fieldAccess(
- FieldAccessDescriptor.withFieldNames(fieldNames).withOrderByFieldInsertionOrder()));
+ return begin.apply("Read Input BQ Rows with push-down", builder);
}
@Override
@@ -156,8 +151,10 @@
}
@Override
- public boolean supportsProjects() {
- return method.equals(Method.DIRECT_READ);
+ public ProjectSupport supportsProjects() {
+ return method.equals(Method.DIRECT_READ)
+ ? ProjectSupport.WITHOUT_FIELD_REORDERING
+ : ProjectSupport.NONE;
}
private TypedRead<Row> getBigQueryReadBuilder(Schema schema) {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
index 5dae333..fbda05a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
@@ -35,6 +35,7 @@
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTableFilter;
import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
+import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
@@ -211,7 +212,7 @@
}
}
- // When project push-down is supported.
+ // When project push-down is supported or field reordering is needed.
if ((options == PushDownOptions.PROJECT || options == PushDownOptions.BOTH)
&& !fieldNames.isEmpty()) {
result =
@@ -240,8 +241,10 @@
}
@Override
- public boolean supportsProjects() {
- return options == PushDownOptions.BOTH || options == PushDownOptions.PROJECT;
+ public ProjectSupport supportsProjects() {
+ return (options == PushDownOptions.BOTH || options == PushDownOptions.PROJECT)
+ ? ProjectSupport.WITH_FIELD_REORDERING
+ : ProjectSupport.NONE;
}
@Override
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java
index 907389f..37fbc61 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/IOPushDownRuleTest.java
@@ -139,24 +139,30 @@
@Test
public void testIsProjectRenameOnlyProgram() {
- List<Pair<String, Boolean>> tests =
+ List<Pair<Pair<String, Boolean>, Boolean>> tests =
ImmutableList.of(
- Pair.of("select id from TEST", true),
- Pair.of("select * from TEST", true),
- Pair.of("select id, name from TEST", true),
- Pair.of("select id+10 from TEST", false),
+ // Out-of-order field selection is only allowed when field reordering is supported.
+ Pair.of(Pair.of("select unused2, name, id from TEST", true), true),
+ Pair.of(Pair.of("select unused2, name, id from TEST", false), false),
+ Pair.of(Pair.of("select id from TEST", false), true),
+ Pair.of(Pair.of("select * from TEST", false), true),
+ Pair.of(Pair.of("select id, name from TEST", false), true),
+ Pair.of(Pair.of("select id+10 from TEST", false), false),
// Note that we only care about projects.
- Pair.of("select id from TEST where name='one'", true));
+ Pair.of(Pair.of("select id from TEST where name='one'", false), true));
- for (Pair<String, Boolean> test : tests) {
- String sqlQuery = test.left;
+ for (Pair<Pair<String, Boolean>, Boolean> test : tests) {
+ String sqlQuery = test.left.left;
+ boolean projectPushDownSupported = test.left.right;
boolean expectedAnswer = test.right;
BeamRelNode basicRel = sqlEnv.parseQuery(sqlQuery);
assertThat(basicRel, instanceOf(Calc.class));
Calc calc = (Calc) basicRel;
assertThat(
- BeamIOPushDownRule.INSTANCE.isProjectRenameOnlyProgram(calc.getProgram()),
+ test.toString(),
+ BeamIOPushDownRule.INSTANCE.isProjectRenameOnlyProgram(
+ calc.getProgram(), projectPushDownSupported),
equalTo(expectedAnswer));
}
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java
index 04b8fb0..9a14cab 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BigQueryReadWriteIT.java
@@ -40,6 +40,7 @@
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
@@ -481,7 +482,14 @@
BeamRelNode relNode = sqlEnv.parseQuery(selectTableStatement);
PCollection<Row> output = BeamSqlRelUtils.toPCollection(readPipeline, relNode);
- assertThat(relNode, instanceOf(BeamIOSourceRel.class));
+ // Calc is not dropped because BigQuery does not support field reordering yet.
+ assertThat(relNode, instanceOf(BeamCalcRel.class));
+ assertThat(relNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+ // IO projects fields in the same order they are defined in the schema.
+ assertThat(
+ relNode.getInput(0).getRowType().getFieldNames(),
+ containsInAnyOrder("c_tinyint", "c_integer", "c_varchar"));
+ // Field reordering is done in a Calc
assertThat(
output.getSchema(),
equalTo(
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterAndProjectPushDown.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterAndProjectPushDown.java
new file mode 100644
index 0000000..1acb94f
--- /dev/null
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterAndProjectPushDown.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.test;
+
+import static org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.PUSH_DOWN_OPTION;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.alibaba.fastjson.JSON;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamCalcRule;
+import org.apache.beam.sdk.extensions.sql.impl.rule.BeamIOPushDownRule;
+import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.PushDownOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.CalcMergeRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.FilterCalcMergeRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.FilterToCalcRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectCalcMergeRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectToCalcRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSets;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class TestTableProviderWithFilterAndProjectPushDown {
+ // Schema of the rows added to the TEST table in buildUp(): two int32 fields, a string field,
+ // an int16 field and a boolean field.
+ private static final Schema BASIC_SCHEMA =
+ Schema.builder()
+ .addInt32Field("unused1")
+ .addInt32Field("id")
+ .addStringField("name")
+ .addInt16Field("unused2")
+ .addBooleanField("b")
+ .build();
+ // Planner rules handed to BeamSqlEnv in buildUp(); BeamIOPushDownRule is listed alongside the
+ // Calc conversion and merge rules.
+ private static final List<RelOptRule> rulesWithPushDown =
+ ImmutableList.of(
+ BeamCalcRule.INSTANCE,
+ FilterCalcMergeRule.INSTANCE,
+ ProjectCalcMergeRule.INSTANCE,
+ BeamIOPushDownRule.INSTANCE,
+ FilterToCalcRule.INSTANCE,
+ ProjectToCalcRule.INSTANCE,
+ CalcMergeRule.INSTANCE);
+ // Assigned by buildUp(), which JUnit runs before each test.
+ private BeamSqlEnv sqlEnv;
+
+ // Pipeline each test converts its plan into and then runs.
+ @Rule public TestPipeline pipeline = TestPipeline.create();
+
+ @Before
+ public void buildUp() {
+ TestTableProvider tableProvider = new TestTableProvider();
+ Table table = getTable("TEST", PushDownOptions.BOTH);
+ tableProvider.createTable(table);
+ tableProvider.addRows(
+ table.getName(),
+ row(BASIC_SCHEMA, 100, 1, "one", (short) 100, true),
+ row(BASIC_SCHEMA, 200, 2, "two", (short) 200, false));
+
+ sqlEnv =
+ BeamSqlEnv.builder(tableProvider)
+ .setPipelineOptions(PipelineOptionsFactory.create())
+ .setRuleSets(new RuleSet[] {RuleSets.ofList(rulesWithPushDown)})
+ .build();
+ }
+
+  /**
+   * A single projected field with an equality predicate: the plan root is expected to be the IO
+   * rel itself (no Calc on top) and only the matching row is returned.
+   */
+  @Test
+  public void testIOSourceRel_predicateSimple() {
+    BeamRelNode plan = sqlEnv.parseQuery("SELECT name FROM TEST where id=2");
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, plan);
+    Schema expectedSchema = Schema.builder().addStringField("name").build();
+
+    assertThat(plan, instanceOf(BeamIOSourceRel.class));
+    assertEquals(expectedSchema, rows.getSchema());
+    PAssert.that(rows).containsInAnyOrder(row(rows.getSchema(), "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  /**
+   * A bare boolean column used as the predicate: the plan root is expected to be the IO rel and
+   * only the row whose {@code b} is true is returned.
+   */
+  @Test
+  public void testIOSourceRel_predicateSimple_Boolean() {
+    BeamRelNode plan = sqlEnv.parseQuery("SELECT name FROM TEST where b");
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, plan);
+    Schema expectedSchema = Schema.builder().addStringField("name").build();
+
+    assertThat(plan, instanceOf(BeamIOSourceRel.class));
+    assertEquals(expectedSchema, rows.getSchema());
+    PAssert.that(rows).containsInAnyOrder(row(rows.getSchema(), "one"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  /**
+   * Two comparisons joined by AND: the plan root is expected to be the IO rel and only the row
+   * satisfying both comparisons is returned.
+   */
+  @Test
+  public void testIOSourceRel_predicateWithAnd() {
+    BeamRelNode plan = sqlEnv.parseQuery("SELECT name FROM TEST where id>=2 and unused1<=200");
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, plan);
+    Schema expectedSchema = Schema.builder().addStringField("name").build();
+
+    assertThat(plan, instanceOf(BeamIOSourceRel.class));
+    assertEquals(expectedSchema, rows.getSchema());
+    PAssert.that(rows).containsInAnyOrder(row(rows.getSchema(), "two"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  /**
+   * An arithmetic projection keeps a Calc on top of the IO rel, while the IO is expected to
+   * output only the three fields the query references.
+   */
+  @Test
+  public void testIOSourceRel_withComplexProjects_withSupportedFilter() {
+    BeamRelNode plan =
+        sqlEnv.parseQuery(
+            "SELECT name as new_name, unused1+10-id as new_id FROM TEST where 1<id");
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, plan);
+    Schema expectedSchema =
+        Schema.builder().addStringField("new_name").addInt32Field("new_id").build();
+
+    assertThat(plan, instanceOf(BeamCalcRel.class));
+    assertThat(plan.getInput(0), instanceOf(BeamIOSourceRel.class));
+    // The IO rel should expose exactly the referenced fields.
+    assertThat(
+        plan.getInput(0).getRowType().getFieldNames(),
+        containsInAnyOrder("name", "unused1", "id"));
+    assertEquals(expectedSchema, rows.getSchema());
+    PAssert.that(rows).containsInAnyOrder(row(rows.getSchema(), "two", 208));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  /**
+   * Renamed fields selected out of schema order: the plan root is expected to be the IO rel,
+   * which exposes the renamed fields directly.
+   */
+  @Test
+  public void testIOSourceRel_selectFieldsInRandomOrder_withRename_withSupportedFilter() {
+    BeamRelNode plan =
+        sqlEnv.parseQuery(
+            "SELECT name as new_name, id as new_id, unused1 as new_unused1 FROM TEST where 1<id");
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, plan);
+    Schema expectedSchema =
+        Schema.builder()
+            .addStringField("new_name")
+            .addInt32Field("new_id")
+            .addInt32Field("new_unused1")
+            .build();
+
+    assertThat(plan, instanceOf(BeamIOSourceRel.class));
+    // The IO rel itself should carry the renamed output fields.
+    assertThat(
+        plan.getRowType().getFieldNames(),
+        containsInAnyOrder("new_name", "new_id", "new_unused1"));
+    assertEquals(expectedSchema, rows.getSchema());
+    PAssert.that(rows).containsInAnyOrder(row(rows.getSchema(), "two", 2, 200));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  /**
+   * Renamed fields with an arithmetic predicate: a Calc is expected on top of the IO rel, and the
+   * IO is expected to output only the three referenced fields.
+   */
+  @Test
+  public void testIOSourceRel_selectFieldsInRandomOrder_withRename_withUnsupportedFilter() {
+    BeamRelNode plan =
+        sqlEnv.parseQuery(
+            "SELECT name as new_name, id as new_id, unused1 as new_unused1 FROM TEST where id+unused1=202");
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, plan);
+    Schema expectedSchema =
+        Schema.builder()
+            .addStringField("new_name")
+            .addInt32Field("new_id")
+            .addInt32Field("new_unused1")
+            .build();
+
+    assertThat(plan, instanceOf(BeamCalcRel.class));
+    assertThat(plan.getInput(0), instanceOf(BeamIOSourceRel.class));
+    // The IO rel should expose exactly the referenced fields.
+    assertThat(
+        plan.getInput(0).getRowType().getFieldNames(),
+        containsInAnyOrder("name", "id", "unused1"));
+    assertEquals(expectedSchema, rows.getSchema());
+    PAssert.that(rows).containsInAnyOrder(row(rows.getSchema(), "two", 2, 200));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  /**
+   * Renamed fields with one comparison predicate and one arithmetic predicate: a Calc is expected
+   * on top of the IO rel, and the IO digest is expected to list the comparison as supported and
+   * the arithmetic predicate as unsupported.
+   */
+  @Test
+  public void
+      testIOSourceRel_selectFieldsInRandomOrder_withRename_withSupportedAndUnsupportedFilters() {
+    BeamRelNode plan =
+        sqlEnv.parseQuery(
+            "SELECT name as new_name, id as new_id, unused1 as new_unused1 FROM TEST where 1<id and id+unused1=202");
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, plan);
+    String expectedIoDigest =
+        "BeamPushDownIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[name, id, unused1],TestTableFilter=[supported{<(1, $1)}, unsupported{=(+($1, $0), 202)}])";
+    Schema expectedSchema =
+        Schema.builder()
+            .addStringField("new_name")
+            .addInt32Field("new_id")
+            .addInt32Field("new_unused1")
+            .build();
+
+    assertThat(plan, instanceOf(BeamCalcRel.class));
+    assertThat(plan.getInput(0), instanceOf(BeamIOSourceRel.class));
+    // The IO rel should expose exactly the referenced fields.
+    assertThat(
+        plan.getInput(0).getRowType().getFieldNames(),
+        containsInAnyOrder("name", "id", "unused1"));
+    assertEquals(expectedIoDigest, plan.getInput(0).getDigest());
+    assertEquals(expectedSchema, rows.getSchema());
+    PAssert.that(rows).containsInAnyOrder(row(rows.getSchema(), "two", 2, 200));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  /**
+   * {@code select *} with a comparison predicate: the plan root is expected to be a push-down IO
+   * rel whose digest lists all five fields and the predicate as supported.
+   */
+  @Test
+  public void testIOSourceRel_selectAllField() {
+    BeamRelNode plan = sqlEnv.parseQuery("SELECT * FROM TEST where id<>2");
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, plan);
+    String expectedDigest =
+        "BeamPushDownIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[unused1, id, name, unused2, b],TestTableFilter=[supported{<>($1, 2)}, unsupported{}])";
+
+    assertThat(plan, instanceOf(BeamIOSourceRel.class));
+    assertEquals(expectedDigest, plan.getDigest());
+    assertEquals(BASIC_SCHEMA, rows.getSchema());
+    PAssert.that(rows)
+        .containsInAnyOrder(row(rows.getSchema(), 100, 1, "one", (short) 100, true));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  /** Builds a {@link Row} with the given schema from positional field values. */
+  private static Row row(Schema schema, Object... objects) {
+    Row.Builder rowBuilder = Row.withSchema(schema);
+    return rowBuilder.addValues(objects).build();
+  }
+
+  /**
+   * A single arithmetic predicate: a Calc is expected on top of a push-down IO rel whose digest
+   * lists the predicate as unsupported and only the three referenced fields as used.
+   */
+  @Test
+  public void testIOSourceRel_withUnsupportedPredicate() {
+    BeamRelNode plan = sqlEnv.parseQuery("SELECT name FROM TEST where id+unused1=101");
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, plan);
+    String expectedIoDigest =
+        "BeamPushDownIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[name, id, unused1],TestTableFilter=[supported{}, unsupported{=(+($1, $0), 101)}])";
+    Schema expectedSchema = Schema.builder().addStringField("name").build();
+
+    assertThat(plan, instanceOf(BeamCalcRel.class));
+    assertThat(plan.getInput(0), instanceOf(BeamIOSourceRel.class));
+    assertEquals(expectedIoDigest, plan.getInput(0).getDigest());
+    // The IO rel should expose exactly the referenced fields.
+    assertThat(
+        plan.getInput(0).getRowType().getFieldNames(),
+        containsInAnyOrder("name", "id", "unused1"));
+
+    assertEquals(expectedSchema, rows.getSchema());
+    PAssert.that(rows).containsInAnyOrder(row(rows.getSchema(), "one"));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+  /**
+   * {@code select *} with only an arithmetic predicate: a Calc is expected on top of the plain
+   * {@code BeamIOSourceRel} (the digest carries no push-down information) and the IO still
+   * outputs all five fields.
+   */
+  @Test
+  public void testIOSourceRel_selectAll_withUnsupportedPredicate() {
+    BeamRelNode plan = sqlEnv.parseQuery("SELECT * FROM TEST where id+unused1=101");
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    assertThat(plan, instanceOf(BeamCalcRel.class));
+    assertThat(plan.getInput(0), instanceOf(BeamIOSourceRel.class));
+    assertEquals(
+        "BeamIOSourceRel.BEAM_LOGICAL(table=[beam, TEST])", plan.getInput(0).getDigest());
+    // All fields are read by the IO, since the query is 'select *'.
+    assertThat(
+        plan.getInput(0).getRowType().getFieldNames(),
+        containsInAnyOrder("name", "id", "unused1", "unused2", "b"));
+
+    assertEquals(BASIC_SCHEMA, rows.getSchema());
+    PAssert.that(rows)
+        .containsInAnyOrder(row(rows.getSchema(), 100, 1, "one", (short) 100, true));
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+  }
+
+ @Test
+ public void testIOSourceRel_withSupportedAndUnsupportedPredicate() {
+ String selectTableStatement = "SELECT name FROM TEST where id+unused1=101 and id=1";
+
+ BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+ PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+ assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+ assertEquals(
+ "BeamPushDownIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[name, id, unused1],TestTableFilter=[supported{=($1, 1)}, unsupported{=(+($1, $0), 101)}])",
+ beamRelNode.getInput(0).getDigest());
+ // Make sure project push-down was done
+ List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
+ assertThat(a, containsInAnyOrder("name", "id", "unused1"));
+
+ assertEquals(Schema.builder().addStringField("name").build(), result.getSchema());
+ PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "one"));
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
+ @Test
+ public void testIOSourceRel_selectAll_withSupportedAndUnsupportedPredicate() {
+ String selectTableStatement = "SELECT * FROM TEST where id+unused1=101 and id=1";
+
+ BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+ PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+ assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+ assertEquals(
+ "BeamPushDownIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[unused1, id, name, unused2, b],TestTableFilter=[supported{=($1, 1)}, unsupported{=(+($1, $0), 101)}])",
+ beamRelNode.getInput(0).getDigest());
+ // Make sure project push-down was done (all fields since 'select *')
+ List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
+ assertThat(a, containsInAnyOrder("unused1", "name", "id", "unused2", "b"));
+
+ assertEquals(BASIC_SCHEMA, result.getSchema());
+ PAssert.that(result)
+ .containsInAnyOrder(row(result.getSchema(), 100, 1, "one", (short) 100, true));
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
+ @Test
+ public void testIOSourceRel_selectOneFieldsMoreThanOnce() {
+ String selectTableStatement = "SELECT b, b, b, b, b FROM TEST";
+
+ BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+ PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ // Calc must not be dropped
+ assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+ assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+ // Make sure project push-down was done
+ List<String> pushedFields = beamRelNode.getInput(0).getRowType().getFieldNames();
+ assertThat(pushedFields, containsInAnyOrder("b"));
+
+ assertEquals(
+ Schema.builder()
+ .addBooleanField("b")
+ .addBooleanField("b0")
+ .addBooleanField("b1")
+ .addBooleanField("b2")
+ .addBooleanField("b3")
+ .build(),
+ result.getSchema());
+ PAssert.that(result)
+ .containsInAnyOrder(
+ row(result.getSchema(), true, true, true, true, true),
+ row(result.getSchema(), false, false, false, false, false));
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
+ @Test
+ public void testIOSourceRel_selectOneFieldsMoreThanOnce_withSupportedPredicate() {
+ String selectTableStatement = "SELECT b, b, b, b, b FROM TEST where b";
+
+ // Calc must not be dropped
+ BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+ PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+ // Supported predicate should be pushed-down
+ assertNull(((BeamCalcRel) beamRelNode).getProgram().getCondition());
+ assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+ // Make sure project push-down was done
+ List<String> pushedFields = beamRelNode.getInput(0).getRowType().getFieldNames();
+ assertThat(pushedFields, containsInAnyOrder("b"));
+
+ assertEquals(
+ Schema.builder()
+ .addBooleanField("b")
+ .addBooleanField("b0")
+ .addBooleanField("b1")
+ .addBooleanField("b2")
+ .addBooleanField("b3")
+ .build(),
+ result.getSchema());
+ PAssert.that(result).containsInAnyOrder(row(result.getSchema(), true, true, true, true, true));
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
+ private static Table getTable(String name, PushDownOptions options) {
+ return Table.builder()
+ .name(name)
+ .comment(name + " table")
+ .schema(BASIC_SCHEMA)
+ .properties(
+ JSON.parseObject("{ " + PUSH_DOWN_OPTION + ": " + "\"" + options.toString() + "\" }"))
+ .type("test")
+ .build();
+ }
+}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java
index 0b6ead6..e64a103 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithFilterPushDown.java
@@ -19,9 +19,11 @@
import static org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.PUSH_DOWN_OPTION;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import com.alibaba.fastjson.JSON;
import java.util.List;
@@ -42,6 +44,7 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.plan.RelOptRule;
+import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.core.Calc;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.CalcMergeRule;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.FilterCalcMergeRule;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.FilterToCalcRule;
@@ -49,6 +52,7 @@
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.rules.ProjectToCalcRule;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.RuleSets;
+import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.joda.time.Duration;
import org.junit.Before;
import org.junit.Rule;
@@ -82,7 +86,7 @@
@Before
public void buildUp() {
TestTableProvider tableProvider = new TestTableProvider();
- Table table = getTable("TEST", PushDownOptions.BOTH);
+ Table table = getTable("TEST", PushDownOptions.FILTER);
tableProvider.createTable(table);
tableProvider.addRows(
table.getName(),
@@ -97,13 +101,21 @@
}
@Test
- public void testIOSourceRel_predicateSimple() {
- String selectTableStatement = "SELECT name FROM TEST where id=2";
+ public void testIOSourceRel_withFilter_shouldProjectAllFields() {
+ String selectTableStatement = "SELECT name FROM TEST where name='two'";
BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
- assertThat(beamRelNode, instanceOf(BeamIOSourceRel.class));
+ assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+ // Condition should be pushed-down to IO level
+ assertNull(((Calc) beamRelNode).getProgram().getCondition());
+
+ assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+ List<String> projects = beamRelNode.getInput(0).getRowType().getFieldNames();
+ // When performing standalone filter push-down IO should project all fields.
+ assertThat(projects, containsInAnyOrder("unused1", "id", "name", "unused2", "b"));
+
assertEquals(Schema.builder().addStringField("name").build(), result.getSchema());
PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "two"));
@@ -111,169 +123,69 @@
}
@Test
- public void testIOSourceRel_predicateSimple_Boolean() {
- String selectTableStatement = "SELECT name FROM TEST where b";
+ public void testIOSourceRel_selectAll_withSupportedFilter_shouldDropCalc() {
+ String selectTableStatement = "SELECT * FROM TEST where name='two'";
BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+ // Calc is dropped, because all fields are projected in the same order and filter is
+ // pushed-down.
assertThat(beamRelNode, instanceOf(BeamIOSourceRel.class));
- assertEquals(Schema.builder().addStringField("name").build(), result.getSchema());
- PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "one"));
- pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
- }
+ List<String> projects = beamRelNode.getRowType().getFieldNames();
+ assertThat(projects, containsInAnyOrder("unused1", "id", "name", "unused2", "b"));
- @Test
- public void testIOSourceRel_predicateWithAnd() {
- String selectTableStatement = "SELECT name FROM TEST where id>=2 and unused1<=200";
-
- BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
- PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
-
- assertThat(beamRelNode, instanceOf(BeamIOSourceRel.class));
- assertEquals(Schema.builder().addStringField("name").build(), result.getSchema());
- PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "two"));
-
- pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
- }
-
- @Test
- public void testIOSourceRel_withComplexProjects_withSupportedFilter() {
- String selectTableStatement =
- "SELECT name as new_name, unused1+10-id as new_id FROM TEST where 1<id";
-
- BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
- PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
-
- assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
- assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
- // Make sure project push-down was done
- List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
- assertThat(a, containsInAnyOrder("name", "unused1", "id"));
- assertEquals(
- Schema.builder().addStringField("new_name").addInt32Field("new_id").build(),
- result.getSchema());
- PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "two", 208));
-
- pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
- }
-
- @Test
- public void testIOSourceRel_selectFieldsInRandomOrder_withRename_withSupportedFilter() {
- String selectTableStatement =
- "SELECT name as new_name, id as new_id, unused1 as new_unused1 FROM TEST where 1<id";
-
- BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
- PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
-
- assertThat(beamRelNode, instanceOf(BeamIOSourceRel.class));
- // Make sure project push-down was done
- List<String> a = beamRelNode.getRowType().getFieldNames();
- assertThat(a, containsInAnyOrder("new_name", "new_id", "new_unused1"));
- assertEquals(
- Schema.builder()
- .addStringField("new_name")
- .addInt32Field("new_id")
- .addInt32Field("new_unused1")
- .build(),
- result.getSchema());
- PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "two", 2, 200));
-
- pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
- }
-
- @Test
- public void testIOSourceRel_selectFieldsInRandomOrder_withRename_withUnsupportedFilter() {
- String selectTableStatement =
- "SELECT name as new_name, id as new_id, unused1 as new_unused1 FROM TEST where id+unused1=202";
-
- BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
- PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
-
- assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
- assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
- // Make sure project push-down was done
- List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
- assertThat(a, containsInAnyOrder("name", "id", "unused1"));
- assertEquals(
- Schema.builder()
- .addStringField("new_name")
- .addInt32Field("new_id")
- .addInt32Field("new_unused1")
- .build(),
- result.getSchema());
- PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "two", 2, 200));
-
- pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
- }
-
- @Test
- public void
- testIOSourceRel_selectFieldsInRandomOrder_withRename_withSupportedAndUnsupportedFilters() {
- String selectTableStatement =
- "SELECT name as new_name, id as new_id, unused1 as new_unused1 FROM TEST where 1<id and id+unused1=202";
-
- BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
- PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
-
- assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
- assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
- // Make sure project push-down was done
- List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
- assertThat(a, containsInAnyOrder("name", "id", "unused1"));
- assertEquals(
- "BeamIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[name, id, unused1],TestTableFilter=[supported{<(1, $1)}, unsupported{=(+($1, $0), 202)}])",
- beamRelNode.getInput(0).getDigest());
- assertEquals(
- Schema.builder()
- .addStringField("new_name")
- .addInt32Field("new_id")
- .addInt32Field("new_unused1")
- .build(),
- result.getSchema());
- PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "two", 2, 200));
-
- pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
- }
-
- @Test
- public void testIOSourceRel_selectAllField() {
- String selectTableStatement = "SELECT * FROM TEST where id<>2";
-
- BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
- PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
-
- assertThat(beamRelNode, instanceOf(BeamIOSourceRel.class));
- assertEquals(
- "BeamIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[unused1, id, name, unused2, b],TestTableFilter=[supported{<>($1, 2)}, unsupported{}])",
- beamRelNode.getDigest());
assertEquals(BASIC_SCHEMA, result.getSchema());
PAssert.that(result)
- .containsInAnyOrder(row(result.getSchema(), 100, 1, "one", (short) 100, true));
+ .containsInAnyOrder(row(result.getSchema(), 200, 2, "two", (short) 200, false));
pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
- private static Row row(Schema schema, Object... objects) {
- return Row.withSchema(schema).addValues(objects).build();
- }
-
@Test
- public void testIOSourceRel_withUnsupportedPredicate() {
- String selectTableStatement = "SELECT name FROM TEST where id+unused1=101";
+ public void testIOSourceRel_withSupportedFilter_selectInRandomOrder() {
+ String selectTableStatement = "SELECT unused2, id, name FROM TEST where b";
BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+ // Condition should be pushed-down to IO level
+ assertNull(((Calc) beamRelNode).getProgram().getCondition());
+
assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+ List<String> projects = beamRelNode.getInput(0).getRowType().getFieldNames();
+ // When performing standalone filter push-down IO should project all fields.
+ assertThat(projects, containsInAnyOrder("unused1", "id", "name", "unused2", "b"));
+
assertEquals(
- "BeamIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[name, id, unused1],TestTableFilter=[supported{}, unsupported{=(+($1, $0), 101)}])",
- beamRelNode.getInput(0).getDigest());
- // Make sure project push-down was done
- List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
- assertThat(a, containsInAnyOrder("name", "id", "unused1"));
+ Schema.builder()
+ .addInt16Field("unused2")
+ .addInt32Field("id")
+ .addStringField("name")
+ .build(),
+ result.getSchema());
+ PAssert.that(result).containsInAnyOrder(row(result.getSchema(), (short) 100, 1, "one"));
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
+ @Test
+ public void testIOSourceRel_withUnsupportedFilter_calcPreservesCondition() {
+ String selectTableStatement = "SELECT name FROM TEST where id+1=2";
+
+ BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+ PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+ // Unsupported condition should be preserved in a Calc
+ assertNotNull(((Calc) beamRelNode).getProgram().getCondition());
+
+ assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+ List<String> projects = beamRelNode.getInput(0).getRowType().getFieldNames();
+ // When performing standalone filter push-down IO should project all fields.
+ assertThat(projects, containsInAnyOrder("unused1", "id", "name", "unused2", "b"));
assertEquals(Schema.builder().addStringField("name").build(), result.getSchema());
PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "one"));
@@ -282,69 +194,99 @@
}
@Test
- public void testIOSourceRel_selectAll_withUnsupportedPredicate() {
- String selectTableStatement = "SELECT * FROM TEST where id+unused1=101";
+ public void testIOSourceRel_selectAllFieldsInRandomOrder_shouldPushDownSupportedFilter() {
+ String selectTableStatement = "SELECT unused2, name, id, b, unused1 FROM TEST where name='two'";
BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+ // Calc should not be dropped, because fields are selected in a different order, even though
+ // all filters are supported and all fields are projected.
assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
- assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
- assertEquals(
- "BeamIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],TestTableFilter=[supported{}, unsupported{}])",
- beamRelNode.getInput(0).getDigest());
- // Make sure project push-down was done (all fields since 'select *')
- List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
- assertThat(a, containsInAnyOrder("name", "id", "unused1", "unused2", "b"));
+ assertNull(((BeamCalcRel) beamRelNode).getProgram().getCondition());
- assertEquals(BASIC_SCHEMA, result.getSchema());
+ assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+ List<String> projects = beamRelNode.getInput(0).getRowType().getFieldNames();
+ // When performing standalone filter push-down IO should project all fields.
+ assertThat(projects, containsInAnyOrder("unused1", "id", "name", "unused2", "b"));
+
+ assertEquals(
+ Schema.builder()
+ .addInt16Field("unused2")
+ .addStringField("name")
+ .addInt32Field("id")
+ .addBooleanField("b")
+ .addInt32Field("unused1")
+ .build(),
+ result.getSchema());
PAssert.that(result)
- .containsInAnyOrder(row(result.getSchema(), 100, 1, "one", (short) 100, true));
+ .containsInAnyOrder(row(result.getSchema(), (short) 200, "two", 2, false, 200));
pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
@Test
- public void testIOSourceRel_withSupportedAndUnsupportedPredicate() {
- String selectTableStatement = "SELECT name FROM TEST where id+unused1=101 and id=1";
+ public void testIOSourceRel_selectOneFieldsMoreThanOnce() {
+ String selectTableStatement = "SELECT b, b, b, b, b FROM TEST";
BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+ // Calc must not be dropped
assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
- assertEquals(
- "BeamIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[name, id, unused1],TestTableFilter=[supported{=($1, 1)}, unsupported{=(+($1, $0), 101)}])",
- beamRelNode.getInput(0).getDigest());
// Make sure project push-down was done
- List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
- assertThat(a, containsInAnyOrder("name", "id", "unused1"));
+ List<String> pushedFields = beamRelNode.getInput(0).getRowType().getFieldNames();
+ // When performing standalone filter push-down IO should project all fields.
+ assertThat(
+ pushedFields,
+ IsIterableContainingInAnyOrder.containsInAnyOrder("unused1", "id", "name", "unused2", "b"));
- assertEquals(Schema.builder().addStringField("name").build(), result.getSchema());
- PAssert.that(result).containsInAnyOrder(row(result.getSchema(), "one"));
+ assertEquals(
+ Schema.builder()
+ .addBooleanField("b")
+ .addBooleanField("b0")
+ .addBooleanField("b1")
+ .addBooleanField("b2")
+ .addBooleanField("b3")
+ .build(),
+ result.getSchema());
+ PAssert.that(result)
+ .containsInAnyOrder(
+ row(result.getSchema(), true, true, true, true, true),
+ row(result.getSchema(), false, false, false, false, false));
pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
@Test
- public void testIOSourceRel_selectAll_withSupportedAndUnsupportedPredicate() {
- String selectTableStatement = "SELECT * FROM TEST where id+unused1=101 and id=1";
+ public void testIOSourceRel_selectOneFieldsMoreThanOnce_withSupportedPredicate() {
+ String selectTableStatement = "SELECT b, b, b, b, b FROM TEST where b";
+ // Calc must not be dropped
BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+ // Supported predicate should be pushed-down
+ assertNull(((BeamCalcRel) beamRelNode).getProgram().getCondition());
assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
- assertEquals(
- "BeamIOSourceRel.BEAM_LOGICAL(table=[beam, TEST],usedFields=[unused1, id, name, unused2, b],TestTableFilter=[supported{=($1, 1)}, unsupported{=(+($1, $0), 101)}])",
- beamRelNode.getInput(0).getDigest());
- // Make sure project push-down was done (all fields since 'select *')
- List<String> a = beamRelNode.getInput(0).getRowType().getFieldNames();
- assertThat(a, containsInAnyOrder("unused1", "name", "id", "unused2", "b"));
+ // When performing standalone filter push-down IO should project all fields.
+ List<String> pushedFields = beamRelNode.getInput(0).getRowType().getFieldNames();
+ assertThat(
+ pushedFields,
+ IsIterableContainingInAnyOrder.containsInAnyOrder("unused1", "id", "name", "unused2", "b"));
- assertEquals(BASIC_SCHEMA, result.getSchema());
- PAssert.that(result)
- .containsInAnyOrder(row(result.getSchema(), 100, 1, "one", (short) 100, true));
+ assertEquals(
+ Schema.builder()
+ .addBooleanField("b")
+ .addBooleanField("b0")
+ .addBooleanField("b1")
+ .addBooleanField("b2")
+ .addBooleanField("b3")
+ .build(),
+ result.getSchema());
+ PAssert.that(result).containsInAnyOrder(row(result.getSchema(), true, true, true, true, true));
pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
@@ -359,4 +301,8 @@
.type("test")
.build();
}
+
+ private static Row row(Schema schema, Object... objects) {
+ return Row.withSchema(schema).addValues(objects).build();
+ }
}
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java
index d8b6141..363c0f2 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProviderWithProjectPushDown.java
@@ -22,10 +22,12 @@
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import com.alibaba.fastjson.JSON;
import java.util.List;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamIOSourceRel;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
@@ -180,15 +182,76 @@
.containsInAnyOrder(
row(result.getSchema(), 100, 1, "one", 100),
row(result.getSchema(), 200, 2, "two", 200));
- assertThat(beamRelNode, instanceOf(BeamIOSourceRel.class));
+ assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+ assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
// If project push-down succeeds new BeamIOSourceRel should not output unused fields
assertThat(
- beamRelNode.getRowType().getFieldNames(),
+ beamRelNode.getInput(0).getRowType().getFieldNames(),
containsInAnyOrder("unused1", "id", "name", "unused2"));
pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}
+ @Test
+ public void testIOSourceRel_selectOneFieldsMoreThanOnce() {
+ String selectTableStatement = "SELECT id, id, id, id, id FROM TEST";
+
+ BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+ PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ // Calc must not be dropped
+ assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+ assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+ // Make sure project push-down was done
+ List<String> pushedFields = beamRelNode.getInput(0).getRowType().getFieldNames();
+ assertThat(pushedFields, containsInAnyOrder("id"));
+
+ assertEquals(
+ Schema.builder()
+ .addInt32Field("id")
+ .addInt32Field("id0")
+ .addInt32Field("id1")
+ .addInt32Field("id2")
+ .addInt32Field("id3")
+ .build(),
+ result.getSchema());
+ PAssert.that(result)
+ .containsInAnyOrder(
+ row(result.getSchema(), 1, 1, 1, 1, 1), row(result.getSchema(), 2, 2, 2, 2, 2));
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
+ @Test
+ public void testIOSourceRel_selectOneFieldsMoreThanOnce_withSupportedPredicate() {
+ String selectTableStatement = "SELECT id, id, id, id, id FROM TEST where id=1";
+
+ // Calc must not be dropped
+ BeamRelNode beamRelNode = sqlEnv.parseQuery(selectTableStatement);
+ PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ assertThat(beamRelNode, instanceOf(BeamCalcRel.class));
+ // Project push-down should leave predicate in a Calc
+ assertNotNull(((BeamCalcRel) beamRelNode).getProgram().getCondition());
+ assertThat(beamRelNode.getInput(0), instanceOf(BeamIOSourceRel.class));
+ // Make sure project push-down was done
+ List<String> pushedFields = beamRelNode.getInput(0).getRowType().getFieldNames();
+ assertThat(pushedFields, containsInAnyOrder("id"));
+
+ assertEquals(
+ Schema.builder()
+ .addInt32Field("id")
+ .addInt32Field("id0")
+ .addInt32Field("id1")
+ .addInt32Field("id2")
+ .addInt32Field("id3")
+ .build(),
+ result.getSchema());
+ PAssert.that(result).containsInAnyOrder(row(result.getSchema(), 1, 1, 1, 1, 1));
+
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
+ }
+
private static Row row(Schema schema, Object... objects) {
return Row.withSchema(schema).addValues(objects).build();
}
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
index 9ea3ff5..668fa3c 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisIO.java
@@ -239,6 +239,7 @@
.setMaxNumRecords(Long.MAX_VALUE)
.setUpToDateThreshold(Duration.ZERO)
.setWatermarkPolicyFactory(WatermarkPolicyFactory.withArrivalTimePolicy())
+ .setMaxCapacityPerShard(ShardReadersPool.DEFAULT_CAPACITY_PER_SHARD)
.build();
}
@@ -272,6 +273,8 @@
abstract WatermarkPolicyFactory getWatermarkPolicyFactory();
+ abstract Integer getMaxCapacityPerShard();
+
abstract Builder toBuilder();
@AutoValue.Builder
@@ -293,6 +296,8 @@
abstract Builder setWatermarkPolicyFactory(WatermarkPolicyFactory watermarkPolicyFactory);
+ abstract Builder setMaxCapacityPerShard(Integer maxCapacity);
+
abstract Read build();
}
@@ -420,6 +425,12 @@
return toBuilder().setWatermarkPolicyFactory(watermarkPolicyFactory).build();
}
+ /** Specifies the maximum number of messages per shard; {@code maxCapacity} must be non-null and positive. */
+ public Read withMaxCapacityPerShard(Integer maxCapacity) {
+ checkArgument(maxCapacity != null && maxCapacity > 0, "maxCapacity must be positive, but was: %s", maxCapacity);
+ return toBuilder().setMaxCapacityPerShard(maxCapacity).build();
+ }
+
@Override
public PCollection<KinesisRecord> expand(PBegin input) {
Unbounded<KinesisRecord> unbounded =
@@ -430,7 +441,8 @@
getInitialPosition(),
getUpToDateThreshold(),
getWatermarkPolicyFactory(),
- getRequestRecordsLimit()));
+ getRequestRecordsLimit(),
+ getMaxCapacityPerShard()));
PTransform<PBegin, PCollection<KinesisRecord>> transform = unbounded;
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
index db73b99..9e869f5 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisReader.java
@@ -46,20 +46,23 @@
private long lastBacklogBytes;
private Instant backlogBytesLastCheckTime = new Instant(0L);
private ShardReadersPool shardReadersPool;
+ private final Integer maxCapacityPerShard;
KinesisReader(
SimplifiedKinesisClient kinesis,
CheckpointGenerator initialCheckpointGenerator,
KinesisSource source,
WatermarkPolicyFactory watermarkPolicyFactory,
- Duration upToDateThreshold) {
+ Duration upToDateThreshold,
+ Integer maxCapacityPerShard) {
this(
kinesis,
initialCheckpointGenerator,
source,
watermarkPolicyFactory,
upToDateThreshold,
- Duration.standardSeconds(30));
+ Duration.standardSeconds(30),
+ maxCapacityPerShard);
}
KinesisReader(
@@ -68,7 +71,8 @@
KinesisSource source,
WatermarkPolicyFactory watermarkPolicyFactory,
Duration upToDateThreshold,
- Duration backlogBytesCheckThreshold) {
+ Duration backlogBytesCheckThreshold,
+ Integer maxCapacityPerShard) {
this.kinesis = checkNotNull(kinesis, "kinesis");
this.initialCheckpointGenerator =
checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
@@ -76,6 +80,7 @@
this.source = source;
this.upToDateThreshold = upToDateThreshold;
this.backlogBytesCheckThreshold = backlogBytesCheckThreshold;
+ this.maxCapacityPerShard = maxCapacityPerShard;
}
/** Generates initial checkpoint and instantiates iterators for shards. */
@@ -177,6 +182,9 @@
ShardReadersPool createShardReadersPool() throws TransientKinesisException {
return new ShardReadersPool(
- kinesis, initialCheckpointGenerator.generate(kinesis), watermarkPolicyFactory);
+ kinesis,
+ initialCheckpointGenerator.generate(kinesis),
+ watermarkPolicyFactory,
+ maxCapacityPerShard);
}
}
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
index 7785cb7..a9d05f3 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisSource.java
@@ -40,6 +40,7 @@
private final WatermarkPolicyFactory watermarkPolicyFactory;
private CheckpointGenerator initialCheckpointGenerator;
private final Integer limit;
+ private final Integer maxCapacityPerShard;
KinesisSource(
AWSClientsProvider awsClientsProvider,
@@ -47,14 +48,16 @@
StartingPoint startingPoint,
Duration upToDateThreshold,
WatermarkPolicyFactory watermarkPolicyFactory,
- Integer limit) {
+ Integer limit,
+ Integer maxCapacityPerShard) {
this(
awsClientsProvider,
new DynamicCheckpointGenerator(streamName, startingPoint),
streamName,
upToDateThreshold,
watermarkPolicyFactory,
- limit);
+ limit,
+ maxCapacityPerShard);
}
private KinesisSource(
@@ -63,13 +66,15 @@
String streamName,
Duration upToDateThreshold,
WatermarkPolicyFactory watermarkPolicyFactory,
- Integer limit) {
+ Integer limit,
+ Integer maxCapacityPerShard) {
this.awsClientsProvider = awsClientsProvider;
this.initialCheckpointGenerator = initialCheckpoint;
this.streamName = streamName;
this.upToDateThreshold = upToDateThreshold;
this.watermarkPolicyFactory = watermarkPolicyFactory;
this.limit = limit;
+ this.maxCapacityPerShard = maxCapacityPerShard;
validate();
}
@@ -93,7 +98,8 @@
streamName,
upToDateThreshold,
watermarkPolicyFactory,
- limit));
+ limit,
+ maxCapacityPerShard));
}
return sources;
}
@@ -120,7 +126,8 @@
checkpointGenerator,
this,
watermarkPolicyFactory,
- upToDateThreshold);
+ upToDateThreshold,
+ maxCapacityPerShard);
}
@Override
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
index 71a12fc..195101c 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/ShardReadersPool.java
@@ -33,6 +33,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.joda.time.Instant;
@@ -46,7 +47,7 @@
class ShardReadersPool {
private static final Logger LOG = LoggerFactory.getLogger(ShardReadersPool.class);
- private static final int DEFAULT_CAPACITY_PER_SHARD = 10_000;
+ public static final int DEFAULT_CAPACITY_PER_SHARD = 10_000;
private static final int ATTEMPTS_TO_SHUTDOWN = 3;
/**
@@ -81,13 +82,6 @@
ShardReadersPool(
SimplifiedKinesisClient kinesis,
KinesisReaderCheckpoint initialCheckpoint,
- WatermarkPolicyFactory watermarkPolicyFactory) {
- this(kinesis, initialCheckpoint, watermarkPolicyFactory, DEFAULT_CAPACITY_PER_SHARD);
- }
-
- ShardReadersPool(
- SimplifiedKinesisClient kinesis,
- KinesisReaderCheckpoint initialCheckpoint,
WatermarkPolicyFactory watermarkPolicyFactory,
int queueCapacityPerShard) {
this.kinesis = kinesis;
@@ -309,4 +303,9 @@
}
return shardsMap.build();
}
+
+  // Returns the recordsQueue field itself (not a copy). Package-private and marked
+  // @VisibleForTesting: it exists so tests can inspect the queue, e.g. its remaining capacity.
+  @VisibleForTesting
+  BlockingQueue<KinesisRecord> getRecordsQueue() {
+    return recordsQueue;
+  }
}
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
index 37528ef..060af47 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/KinesisReaderTest.java
@@ -70,7 +70,8 @@
kinesisSource,
WatermarkPolicyFactory.withArrivalTimePolicy(),
Duration.ZERO,
- backlogBytesCheckThreshold) {
+ backlogBytesCheckThreshold,
+ ShardReadersPool.DEFAULT_CAPACITY_PER_SHARD) {
@Override
ShardReadersPool createShardReadersPool() {
return shardReadersPool;
diff --git a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
index 125ff8c..0d9e9a3 100644
--- a/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
+++ b/sdks/java/io/kinesis/src/test/java/org/apache/beam/sdk/io/kinesis/ShardReadersPoolTest.java
@@ -78,7 +78,7 @@
WatermarkPolicy policy = WatermarkPolicyFactory.withArrivalTimePolicy().createWatermarkPolicy();
checkpoint = new KinesisReaderCheckpoint(ImmutableList.of(firstCheckpoint, secondCheckpoint));
- shardReadersPool = Mockito.spy(new ShardReadersPool(kinesis, checkpoint, factory));
+ shardReadersPool = Mockito.spy(new ShardReadersPool(kinesis, checkpoint, factory, 100));
when(factory.createWatermarkPolicy()).thenReturn(policy);
@@ -112,6 +112,7 @@
}
}
assertThat(fetchedRecords).containsExactlyInAnyOrder(a, b, c, d);
+ assertThat(shardReadersPool.getRecordsQueue().remainingCapacity()).isEqualTo(100 * 2);
}
@Test
@@ -237,7 +238,12 @@
KinesisReaderCheckpoint checkpoint = new KinesisReaderCheckpoint(Collections.emptyList());
WatermarkPolicyFactory watermarkPolicyFactory = WatermarkPolicyFactory.withArrivalTimePolicy();
shardReadersPool =
- Mockito.spy(new ShardReadersPool(kinesis, checkpoint, watermarkPolicyFactory));
+ Mockito.spy(
+ new ShardReadersPool(
+ kinesis,
+ checkpoint,
+ watermarkPolicyFactory,
+ ShardReadersPool.DEFAULT_CAPACITY_PER_SHARD));
doReturn(firstIterator)
.when(shardReadersPool)
.createShardIterator(eq(kinesis), any(ShardCheckpoint.class));
diff --git a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
index 3d28981..486bbe6 100644
--- a/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
+++ b/sdks/java/io/rabbitmq/src/main/java/org/apache/beam/sdk/io/rabbitmq/RabbitMqIO.java
@@ -32,6 +32,7 @@
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeoutException;
@@ -415,16 +416,28 @@
private static class RabbitMQCheckpointMark
implements UnboundedSource.CheckpointMark, Serializable {
transient Channel channel;
- Instant oldestTimestamp = Instant.now();
+ Instant latestTimestamp = Instant.now();
final List<Long> sessionIds = new ArrayList<>();
+    /**
+     * Moves the watermark forward to {@code time} when that instant is strictly later than the
+     * one currently recorded. An equal or earlier instant leaves the watermark untouched, so the
+     * watermark never moves backwards through this method.
+     *
+     * @param time candidate instant for the new watermark
+     */
+    public void advanceWatermark(Instant time) {
+      if (!time.isAfter(latestTimestamp)) {
+        // Not newer than what is already recorded: nothing to do.
+        return;
+      }
+      latestTimestamp = time;
+    }
+
@Override
public void finalizeCheckpoint() throws IOException {
for (Long sessionId : sessionIds) {
channel.basicAck(sessionId, false);
}
channel.txCommit();
- oldestTimestamp = Instant.now();
+ latestTimestamp = Instant.now();
sessionIds.clear();
}
}
@@ -449,7 +462,7 @@
@Override
public Instant getWatermark() {
- return checkpointMark.oldestTimestamp;
+ return checkpointMark.latestTimestamp;
}
@Override
@@ -530,6 +543,10 @@
// we consume message without autoAck (we want to do the ack ourselves)
GetResponse delivery = channel.basicGet(queueName, false);
if (delivery == null) {
+ current = null;
+ currentRecordId = null;
+ currentTimestamp = null;
+ checkpointMark.advanceWatermark(Instant.now());
return false;
}
if (source.spec.useCorrelationId()) {
@@ -545,10 +562,10 @@
checkpointMark.sessionIds.add(deliveryTag);
current = new RabbitMqMessage(source.spec.routingKey(), delivery);
- currentTimestamp = new Instant(delivery.getProps().getTimestamp());
- if (currentTimestamp.isBefore(checkpointMark.oldestTimestamp)) {
- checkpointMark.oldestTimestamp = currentTimestamp;
- }
+ Date deliveryTimestamp = delivery.getProps().getTimestamp();
+ currentTimestamp =
+ (deliveryTimestamp != null) ? new Instant(deliveryTimestamp) : Instant.now();
+ checkpointMark.advanceWatermark(currentTimestamp);
} catch (IOException e) {
throw e;
} catch (Exception e) {
diff --git a/sdks/python/apache_beam/coders/__init__.py b/sdks/python/apache_beam/coders/__init__.py
index 3192494..680f1c7 100644
--- a/sdks/python/apache_beam/coders/__init__.py
+++ b/sdks/python/apache_beam/coders/__init__.py
@@ -17,4 +17,5 @@
from __future__ import absolute_import
from apache_beam.coders.coders import *
+from apache_beam.coders.row_coder import *
from apache_beam.coders.typecoders import registry
diff --git a/sdks/python/apache_beam/coders/row_coder.py b/sdks/python/apache_beam/coders/row_coder.py
new file mode 100644
index 0000000..a259f36
--- /dev/null
+++ b/sdks/python/apache_beam/coders/row_coder.py
@@ -0,0 +1,174 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import
+
+import itertools
+from array import array
+
+from apache_beam.coders.coder_impl import StreamCoderImpl
+from apache_beam.coders.coders import BytesCoder
+from apache_beam.coders.coders import Coder
+from apache_beam.coders.coders import FastCoder
+from apache_beam.coders.coders import FloatCoder
+from apache_beam.coders.coders import IterableCoder
+from apache_beam.coders.coders import StrUtf8Coder
+from apache_beam.coders.coders import TupleCoder
+from apache_beam.coders.coders import VarIntCoder
+from apache_beam.portability import common_urns
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints.schemas import named_tuple_from_schema
+from apache_beam.typehints.schemas import named_tuple_to_schema
+
+__all__ = ["RowCoder"]
+
+
+class RowCoder(FastCoder):
+  """ Coder for `typing.NamedTuple` instances.
+
+  Implements the beam:coder:row:v1 standard coder spec.
+
+  One component coder is built per schema field (see ``coder_from_type``);
+  the actual byte-level work is delegated to ``RowCoderImpl``.
+  """
+
+  def __init__(self, schema):
+    """Initializes a :class:`RowCoder`.
+
+    Args:
+      schema (apache_beam.portability.api.schema_pb2.Schema): The protobuf
+        representation of the schema of the data that the RowCoder will be used
+        to encode/decode.
+
+    Raises:
+      ValueError: if any field has a type ``coder_from_type`` cannot handle.
+    """
+    self.schema = schema
+    # One coder per field, in schema order.
+    self.components = [
+        RowCoder.coder_from_type(field.type) for field in self.schema.fields
+    ]
+
+  def _create_impl(self):
+    """Builds the RowCoderImpl for this schema and its component coders."""
+    return RowCoderImpl(self.schema, self.components)
+
+  def is_deterministic(self):
+    """Returns True only if every per-field component coder is deterministic."""
+    return all(c.is_deterministic() for c in self.components)
+
+  def to_type_hint(self):
+    """Returns the named tuple type generated from this coder's schema."""
+    return named_tuple_from_schema(self.schema)
+
+  def as_cloud_object(self, coders_context=None):
+    """Not supported; always raises NotImplementedError."""
+    raise NotImplementedError("as_cloud_object not supported for RowCoder")
+
+  # __eq__ is defined below without a matching __hash__, so instances are
+  # explicitly marked unhashable.
+  __hash__ = None
+
+  def __eq__(self, other):
+    """Two coders are equal when they have the same type and equal schemas."""
+    return type(self) == type(other) and self.schema == other.schema
+
+  def to_runner_api_parameter(self, unused_context):
+    """Returns (row coder URN, schema proto as payload, no component coders)."""
+    return (common_urns.coders.ROW.urn, self.schema, [])
+
+  @Coder.register_urn(common_urns.coders.ROW.urn, schema_pb2.Schema)
+  def from_runner_api_parameter(payload, components, unused_context):
+    """Rebuilds a RowCoder from its schema payload.
+
+    ``components`` and the context are ignored; the schema alone determines
+    the coder.
+    """
+    return RowCoder(payload)
+
+  @staticmethod
+  def from_type_hint(named_tuple_type, registry):
+    """Creates a RowCoder for a named tuple type; ``registry`` is unused."""
+    return RowCoder(named_tuple_to_schema(named_tuple_type))
+
+  @staticmethod
+  def coder_from_type(field_type):
+    """Maps a schema FieldType proto to the coder used for that field.
+
+    Handles INT32/INT64 (VarIntCoder), DOUBLE (FloatCoder), STRING
+    (StrUtf8Coder) and arrays of any handled element type (IterableCoder).
+
+    Raises:
+      ValueError: for every other type, including unhandled atomic types,
+        which fall through the inner if/elif chain to the raise below.
+    """
+    type_info = field_type.WhichOneof("type_info")
+    if type_info == "atomic_type":
+      if field_type.atomic_type in (schema_pb2.INT32,
+                                    schema_pb2.INT64):
+        return VarIntCoder()
+      elif field_type.atomic_type == schema_pb2.DOUBLE:
+        return FloatCoder()
+      elif field_type.atomic_type == schema_pb2.STRING:
+        return StrUtf8Coder()
+    elif type_info == "array_type":
+      # Recurse on the element type; nested arrays are therefore handled too.
+      return IterableCoder(
+          RowCoder.coder_from_type(field_type.array_type.element_type))
+
+    # The Java SDK supports several more types, but the coders are not yet
+    # standard, and are not implemented in Python.
+    raise ValueError(
+        "Encountered a type that is not currently supported by RowCoder: %s" %
+        field_type)
+
+
+class RowCoderImpl(StreamCoderImpl):
+  """For internal use only; no backwards-compatibility guarantees.
+
+  Layout written by ``encode_to_stream``: the number of schema fields
+  (SIZE_CODER), a null bitmap as a byte string (NULL_MARKER_CODER, empty when
+  no field is null), then every non-null attribute in schema order, each
+  written by its component coder. Bit ``i % 8`` of bitmap byte ``i // 8`` is
+  set when field ``i`` is null.
+  """
+  SIZE_CODER = VarIntCoder().get_impl()
+  NULL_MARKER_CODER = BytesCoder().get_impl()
+
+  def __init__(self, schema, components):
+    """Stores the schema and resolves each component coder to its impl.
+
+    Args:
+      schema: schema proto describing the fields of the rows.
+      components: one coder per schema field, in schema order.
+    """
+    self.schema = schema
+    self.constructor = named_tuple_from_schema(schema)
+    self.components = list(c.get_impl() for c in components)
+    self.has_nullable_fields = any(
+        field.type.nullable for field in self.schema.fields)
+
+  def encode_to_stream(self, value, out, nested):
+    """Writes ``value`` to ``out`` in the layout described on the class.
+
+    Raises:
+      ValueError: if an attribute is None but its field is not nullable.
+    """
+    nvals = len(self.schema.fields)
+    self.SIZE_CODER.encode_to_stream(nvals, out, True)
+    attrs = [getattr(value, f.name) for f in self.schema.fields]
+
+    # The bitmap stays empty unless at least one attribute is actually null.
+    words = array('B')
+    if self.has_nullable_fields:
+      nulls = list(attr is None for attr in attrs)
+      if any(nulls):
+        words = array('B', itertools.repeat(0, (nvals+7)//8))
+        for i, is_null in enumerate(nulls):
+          words[i//8] |= is_null << (i % 8)
+
+    # array.tostring() was removed in Python 3.9 and array.tobytes() does not
+    # exist on Python 2.7; going through bytearray yields the same bytes on
+    # both.
+    null_bitmap = bytes(bytearray(words))
+    self.NULL_MARKER_CODER.encode_to_stream(null_bitmap, out, True)
+
+    for c, field, attr in zip(self.components, self.schema.fields, attrs):
+      if attr is None:
+        if not field.type.nullable:
+          raise ValueError(
+              "Attempted to encode null for non-nullable field \"{}\".".format(
+                  field.name))
+        # Null values are represented only by their bit in the bitmap.
+        continue
+      c.encode_to_stream(attr, out, True)
+
+  def decode_from_stream(self, in_stream, nested):
+    """Reads one row from ``in_stream`` and returns it as a named tuple.
+
+    Tolerates a mismatch between the encoded field count and this coder's
+    schema: missing trailing fields decode as None, extra encoded fields are
+    left unread.
+    """
+    nvals = self.SIZE_CODER.decode_from_stream(in_stream, True)
+    # Passing the byte string as the array initializer replaces
+    # array.fromstring(), which was removed in Python 3.9; the initializer
+    # form is accepted on both Python 2.7 and 3.x.
+    words = array(
+        'B', self.NULL_MARKER_CODER.decode_from_stream(in_stream, True))
+
+    if words:
+      nulls = ((words[i // 8] >> (i % 8)) & 0x01 for i in range(nvals))
+    else:
+      # Empty bitmap: no encoded field was null.
+      nulls = itertools.repeat(False, nvals)
+
+    # If this coder's schema has more attributes than the encoded value, then
+    # the schema must have changed. Populate the unencoded fields with nulls.
+    if len(self.components) > nvals:
+      nulls = itertools.chain(
+          nulls,
+          itertools.repeat(True, len(self.components) - nvals))
+
+    # Note that if this coder's schema has *fewer* attributes than the encoded
+    # value, we just need to ignore the additional values, which will occur
+    # here because we only decode as many values as we have coders for.
+    return self.constructor(*(
+        None if is_null else c.decode_from_stream(in_stream, True)
+        for c, is_null in zip(self.components, nulls)))
+
+  def _make_value_coder(self, nulls=itertools.repeat(False)):
+    """Returns a TupleCoder impl over the components not flagged in ``nulls``.
+
+    When the schema has no nullable fields, ``nulls`` is ignored and all
+    components are used. Nothing else in this class calls this helper.
+    """
+    components = [
+        component for component, is_null in zip(self.components, nulls)
+        if not is_null
+    ] if self.has_nullable_fields else self.components
+    return TupleCoder(components).get_impl()
diff --git a/sdks/python/apache_beam/coders/row_coder_test.py b/sdks/python/apache_beam/coders/row_coder_test.py
new file mode 100644
index 0000000..dbdc5fc
--- /dev/null
+++ b/sdks/python/apache_beam/coders/row_coder_test.py
@@ -0,0 +1,168 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+
+import logging
+import typing
+import unittest
+from itertools import chain
+
+import numpy as np
+from past.builtins import unicode
+
+from apache_beam.coders import RowCoder
+from apache_beam.coders.typecoders import registry as coders_registry
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints.schemas import typing_to_runner_api
+
+# Fixture row type shared by the tests below: two required scalar fields, one
+# optional (nullable) string field and one list-of-strings field.
+Person = typing.NamedTuple("Person", [
+    ("name", unicode),
+    ("age", np.int32),
+    ("address", typing.Optional[unicode]),
+    ("aliases", typing.List[unicode]),
+])
+
+# Registered at import time so coders_registry.get_coder(Person) resolves to
+# RowCoder in test_create_row_coder_from_named_tuple.
+coders_registry.register_coder(Person, RowCoder)
+
+
+class RowCoderTest(unittest.TestCase):
+  # Sample rows covering a null optional field, a populated optional field and
+  # an empty list field.
+  TEST_CASES = [
+      Person("Jon Snow", 23, None, ["crow", "wildling"]),
+      Person("Daenerys Targaryen", 25, "Westeros", ["Mother of Dragons"]),
+      Person("Michael Bluth", 30, None, [])
+  ]
+
+  def test_create_row_coder_from_named_tuple(self):
+    # The coder resolved through the registry must produce the same bytes as
+    # one built directly from the schema, and must round-trip every sample.
+    expected_coder = RowCoder(typing_to_runner_api(Person).row_type.schema)
+    real_coder = coders_registry.get_coder(Person)
+
+    for test_case in self.TEST_CASES:
+      self.assertEqual(
+          expected_coder.encode(test_case), real_coder.encode(test_case))
+
+      self.assertEqual(test_case,
+                       real_coder.decode(real_coder.encode(test_case)))
+
+  def test_create_row_coder_from_schema(self):
+    # Hand-written schema proto mirroring the Person named tuple.
+    schema = schema_pb2.Schema(
+        id="person",
+        fields=[
+            schema_pb2.Field(
+                name="name",
+                type=schema_pb2.FieldType(
+                    atomic_type=schema_pb2.STRING)),
+            schema_pb2.Field(
+                name="age",
+                type=schema_pb2.FieldType(
+                    atomic_type=schema_pb2.INT32)),
+            schema_pb2.Field(
+                name="address",
+                type=schema_pb2.FieldType(
+                    atomic_type=schema_pb2.STRING, nullable=True)),
+            schema_pb2.Field(
+                name="aliases",
+                type=schema_pb2.FieldType(
+                    array_type=schema_pb2.ArrayType(
+                        element_type=schema_pb2.FieldType(
+                            atomic_type=schema_pb2.STRING)))),
+        ])
+    coder = RowCoder(schema)
+
+    for test_case in self.TEST_CASES:
+      self.assertEqual(test_case, coder.decode(coder.encode(test_case)))
+
+  @unittest.skip(
+      "BEAM-8030 - Overflow behavior in VarIntCoder is currently inconsistent"
+  )
+  def test_overflows(self):
+    IntTester = typing.NamedTuple('IntTester', [
+        # TODO(BEAM-7996): Test int8 and int16 here as well when those types are
+        # supported
+        # ('i8', typing.Optional[np.int8]),
+        # ('i16', typing.Optional[np.int16]),
+        ('i32', typing.Optional[np.int32]),
+        ('i64', typing.Optional[np.int64]),
+    ])
+
+    c = RowCoder.from_type_hint(IntTester, None)
+
+    # Boundary values that must still fit in the declared integer widths.
+    no_overflow = chain(
+        (IntTester(i32=i, i64=None) for i in (-2**31, 2**31-1)),
+        (IntTester(i32=None, i64=i) for i in (-2**63, 2**63-1)),
+    )
+
+    # Encode max/min ints to make sure they don't throw any error
+    for case in no_overflow:
+      c.encode(case)
+
+    # Values one step outside each boundary.
+    overflow = chain(
+        (IntTester(i32=i, i64=None) for i in (-2**31-1, 2**31)),
+        (IntTester(i32=None, i64=i) for i in (-2**63-1, 2**63)),
+    )
+
+    # Encode max+1/min-1 ints to make sure they DO throw an error
+    for case in overflow:
+      # The lambda is invoked immediately by assertRaises, so it sees the
+      # current value of `case`.
+      self.assertRaises(OverflowError, lambda: c.encode(case))
+
+  def test_none_in_non_nullable_field_throws(self):
+    # `foo` is not Optional, so encoding None for it must be rejected.
+    Test = typing.NamedTuple('Test', [('foo', unicode)])
+
+    c = RowCoder.from_type_hint(Test, None)
+    self.assertRaises(ValueError, lambda: c.encode(Test(foo=None)))
+
+  def test_schema_remove_column(self):
+    fields = [("field1", unicode), ("field2", unicode)]
+    # new schema is missing one field that was in the old schema
+    Old = typing.NamedTuple('Old', fields)
+    New = typing.NamedTuple('New', fields[:-1])
+
+    old_coder = RowCoder.from_type_hint(Old, None)
+    new_coder = RowCoder.from_type_hint(New, None)
+
+    # The trailing encoded value is ignored by the narrower coder.
+    self.assertEqual(
+        New("foo"), new_coder.decode(old_coder.encode(Old("foo", "bar"))))
+
+  def test_schema_add_column(self):
+    fields = [("field1", unicode), ("field2", typing.Optional[unicode])]
+    # new schema has one (optional) field that didn't exist in the old schema
+    Old = typing.NamedTuple('Old', fields[:-1])
+    New = typing.NamedTuple('New', fields)
+
+    old_coder = RowCoder.from_type_hint(Old, None)
+    new_coder = RowCoder.from_type_hint(New, None)
+
+    # The field absent from the encoded bytes decodes as None.
+    self.assertEqual(
+        New("bar", None), new_coder.decode(old_coder.encode(Old("bar"))))
+
+  def test_schema_add_column_with_null_value(self):
+    fields = [("field1", typing.Optional[unicode]), ("field2", unicode),
+              ("field3", typing.Optional[unicode])]
+    # new schema has one (optional) field that didn't exist in the old schema
+    Old = typing.NamedTuple('Old', fields[:-1])
+    New = typing.NamedTuple('New', fields)
+
+    old_coder = RowCoder.from_type_hint(Old, None)
+    new_coder = RowCoder.from_type_hint(New, None)
+
+    # Same as test_schema_add_column, but the encoded row already carries a
+    # null, so a non-empty null bitmap is combined with the padded field.
+    self.assertEqual(
+        New(None, "baz", None),
+        new_coder.decode(old_coder.encode(Old(None, "baz"))))
+
+
+if __name__ == "__main__":
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index 606ca81..5ffbeea 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -32,9 +32,11 @@
from apache_beam.coders import coder_impl
from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.portability.api import schema_pb2
from apache_beam.runners import pipeline_context
from apache_beam.transforms import window
from apache_beam.transforms.window import IntervalWindow
+from apache_beam.typehints import schemas
from apache_beam.utils import windowed_value
from apache_beam.utils.timestamp import Timestamp
@@ -65,6 +67,42 @@
return x
+def value_parser_from_schema(schema):
+  """Builds a function converting a test-data mapping into a schema row.
+
+  Args:
+    schema: schema proto whose fields define the expected attributes.
+
+  Returns:
+    A callable taking a mapping of attribute name to raw value and returning
+    an instance of the named tuple generated from ``schema``. The callable
+    raises KeyError if a schema field is missing from the mapping and
+    ValueError if the mapping has attributes that are not in the schema.
+  """
+  def attribute_parser_from_type(type_):
+    # TODO: This should be exhaustive
+    # Types not handled below fall through and yield None as the parser.
+    type_info = type_.WhichOneof("type_info")
+    if type_info == "atomic_type":
+      return schemas.ATOMIC_TYPE_TO_PRIMITIVE[type_.atomic_type]
+    elif type_info == "array_type":
+      element_parser = attribute_parser_from_type(type_.array_type.element_type)
+      return lambda x: list(map(element_parser, x))
+    elif type_info == "map_type":
+      # Key and value types live on map_type; array_type only carries an
+      # element type.
+      key_parser = attribute_parser_from_type(type_.map_type.key_type)
+      value_parser = attribute_parser_from_type(type_.map_type.value_type)
+      return lambda x: dict((key_parser(k), value_parser(v))
+                            for k, v in x.items())
+
+  parsers = [(field.name, attribute_parser_from_type(field.type))
+             for field in schema.fields]
+
+  constructor = schemas.named_tuple_from_schema(schema)
+
+  def value_parser(x):
+    # Work on a shallow copy: the pops below are only bookkeeping to detect
+    # leftover attributes and must not empty the caller's mapping.
+    remaining = dict(x)
+    result = []
+    for name, parser in parsers:
+      value = remaining.pop(name)
+      result.append(None if value is None else parser(value))
+
+    if remaining:
+      raise ValueError(
+          "Test data contains attributes that don't exist in the schema: {}"
+          .format(', '.join(remaining.keys())))
+
+    return constructor(*result)
+
+  return value_parser
+
+
class StandardCodersTest(unittest.TestCase):
_urn_to_json_value_parser = {
@@ -134,11 +172,17 @@
for c in spec.get('components', ())]
context.coders.put_proto(coder_id, beam_runner_api_pb2.Coder(
spec=beam_runner_api_pb2.FunctionSpec(
- urn=spec['urn'], payload=spec.get('payload')),
+ urn=spec['urn'], payload=spec.get('payload', '').encode('latin1')),
component_coder_ids=component_ids))
return context.coders.get_by_id(coder_id)
def json_value_parser(self, coder_spec):
+ # TODO: integrate this with the logic for the other parsers
+ if coder_spec['urn'] == 'beam:coder:row:v1':
+ schema = schema_pb2.Schema.FromString(
+ coder_spec['payload'].encode('latin1'))
+ return value_parser_from_schema(schema)
+
component_parsers = [
self.json_value_parser(c) for c in coder_spec.get('components', ())]
return lambda x: self._urn_to_json_value_parser[coder_spec['urn']](
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 039eaf0..4a5fef4 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -385,12 +385,9 @@
pipeline.replace_all(DataflowRunner._SDF_PTRANSFORM_OVERRIDES)
use_fnapi = apiclient._use_fnapi(options)
- from apache_beam.portability.api import beam_runner_api_pb2
- default_environment = beam_runner_api_pb2.Environment(
- urn=common_urns.environments.DOCKER.urn,
- payload=beam_runner_api_pb2.DockerPayload(
- container_image=apiclient.get_container_image_from_options(options)
- ).SerializeToString())
+ from apache_beam.transforms import environments
+ default_environment = environments.DockerEnvironment(
+ container_image=apiclient.get_container_image_from_options(options))
# Snapshot the pipeline in a portable proto.
self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
diff --git a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
index 3709deb..1f0e925 100644
--- a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
+++ b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization.py
@@ -73,8 +73,8 @@
document.querySelector("#{display_id}").protoInput = "{protostr}";
</script>"""
_DATAFRAME_PAGINATION_TEMPLATE = """
- <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.2/jquery.min.js"></script>
- <script src="https://cdn.datatables.net/1.10.16/js/jquery.dataTables.js"></script>
+ <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.2/jquery.min.js"></script>
+ <script src="https://cdn.datatables.net/1.10.16/js/jquery.dataTables.js"></script>
<link rel="stylesheet" href="https://cdn.datatables.net/1.10.16/css/jquery.dataTables.css">
{dataframe_html}
<script>
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
index 913dac6..e4328df 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -31,25 +31,10 @@
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.transforms import core
+from apache_beam.transforms import environments
from apache_beam.typehints import native_type_compatibility
-class Environment(object):
- """A wrapper around the environment proto.
-
- Provides consistency with how the other componentes are accessed.
- """
- def __init__(self, proto):
- self.proto = proto
-
- def to_runner_api(self, context):
- return self.proto
-
- @staticmethod
- def from_runner_api(proto, context):
- return Environment(proto)
-
-
class _PipelineContextMap(object):
"""This is a bi-directional map between objects and ids.
@@ -128,7 +113,7 @@
'pcollections': pvalue.PCollection,
'coders': coders.Coder,
'windowing_strategies': core.Windowing,
- 'environments': Environment,
+ 'environments': environments.Environment,
}
def __init__(
@@ -146,7 +131,7 @@
self, cls, namespace, getattr(proto, name, None)))
if default_environment:
self._default_environment_id = self.environments.get_id(
- Environment(default_environment), label='default_environment')
+ default_environment, label='default_environment')
else:
self._default_environment_id = None
self.use_fake_coders = use_fake_coders
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index dd0c1e2..9f97e2f 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -72,6 +72,7 @@
from apache_beam.runners.worker.channel_factory import GRPCChannelFactory
from apache_beam.runners.worker.sdk_worker import _Future
from apache_beam.runners.worker.statecache import StateCache
+from apache_beam.transforms import environments
from apache_beam.transforms import trigger
from apache_beam.transforms.window import GlobalWindow
from apache_beam.transforms.window import GlobalWindows
@@ -344,7 +345,7 @@
self._last_uid = -1
self._default_environment = (
default_environment
- or beam_runner_api_pb2.Environment(urn=python_urns.EMBEDDED_PYTHON))
+ or environments.EmbeddedPythonEnvironment())
self._bundle_repeat = bundle_repeat
self._num_workers = 1
self._progress_frequency = progress_request_frequency
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 1f368c0..2204a24 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -47,8 +47,6 @@
from apache_beam.metrics.metricbase import MetricName
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.portability import python_urns
-from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.portability import fn_api_runner
from apache_beam.runners.worker import data_plane
from apache_beam.runners.worker import sdk_worker
@@ -58,6 +56,7 @@
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.tools import utils
+from apache_beam.transforms import environments
from apache_beam.transforms import userstate
from apache_beam.transforms import window
@@ -1085,8 +1084,7 @@
def create_pipeline(self):
return beam.Pipeline(
runner=fn_api_runner.FnApiRunner(
- default_environment=beam_runner_api_pb2.Environment(
- urn=python_urns.EMBEDDED_PYTHON_GRPC)))
+ default_environment=environments.EmbeddedPythonGrpcEnvironment()))
class FnApiRunnerTestWithGrpcMultiThreaded(FnApiRunnerTest):
@@ -1094,9 +1092,9 @@
def create_pipeline(self):
return beam.Pipeline(
runner=fn_api_runner.FnApiRunner(
- default_environment=beam_runner_api_pb2.Environment(
- urn=python_urns.EMBEDDED_PYTHON_GRPC,
- payload=b'2,%d' % fn_api_runner.STATE_CACHE_SIZE)))
+ default_environment=environments.EmbeddedPythonGrpcEnvironment(
+ num_workers=2,
+ state_cache_size=fn_api_runner.STATE_CACHE_SIZE)))
class FnApiRunnerTestWithDisabledCaching(FnApiRunnerTest):
@@ -1104,10 +1102,8 @@
def create_pipeline(self):
return beam.Pipeline(
runner=fn_api_runner.FnApiRunner(
- default_environment=beam_runner_api_pb2.Environment(
- urn=python_urns.EMBEDDED_PYTHON_GRPC,
- # number of workers, state cache size
- payload=b'2,0')))
+ default_environment=environments.EmbeddedPythonGrpcEnvironment(
+ num_workers=2, state_cache_size=0)))
class FnApiRunnerTestWithMultiWorkers(FnApiRunnerTest):
@@ -1134,8 +1130,7 @@
pipeline_options = PipelineOptions(direct_num_workers=2)
p = beam.Pipeline(
runner=fn_api_runner.FnApiRunner(
- default_environment=beam_runner_api_pb2.Environment(
- urn=python_urns.EMBEDDED_PYTHON_GRPC)),
+ default_environment=environments.EmbeddedPythonGrpcEnvironment()),
options=pipeline_options)
#TODO(BEAM-8444): Fix these tests..
p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
@@ -1185,8 +1180,7 @@
# to the bundle process request.
return beam.Pipeline(
runner=fn_api_runner.FnApiRunner(
- default_environment=beam_runner_api_pb2.Environment(
- urn=python_urns.EMBEDDED_PYTHON_GRPC)))
+ default_environment=environments.EmbeddedPythonGrpcEnvironment()))
def test_checkpoint(self):
# This split manager will get re-invoked on each smaller split,
@@ -1490,8 +1484,7 @@
pipeline_options = PipelineOptions(direct_num_workers=2)
p = beam.Pipeline(
runner=fn_api_runner.FnApiRunner(
- default_environment=beam_runner_api_pb2.Environment(
- urn=python_urns.EMBEDDED_PYTHON_GRPC)),
+ default_environment=environments.EmbeddedPythonGrpcEnvironment()),
options=pipeline_options)
#TODO(BEAM-8444): Fix these tests..
p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
@@ -1508,8 +1501,7 @@
def create_pipeline(self):
return beam.Pipeline(
runner=fn_api_runner.FnApiRunner(
- default_environment=beam_runner_api_pb2.Environment(
- urn=python_urns.EMBEDDED_PYTHON_GRPC),
+ default_environment=environments.EmbeddedPythonGrpcEnvironment(),
progress_request_frequency=0.5))
def test_lull_logging(self):
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index c7fb76c..2cffd47 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -19,7 +19,6 @@
import functools
import itertools
-import json
import logging
import sys
import threading
@@ -36,8 +35,6 @@
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_job_api_pb2
-from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.portability.api import endpoints_pb2
from apache_beam.runners import runner
from apache_beam.runners.job import utils as job_utils
from apache_beam.runners.portability import fn_api_runner_transforms
@@ -46,6 +43,7 @@
from apache_beam.runners.portability import portable_stager
from apache_beam.runners.worker import sdk_worker_main
from apache_beam.runners.worker import worker_pool_main
+from apache_beam.transforms import environments
__all__ = ['PortableRunner']
@@ -65,6 +63,8 @@
beam_job_api_pb2.JobState.CANCELLED,
]
+ENV_TYPE_ALIASES = {'LOOPBACK': 'EXTERNAL'}
+
class PortableRunner(runner.PipelineRunner):
"""
@@ -102,65 +102,24 @@
# does not exist in the Java SDK. In portability, the entry point is clearly
# defined via the JobService.
portable_options.view_as(StandardOptions).runner = None
- environment_urn = common_urns.environments.DOCKER.urn
- if portable_options.environment_type == 'DOCKER':
+ environment_type = portable_options.environment_type
+ if not environment_type:
environment_urn = common_urns.environments.DOCKER.urn
- elif portable_options.environment_type == 'PROCESS':
- environment_urn = common_urns.environments.PROCESS.urn
- elif portable_options.environment_type in ('EXTERNAL', 'LOOPBACK'):
- environment_urn = common_urns.environments.EXTERNAL.urn
- elif portable_options.environment_type:
- if portable_options.environment_type.startswith('beam:env:'):
- environment_urn = portable_options.environment_type
- else:
- raise ValueError(
- 'Unknown environment type: %s' % portable_options.environment_type)
-
- if environment_urn == common_urns.environments.DOCKER.urn:
- docker_image = (
- portable_options.environment_config
- or PortableRunner.default_docker_image())
- return beam_runner_api_pb2.Environment(
- urn=common_urns.environments.DOCKER.urn,
- payload=beam_runner_api_pb2.DockerPayload(
- container_image=docker_image
- ).SerializeToString())
- elif environment_urn == common_urns.environments.PROCESS.urn:
- config = json.loads(portable_options.environment_config)
- return beam_runner_api_pb2.Environment(
- urn=common_urns.environments.PROCESS.urn,
- payload=beam_runner_api_pb2.ProcessPayload(
- os=(config.get('os') or ''),
- arch=(config.get('arch') or ''),
- command=config.get('command'),
- env=(config.get('env') or '')
- ).SerializeToString())
- elif environment_urn == common_urns.environments.EXTERNAL.urn:
- def looks_like_json(environment_config):
- import re
- return re.match(r'\s*\{.*\}\s*$', environment_config)
-
- if looks_like_json(portable_options.environment_config):
- config = json.loads(portable_options.environment_config)
- url = config.get('url')
- if not url:
- raise ValueError('External environment endpoint must be set.')
- params = config.get('params')
- else:
- url = portable_options.environment_config
- params = None
-
- return beam_runner_api_pb2.Environment(
- urn=common_urns.environments.EXTERNAL.urn,
- payload=beam_runner_api_pb2.ExternalPayload(
- endpoint=endpoints_pb2.ApiServiceDescriptor(url=url),
- params=params
- ).SerializeToString())
+ elif environment_type.startswith('beam:env:'):
+ environment_urn = environment_type
else:
- return beam_runner_api_pb2.Environment(
- urn=environment_urn,
- payload=(portable_options.environment_config.encode('ascii')
- if portable_options.environment_config else None))
+ # e.g. handle LOOPBACK -> EXTERNAL
+ environment_type = ENV_TYPE_ALIASES.get(environment_type,
+ environment_type)
+ try:
+ environment_urn = getattr(common_urns.environments,
+ environment_type).urn
+ except AttributeError:
+ raise ValueError(
+ 'Unknown environment type: %s' % environment_type)
+
+ env_class = environments.Environment.get_env_cls_from_urn(environment_urn)
+ return env_class.from_options(portable_options)
def default_job_server(self, portable_options):
# TODO Provide a way to specify a container Docker URL
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner_test.py b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
index 3658c21..24c6b87 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
@@ -36,18 +36,16 @@
from apache_beam.options.pipeline_options import DirectOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import PortableOptions
-from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_job_api_pb2
from apache_beam.portability.api import beam_job_api_pb2_grpc
-from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.portability.api import endpoints_pb2
from apache_beam.runners.portability import fn_api_runner_test
from apache_beam.runners.portability import portable_runner
from apache_beam.runners.portability.local_job_service import LocalJobServicer
from apache_beam.runners.portability.portable_runner import PortableRunner
from apache_beam.runners.worker import worker_pool_main
from apache_beam.runners.worker.channel_factory import GRPCChannelFactory
+from apache_beam.transforms import environments
class PortableRunnerTest(fn_api_runner_test.FnApiRunnerTest):
@@ -261,11 +259,7 @@
docker_image = PortableRunner.default_docker_image()
self.assertEqual(
PortableRunner._create_environment(PipelineOptions.from_dictionary({})),
- beam_runner_api_pb2.Environment(
- urn=common_urns.environments.DOCKER.urn,
- payload=beam_runner_api_pb2.DockerPayload(
- container_image=docker_image
- ).SerializeToString()))
+ environments.DockerEnvironment(container_image=docker_image))
def test__create_docker_environment(self):
docker_image = 'py-docker'
@@ -273,11 +267,7 @@
PortableRunner._create_environment(PipelineOptions.from_dictionary({
'environment_type': 'DOCKER',
'environment_config': docker_image,
- })), beam_runner_api_pb2.Environment(
- urn=common_urns.environments.DOCKER.urn,
- payload=beam_runner_api_pb2.DockerPayload(
- container_image=docker_image
- ).SerializeToString()))
+ })), environments.DockerEnvironment(container_image=docker_image))
def test__create_process_environment(self):
self.assertEqual(
@@ -286,48 +276,28 @@
'environment_config': '{"os": "linux", "arch": "amd64", '
'"command": "run.sh", '
'"env":{"k1": "v1"} }',
- })), beam_runner_api_pb2.Environment(
- urn=common_urns.environments.PROCESS.urn,
- payload=beam_runner_api_pb2.ProcessPayload(
- os='linux',
- arch='amd64',
- command='run.sh',
- env={'k1': 'v1'},
- ).SerializeToString()))
+ })), environments.ProcessEnvironment('run.sh', os='linux', arch='amd64',
+ env={'k1': 'v1'}))
self.assertEqual(
PortableRunner._create_environment(PipelineOptions.from_dictionary({
'environment_type': 'PROCESS',
'environment_config': '{"command": "run.sh"}',
- })), beam_runner_api_pb2.Environment(
- urn=common_urns.environments.PROCESS.urn,
- payload=beam_runner_api_pb2.ProcessPayload(
- command='run.sh',
- ).SerializeToString()))
+ })), environments.ProcessEnvironment('run.sh'))
def test__create_external_environment(self):
self.assertEqual(
PortableRunner._create_environment(PipelineOptions.from_dictionary({
'environment_type': "EXTERNAL",
'environment_config': 'localhost:50000',
- })), beam_runner_api_pb2.Environment(
- urn=common_urns.environments.EXTERNAL.urn,
- payload=beam_runner_api_pb2.ExternalPayload(
- endpoint=endpoints_pb2.ApiServiceDescriptor(
- url='localhost:50000')
- ).SerializeToString()))
- raw_config = ' {"url":"localhost:50000", "params":{"test":"test"}} '
+ })), environments.ExternalEnvironment('localhost:50000'))
+ raw_config = ' {"url":"localhost:50000", "params":{"k1":"v1"}} '
for env_config in (raw_config, raw_config.lstrip(), raw_config.strip()):
self.assertEqual(
PortableRunner._create_environment(PipelineOptions.from_dictionary({
'environment_type': "EXTERNAL",
'environment_config': env_config,
- })), beam_runner_api_pb2.Environment(
- urn=common_urns.environments.EXTERNAL.urn,
- payload=beam_runner_api_pb2.ExternalPayload(
- endpoint=endpoints_pb2.ApiServiceDescriptor(
- url='localhost:50000'),
- params={"test": "test"}
- ).SerializeToString()))
+ })), environments.ExternalEnvironment('localhost:50000',
+ params={"k1":"v1"}))
with self.assertRaises(ValueError):
PortableRunner._create_environment(PipelineOptions.from_dictionary({
'environment_type': "EXTERNAL",
@@ -336,7 +306,7 @@
with self.assertRaises(ValueError) as ctx:
PortableRunner._create_environment(PipelineOptions.from_dictionary({
'environment_type': "EXTERNAL",
- 'environment_config': '{"params":{"test":"test"}}',
+ 'environment_config': '{"params":{"k1":"v1"}}',
}))
self.assertIn(
'External environment endpoint must be set.', ctx.exception.args)
diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py
new file mode 100644
index 0000000..8758ab8
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/environments.py
@@ -0,0 +1,396 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Environments concepts.
+
+For internal use only. No backwards compatibility guarantees."""
+
+from __future__ import absolute_import
+
+import json
+
+from google.protobuf import message
+
+from apache_beam.portability import common_urns
+from apache_beam.portability import python_urns
+from apache_beam.portability.api import beam_runner_api_pb2
+from apache_beam.portability.api import endpoints_pb2
+from apache_beam.utils import proto_utils
+
+__all__ = ['Environment',
+ 'DockerEnvironment', 'ProcessEnvironment', 'ExternalEnvironment',
+ 'EmbeddedPythonEnvironment', 'EmbeddedPythonGrpcEnvironment',
+ 'SubprocessSDKEnvironment', 'RunnerAPIEnvironmentHolder']
+
+
+class Environment(object):
+ """Abstract base class for environments.
+
+ Represents a type and configuration of environment.
+ Each type of Environment should have a unique urn.
+
+ For internal use only. No backwards compatibility guarantees.
+ """
+
+ _known_urns = {}
+ _urn_to_env_cls = {}
+
+ def to_runner_api_parameter(self, context):
+ raise NotImplementedError
+
+ @classmethod
+ def register_urn(cls, urn, parameter_type, constructor=None):
+
+ def register(constructor):
+ if isinstance(constructor, type):
+ constructor.from_runner_api_parameter = register(
+ constructor.from_runner_api_parameter)
+ # register environment urn to environment class
+ cls._urn_to_env_cls[urn] = constructor
+ return constructor
+
+ else:
+ cls._known_urns[urn] = parameter_type, constructor
+ return staticmethod(constructor)
+
+ if constructor:
+ # Used as a statement.
+ register(constructor)
+ else:
+ # Used as a decorator.
+ return register
+
+ @classmethod
+ def get_env_cls_from_urn(cls, urn):
+ return cls._urn_to_env_cls[urn]
+
+ def to_runner_api(self, context):
+ urn, typed_param = self.to_runner_api_parameter(context)
+ return beam_runner_api_pb2.Environment(
+ urn=urn,
+ payload=typed_param.SerializeToString()
+ if isinstance(typed_param, message.Message)
+ else typed_param if (isinstance(typed_param, bytes) or
+ typed_param is None)
+ else typed_param.encode('utf-8')
+ )
+
+ @classmethod
+ def from_runner_api(cls, proto, context):
+ if proto is None or not proto.urn:
+ return None
+ parameter_type, constructor = cls._known_urns[proto.urn]
+
+ try:
+ return constructor(
+ proto_utils.parse_Bytes(proto.payload, parameter_type),
+ context)
+ except Exception:
+ if context.allow_proto_holders:
+ return RunnerAPIEnvironmentHolder(proto)
+ raise
+
+ @classmethod
+ def from_options(cls, options):
+ """Creates an Environment object from PipelineOptions.
+
+ Args:
+ options: The PipelineOptions object.
+ """
+ raise NotImplementedError
+
+
+@Environment.register_urn(common_urns.environments.DOCKER.urn,
+ beam_runner_api_pb2.DockerPayload)
+class DockerEnvironment(Environment):
+
+ def __init__(self, container_image=None):
+ from apache_beam.runners.portability.portable_runner import PortableRunner
+
+ if container_image:
+ self.container_image = container_image
+ else:
+ self.container_image = PortableRunner.default_docker_image()
+
+ def __eq__(self, other):
+ return self.__class__ == other.__class__ \
+ and self.container_image == other.container_image
+
+ def __ne__(self, other):
+ # TODO(BEAM-5949): Needed for Python 2 compatibility.
+ return not self == other
+
+ def __hash__(self):
+ return hash((self.__class__, self.container_image))
+
+ def __repr__(self):
+ return 'DockerEnvironment(container_image=%s)' % self.container_image
+
+ def to_runner_api_parameter(self, context):
+ return (common_urns.environments.DOCKER.urn,
+ beam_runner_api_pb2.DockerPayload(
+ container_image=self.container_image))
+
+ @staticmethod
+ def from_runner_api_parameter(payload, context):
+ return DockerEnvironment(container_image=payload.container_image)
+
+ @classmethod
+ def from_options(cls, options):
+ return cls(container_image=options.environment_config)
+
+
+@Environment.register_urn(common_urns.environments.PROCESS.urn,
+ beam_runner_api_pb2.ProcessPayload)
+class ProcessEnvironment(Environment):
+
+ def __init__(self, command, os='', arch='', env=None):
+ self.command = command
+ self.os = os
+ self.arch = arch
+ self.env = env or {}
+
+ def __eq__(self, other):
+ return self.__class__ == other.__class__ \
+ and self.command == other.command and self.os == other.os \
+ and self.arch == other.arch and self.env == other.env
+
+ def __ne__(self, other):
+ # TODO(BEAM-5949): Needed for Python 2 compatibility.
+ return not self == other
+
+ def __hash__(self):
+ return hash((self.__class__, self.command, self.os, self.arch,
+ frozenset(self.env.items())))
+
+ def __repr__(self):
+ repr_parts = ['command=%s' % self.command]
+ if self.os:
+ repr_parts.append('os=%s'% self.os)
+ if self.arch:
+ repr_parts.append('arch=%s' % self.arch)
+ repr_parts.append('env=%s' % self.env)
+ return 'ProcessEnvironment(%s)' % ','.join(repr_parts)
+
+ def to_runner_api_parameter(self, context):
+ return (common_urns.environments.PROCESS.urn,
+ beam_runner_api_pb2.ProcessPayload(
+ os=self.os,
+ arch=self.arch,
+ command=self.command,
+ env=self.env))
+
+ @staticmethod
+ def from_runner_api_parameter(payload, context):
+ return ProcessEnvironment(command=payload.command, os=payload.os,
+ arch=payload.arch, env=payload.env)
+
+ @classmethod
+ def from_options(cls, options):
+ config = json.loads(options.environment_config)
+ return cls(config.get('command'), os=config.get('os', ''),
+ arch=config.get('arch', ''), env=config.get('env', ''))
+
+
+@Environment.register_urn(common_urns.environments.EXTERNAL.urn,
+ beam_runner_api_pb2.ExternalPayload)
+class ExternalEnvironment(Environment):
+
+ def __init__(self, url, params=None):
+ self.url = url
+ self.params = params
+
+ def __eq__(self, other):
+ return self.__class__ == other.__class__ and self.url == other.url \
+ and self.params == other.params
+
+ def __ne__(self, other):
+ # TODO(BEAM-5949): Needed for Python 2 compatibility.
+ return not self == other
+
+ def __hash__(self):
+ params = self.params
+ if params is not None:
+ params = frozenset(self.params.items())
+ return hash((self.__class__, self.url, params))
+
+ def __repr__(self):
+ return 'ExternalEnvironment(url=%s,params=%s)' % (self.url, self.params)
+
+ def to_runner_api_parameter(self, context):
+ return (common_urns.environments.EXTERNAL.urn,
+ beam_runner_api_pb2.ExternalPayload(
+ endpoint=endpoints_pb2.ApiServiceDescriptor(url=self.url),
+ params=self.params
+ ))
+
+ @staticmethod
+ def from_runner_api_parameter(payload, context):
+ return ExternalEnvironment(payload.endpoint.url,
+ params=payload.params or None)
+
+ @classmethod
+ def from_options(cls, options):
+ def looks_like_json(environment_config):
+ import re
+ return re.match(r'\s*\{.*\}\s*$', environment_config)
+
+ if looks_like_json(options.environment_config):
+ config = json.loads(options.environment_config)
+ url = config.get('url')
+ if not url:
+ raise ValueError('External environment endpoint must be set.')
+ params = config.get('params')
+ else:
+ url = options.environment_config
+ params = None
+
+ return cls(url, params=params)
+
+
+@Environment.register_urn(python_urns.EMBEDDED_PYTHON, None)
+class EmbeddedPythonEnvironment(Environment):
+
+ def __eq__(self, other):
+ return self.__class__ == other.__class__
+
+ def __ne__(self, other):
+ # TODO(BEAM-5949): Needed for Python 2 compatibility.
+ return not self == other
+
+ def __hash__(self):
+ return hash(self.__class__)
+
+ def to_runner_api_parameter(self, context):
+ return python_urns.EMBEDDED_PYTHON, None
+
+ @staticmethod
+ def from_runner_api_parameter(unused_payload, context):
+ return EmbeddedPythonEnvironment()
+
+ @classmethod
+ def from_options(cls, options):
+ return cls()
+
+
+@Environment.register_urn(python_urns.EMBEDDED_PYTHON_GRPC, bytes)
+class EmbeddedPythonGrpcEnvironment(Environment):
+
+ def __init__(self, num_workers=None, state_cache_size=None):
+ self.num_workers = num_workers
+ self.state_cache_size = state_cache_size
+
+ def __eq__(self, other):
+ return self.__class__ == other.__class__ \
+ and self.num_workers == other.num_workers \
+ and self.state_cache_size == other.state_cache_size
+
+ def __ne__(self, other):
+ # TODO(BEAM-5949): Needed for Python 2 compatibility.
+ return not self == other
+
+ def __hash__(self):
+ return hash((self.__class__, self.num_workers, self.state_cache_size))
+
+ def __repr__(self):
+ repr_parts = []
+ if not self.num_workers is None:
+ repr_parts.append('num_workers=%d' % self.num_workers)
+ if not self.state_cache_size is None:
+ repr_parts.append('state_cache_size=%d' % self.state_cache_size)
+ return 'EmbeddedPythonGrpcEnvironment(%s)' % ','.join(repr_parts)
+
+ def to_runner_api_parameter(self, context):
+ if self.num_workers is None and self.state_cache_size is None:
+ payload = b''
+ elif self.num_workers is not None and self.state_cache_size is not None:
+ payload = b'%d,%d' % (self.num_workers, self.state_cache_size)
+ else:
+ # We want to make sure that the environment stays the same through the
+ # roundtrip to runner api, so here we don't want to set default for the
+ # other if only one of num workers or state cache size is set
+ raise ValueError('Must provide worker num and state cache size.')
+ return python_urns.EMBEDDED_PYTHON_GRPC, payload
+
+ @staticmethod
+ def from_runner_api_parameter(payload, context):
+ if payload:
+ num_workers, state_cache_size = payload.decode('utf-8').split(',')
+ return EmbeddedPythonGrpcEnvironment(
+ num_workers=int(num_workers),
+ state_cache_size=int(state_cache_size))
+ else:
+ return EmbeddedPythonGrpcEnvironment()
+
+ @classmethod
+ def from_options(cls, options):
+ if options.environment_config:
+ num_workers, state_cache_size = options.environment_config.split(',')
+ return cls(num_workers=num_workers, state_cache_size=state_cache_size)
+ else:
+ return cls()
+
+
+@Environment.register_urn(python_urns.SUBPROCESS_SDK, bytes)
+class SubprocessSDKEnvironment(Environment):
+
+ def __init__(self, command_string):
+ self.command_string = command_string
+
+ def __eq__(self, other):
+ return self.__class__ == other.__class__ \
+ and self.command_string == other.command_string
+
+ def __ne__(self, other):
+ # TODO(BEAM-5949): Needed for Python 2 compatibility.
+ return not self == other
+
+ def __hash__(self):
+ return hash((self.__class__, self.command_string))
+
+ def __repr__(self):
+ return 'SubprocessSDKEnvironment(command_string=%s)' % self.container_string
+
+ def to_runner_api_parameter(self, context):
+ return python_urns.SUBPROCESS_SDK, self.command_string.encode('utf-8')
+
+ @staticmethod
+ def from_runner_api_parameter(payload, context):
+ return SubprocessSDKEnvironment(payload.decode('utf-8'))
+
+ @classmethod
+ def from_options(cls, options):
+ return cls(options.environment_config)
+
+
+class RunnerAPIEnvironmentHolder(Environment):
+
+ def __init__(self, proto):
+ self.proto = proto
+
+ def to_runner_api(self, context):
+ return self.proto
+
+ def __eq__(self, other):
+ return self.__class__ == other.__class__ and self.proto == other.proto
+
+ def __ne__(self, other):
+ # TODO(BEAM-5949): Needed for Python 2 compatibility.
+ return not self == other
+
+ def __hash__(self):
+ return hash((self.__class__, self.proto))
diff --git a/sdks/python/apache_beam/transforms/environments_test.py b/sdks/python/apache_beam/transforms/environments_test.py
new file mode 100644
index 0000000..0fd568c
--- /dev/null
+++ b/sdks/python/apache_beam/transforms/environments_test.py
@@ -0,0 +1,68 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the transform.environments classes."""
+
+from __future__ import absolute_import
+
+import logging
+import unittest
+
+from apache_beam.runners import pipeline_context
+from apache_beam.transforms.environments import DockerEnvironment
+from apache_beam.transforms.environments import EmbeddedPythonEnvironment
+from apache_beam.transforms.environments import EmbeddedPythonGrpcEnvironment
+from apache_beam.transforms.environments import Environment
+from apache_beam.transforms.environments import ExternalEnvironment
+from apache_beam.transforms.environments import ProcessEnvironment
+from apache_beam.transforms.environments import SubprocessSDKEnvironment
+
+
+class RunnerApiTest(unittest.TestCase):
+
+ def test_environment_encoding(self):
+ for environment in (
+ DockerEnvironment(),
+ DockerEnvironment(container_image='img'),
+ ProcessEnvironment('run.sh'),
+ ProcessEnvironment('run.sh', os='linux', arch='amd64',
+ env={'k1': 'v1'}),
+ ExternalEnvironment('localhost:8080'),
+ ExternalEnvironment('localhost:8080', params={'k1': 'v1'}),
+ EmbeddedPythonEnvironment(),
+ EmbeddedPythonGrpcEnvironment(),
+ EmbeddedPythonGrpcEnvironment(num_workers=2, state_cache_size=0),
+ SubprocessSDKEnvironment(command_string=u'foö')):
+ context = pipeline_context.PipelineContext()
+ self.assertEqual(
+ environment,
+ Environment.from_runner_api(
+ environment.to_runner_api(context), context)
+ )
+
+ with self.assertRaises(ValueError) as ctx:
+ EmbeddedPythonGrpcEnvironment(num_workers=2).to_runner_api(
+ pipeline_context.PipelineContext()
+ )
+ self.assertIn('Must provide worker num and state cache size.',
+ ctx.exception.args)
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index ad201d1..0f04d43 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -224,7 +224,7 @@
metric_results = res.metrics().query(MetricsFilter()
.with_name('recordsRead'))
outputs_counter = metric_results['counters'][0]
- self.assertEqual(outputs_counter.key.step, 'Read')
+ self.assertStartswith(outputs_counter.key.step, 'Read')
self.assertEqual(outputs_counter.key.metric.name, 'recordsRead')
self.assertEqual(outputs_counter.committed, 100)
self.assertEqual(outputs_counter.attempted, 100)
diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py
index 43cdedc..d73a1cf 100644
--- a/sdks/python/apache_beam/typehints/native_type_compatibility.py
+++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py
@@ -50,6 +50,17 @@
return None
+def _get_args(typ):
+  """Returns all the arguments to the given type.
+
+  Reads typ.__args__ when the attribute exists; otherwise falls back to
+  _get_compatible_args(typ). If that fallback yields None, the original
+  AttributeError is re-raised.
+  """
+  try:
+    return typ.__args__
+  except AttributeError:
+    compatible_args = _get_compatible_args(typ)
+    if compatible_args is None:
+      # No fallback available: propagate the AttributeError from above.
+      raise
+    return compatible_args
+
+
def _get_arg(typ, index):
"""Returns the index-th argument to the given type."""
try:
@@ -105,6 +116,15 @@
return lambda user_type: type(user_type) == type(match_against)
+def _match_is_exactly_mapping(user_type):
+ # Avoid unintentionally catching all subtypes (e.g. strings and mappings).
+ if sys.version_info < (3, 7):
+ expected_origin = typing.Mapping
+ else:
+ expected_origin = collections.abc.Mapping
+ return getattr(user_type, '__origin__', None) is expected_origin
+
+
def _match_is_exactly_iterable(user_type):
# Avoid unintentionally catching all subtypes (e.g. strings and mappings).
if sys.version_info < (3, 7):
@@ -119,6 +139,22 @@
hasattr(user_type, '_field_types'))
+def _match_is_optional(user_type):
+ return _match_is_union(user_type) and sum(
+ tp is type(None) for tp in _get_args(user_type)) == 1
+
+
+def extract_optional_type(user_type):
+ """Extracts the non-None type from Optional type user_type.
+
+ If user_type is not Optional, returns None
+ """
+ if not _match_is_optional(user_type):
+ return None
+ else:
+ return next(tp for tp in _get_args(user_type) if tp is not type(None))
+
+
def _match_is_union(user_type):
# For non-subscripted unions (Python 2.7.14+ with typing 3.64)
if user_type is typing.Union:
diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py
new file mode 100644
index 0000000..812cbe1
--- /dev/null
+++ b/sdks/python/apache_beam/typehints/schemas.py
@@ -0,0 +1,218 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+""" Support for mapping python types to proto Schemas and back again.
+
+Python Schema
+np.int8 <-----> BYTE
+np.int16 <-----> INT16
+np.int32 <-----> INT32
+np.int64 <-----> INT64
+int ---/
+np.float32 <-----> FLOAT
+np.float64 <-----> DOUBLE
+float ---/
+bool <-----> BOOLEAN
+
+The mappings for STRING and BYTES are different between python 2 and python 3,
+because of the changes to str:
+py3:
+str/unicode <-----> STRING
+bytes <-----> BYTES
+ByteString ---/
+
+py2:
+str will be rejected since it is ambiguous.
+unicode <-----> STRING
+ByteString <-----> BYTES
+"""
+
+from __future__ import absolute_import
+
+import sys
+from typing import ByteString
+from typing import Mapping
+from typing import NamedTuple
+from typing import Optional
+from typing import Sequence
+from uuid import uuid4
+
+import numpy as np
+from past.builtins import unicode
+
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints.native_type_compatibility import _get_args
+from apache_beam.typehints.native_type_compatibility import _match_is_exactly_mapping
+from apache_beam.typehints.native_type_compatibility import _match_is_named_tuple
+from apache_beam.typehints.native_type_compatibility import _match_is_optional
+from apache_beam.typehints.native_type_compatibility import _safe_issubclass
+from apache_beam.typehints.native_type_compatibility import extract_optional_type
+
+
+class SchemaTypeRegistry(object):
+  """Registry of typings for a schema, keyed by the schema's id.
+
+  ``by_id`` maps ``schema.id`` to a ``(typing, schema)`` pair. ``by_typing``
+  is created empty and is not written to by any method of this class.
+  """
+
+  def __init__(self):
+    self.by_id = {}
+    self.by_typing = {}
+
+  def add(self, typing, schema):
+    """Stores ``(typing, schema)`` under ``schema.id``, replacing any entry."""
+    entry = (typing, schema)
+    self.by_id[schema.id] = entry
+
+  def get_typing_by_id(self, unique_id):
+    """Returns the typing stored under unique_id, or None if there is none."""
+    entry = self.by_id.get(unique_id)
+    if entry is None:
+      return None
+    return entry[0]
+
+  def get_schema_by_id(self, unique_id):
+    """Returns the schema stored under unique_id, or None if there is none."""
+    entry = self.by_id.get(unique_id)
+    if entry is None:
+      return None
+    return entry[1]
+
+
+# Module-wide registry shared by the conversion functions in this module.
+SCHEMA_REGISTRY = SchemaTypeRegistry()
+
+
+# Bi-directional mappings: every (python type, atomic type) pair below is
+# entered into both lookup tables.
+_PRIMITIVES = (
+    (np.int8, schema_pb2.BYTE),
+    (np.int16, schema_pb2.INT16),
+    (np.int32, schema_pb2.INT32),
+    (np.int64, schema_pb2.INT64),
+    (np.float32, schema_pb2.FLOAT),
+    (np.float64, schema_pb2.DOUBLE),
+    (unicode, schema_pb2.STRING),
+    (bool, schema_pb2.BOOLEAN),
+    (bytes if sys.version_info.major >= 3 else ByteString, schema_pb2.BYTES),
+)
+
+PRIMITIVE_TO_ATOMIC_TYPE = dict(_PRIMITIVES)
+ATOMIC_TYPE_TO_PRIMITIVE = {atomic: typ for typ, atomic in _PRIMITIVES}
+
+# One-way mappings (python type -> atomic type only).
+
+# In python 2, this is a no-op because ByteString is already part of the
+# bi-directional mapping above. This just ensures the one-way mapping is
+# defined in python 3.
+PRIMITIVE_TO_ATOMIC_TYPE[ByteString] = schema_pb2.BYTES
+# Allow users to specify a native int, and use INT64 as the cross-language
+# representation. Technically ints have unlimited precision, but RowCoder
+# should throw an error if it sees one with a bit width > 64 when encoding.
+PRIMITIVE_TO_ATOMIC_TYPE[int] = schema_pb2.INT64
+PRIMITIVE_TO_ATOMIC_TYPE[float] = schema_pb2.DOUBLE
+
+
+def typing_to_runner_api(type_):
+  """Converts a Python type into a ``schema_pb2.FieldType`` proto.
+
+  The checks run in this order: NamedTuple (row type), known primitive
+  (atomic type), ``str`` on Python 2 (rejected), Mapping (map type), Optional
+  (the wrapped type, marked nullable), Sequence subclass (array type).
+
+  Args:
+    type_: the Python type to convert.
+
+  Returns:
+    A ``schema_pb2.FieldType`` describing ``type_``.
+
+  Raises:
+    ValueError: if ``type_`` is ``str`` on Python 2, or if it matches none of
+      the supported kinds.
+  """
+  if _match_is_named_tuple(type_):
+    row_schema = None
+    if hasattr(type_, 'id'):
+      row_schema = SCHEMA_REGISTRY.get_schema_by_id(type_.id)
+    if row_schema is None:
+      # No schema is registered for this NamedTuple: build one from its
+      # fields, give it a fresh UUID and register it.
+      row_fields = []
+      for field_name in type_._fields:
+        field_type = typing_to_runner_api(type_._field_types[field_name])
+        row_fields.append(schema_pb2.Field(name=field_name, type=field_type))
+      row_schema = schema_pb2.Schema(fields=row_fields, id=str(uuid4()))
+      SCHEMA_REGISTRY.add(type_, row_schema)
+    return schema_pb2.FieldType(
+        row_type=schema_pb2.RowType(schema=row_schema))
+
+  # All concrete types (other than NamedTuple sub-classes) should map to
+  # a supported primitive type.
+  if type_ in PRIMITIVE_TO_ATOMIC_TYPE:
+    return schema_pb2.FieldType(atomic_type=PRIMITIVE_TO_ATOMIC_TYPE[type_])
+
+  if sys.version_info.major == 2 and type_ == str:
+    raise ValueError(
+        "type 'str' is not supported in python 2. Please use 'unicode' or "
+        "'typing.ByteString' instead to unambiguously indicate if this is a "
+        "UTF-8 string or a byte array.")
+
+  if _match_is_exactly_mapping(type_):
+    converted_args = [typing_to_runner_api(arg) for arg in _get_args(type_)]
+    key_type, value_type = converted_args
+    map_type = schema_pb2.MapType(key_type=key_type, value_type=value_type)
+    return schema_pb2.FieldType(map_type=map_type)
+
+  if _match_is_optional(type_):
+    # It's possible that a user passes us Optional[Optional[T]], but in python
+    # typing this is indistinguishable from Optional[T] - both resolve to
+    # Union[T, None] - so there's no need to check for that case here.
+    nullable_type = typing_to_runner_api(extract_optional_type(type_))
+    nullable_type.nullable = True
+    return nullable_type
+
+  if _safe_issubclass(type_, Sequence):
+    first_arg = _get_args(type_)[0]
+    array_type = schema_pb2.ArrayType(
+        element_type=typing_to_runner_api(first_arg))
+    return schema_pb2.FieldType(array_type=array_type)
+
+  raise ValueError("Unsupported type: %s" % type_)
+
+
+def typing_from_runner_api(fieldtype_proto):
+  """Converts a ``schema_pb2.FieldType`` proto into a Python type.
+
+  Args:
+    fieldtype_proto: the ``schema_pb2.FieldType`` to convert.
+
+  Returns:
+    * ``Optional[T]`` when the proto is nullable, where ``T`` is the typing of
+      the same proto with ``nullable`` cleared.
+    * The entry of ``ATOMIC_TYPE_TO_PRIMITIVE`` for an atomic type.
+    * ``Sequence[T]`` for an array type, ``Mapping[K, V]`` for a map type.
+    * For a row type, the typing registered in ``SCHEMA_REGISTRY`` under the
+      schema id; if there is none, a new NamedTuple is created, registered and
+      returned.
+
+  Raises:
+    ValueError: if the atomic type has no Python counterpart, or if the
+      ``type_info`` oneof is unset or of a kind that is not supported
+      (currently this includes ``logical_type``).
+  """
+  if fieldtype_proto.nullable:
+    # In order to determine the inner type, create a copy of fieldtype_proto
+    # with nullable=False and pass back to typing_from_runner_api
+    base_type = schema_pb2.FieldType()
+    base_type.CopyFrom(fieldtype_proto)
+    base_type.nullable = False
+    return Optional[typing_from_runner_api(base_type)]
+
+  type_info = fieldtype_proto.WhichOneof("type_info")
+  if type_info == "atomic_type":
+    try:
+      return ATOMIC_TYPE_TO_PRIMITIVE[fieldtype_proto.atomic_type]
+    except KeyError:
+      raise ValueError("Unsupported atomic type: {0}".format(
+          fieldtype_proto.atomic_type))
+  elif type_info == "array_type":
+    return Sequence[typing_from_runner_api(
+        fieldtype_proto.array_type.element_type)]
+  elif type_info == "map_type":
+    return Mapping[
+        typing_from_runner_api(fieldtype_proto.map_type.key_type),
+        typing_from_runner_api(fieldtype_proto.map_type.value_type)
+    ]
+  elif type_info == "row_type":
+    schema = fieldtype_proto.row_type.schema
+    user_type = SCHEMA_REGISTRY.get_typing_by_id(schema.id)
+    if user_type is None:
+      # Imported here rather than at module level; presumably to avoid an
+      # import cycle - TODO confirm.
+      from apache_beam import coders
+      type_name = 'BeamSchema_{}'.format(schema.id.replace('-', '_'))
+      user_type = NamedTuple(type_name,
+                             [(field.name, typing_from_runner_api(field.type))
+                              for field in schema.fields])
+      # Tag the generated type with the schema id and register it so that
+      # later conversions in either direction reuse this type and schema.
+      user_type.id = schema.id
+      SCHEMA_REGISTRY.add(user_type, schema)
+      coders.registry.register_coder(user_type, coders.RowCoder)
+    return user_type
+
+  # TODO: support logical_type. Until then it is rejected explicitly, together
+  # with an unset type_info, instead of silently returning None (which would
+  # produce nonsensical typings such as Optional[None] or Sequence[None]).
+  raise ValueError("Unsupported type_info: {0!r}".format(type_info))
+
+
+def named_tuple_from_schema(schema):
+  """Returns the typing for ``schema`` by converting it as a row type.
+
+  The schema is wrapped in a row-typed ``schema_pb2.FieldType`` and handed to
+  ``typing_from_runner_api``.
+  """
+  row_type = schema_pb2.RowType(schema=schema)
+  row_field_type = schema_pb2.FieldType(row_type=row_type)
+  return typing_from_runner_api(row_field_type)
+
+
+def named_tuple_to_schema(named_tuple):
+  """Returns the schema of the row type produced for ``named_tuple``.
+
+  ``named_tuple`` is converted with ``typing_to_runner_api`` and the schema is
+  read from the resulting proto's ``row_type``.
+  """
+  field_type = typing_to_runner_api(named_tuple)
+  return field_type.row_type.schema
diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py
new file mode 100644
index 0000000..9dd1bc2
--- /dev/null
+++ b/sdks/python/apache_beam/typehints/schemas_test.py
@@ -0,0 +1,270 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Tests for schemas."""
+
+from __future__ import absolute_import
+
+import itertools
+import sys
+import unittest
+from typing import ByteString
+from typing import List
+from typing import Mapping
+from typing import NamedTuple
+from typing import Optional
+from typing import Sequence
+
+import numpy as np
+from past.builtins import unicode
+
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints.schemas import typing_from_runner_api
+from apache_beam.typehints.schemas import typing_to_runner_api
+
+IS_PYTHON_3 = sys.version_info.major > 2
+
+
+class SchemaTest(unittest.TestCase):
+ """ Tests for Runner API Schema proto to/from typing conversions
+
+ There are two main tests: test_typing_survives_proto_roundtrip, and
+ test_proto_survives_typing_roundtrip. These are both necessary because Schemas
+ are cached by ID, so performing just one of them wouldn't necessarily exercise
+ all code paths.
+ """
+
+ def test_typing_survives_proto_roundtrip(self):
+   """Checks that typing -> FieldType -> typing gives back the same type."""
+   required_primitives = [
+       np.int8,
+       np.int16,
+       np.int32,
+       np.int64,
+       np.float32,
+       np.float64,
+       unicode,
+       bool,
+   ]
+
+   # bytes is only exercised on Python 3. On Python 2 bytes is str, so it
+   # cannot survive the roundtrip; BYTES has to be requested through
+   # typing.ByteString there.
+   if IS_PYTHON_3:
+     required_primitives.append(bytes)
+
+   optional_primitives = [Optional[typ] for typ in required_primitives]
+   primitives = required_primitives + optional_primitives
+
+   array_types = [Sequence[typ] for typ in primitives]
+
+   # Every (key, value) combination of the primitives, keys varying slowest.
+   map_types = [
+       Mapping[key_type, value_type]
+       for key_type in primitives
+       for value_type in primitives
+   ]
+
+   all_primitives_schema = NamedTuple(
+       'AllPrimitives',
+       [('field%d' % i, typ) for i, typ in enumerate(primitives)])
+   complex_schema = NamedTuple('ComplexSchema', [
+       ('id', np.int64),
+       ('name', unicode),
+       ('optional_map', Optional[Mapping[unicode, Optional[np.float64]]]),
+       ('optional_array', Optional[Sequence[np.float32]]),
+       ('array_optional', Sequence[Optional[bool]]),
+   ])
+
+   test_cases = itertools.chain(
+       primitives,
+       array_types,
+       map_types,
+       [all_primitives_schema, complex_schema])
+
+   for test_case in test_cases:
+     round_tripped = typing_from_runner_api(typing_to_runner_api(test_case))
+     self.assertEqual(test_case, round_tripped)
+
+ def test_proto_survives_typing_roundtrip(self):
+   """Checks that FieldType -> typing -> FieldType gives back the same proto."""
+   # AtomicType values are plain ints, so they are compared with != rather
+   # than by identity (identity only works by accident of int caching).
+   all_nonoptional_primitives = [
+       schema_pb2.FieldType(atomic_type=typ)
+       for typ in schema_pb2.AtomicType.values()
+       if typ != schema_pb2.UNSPECIFIED
+   ]
+
+   # The bytes type cannot survive a roundtrip to/from proto in Python 2.
+   # In order to use BYTES a user type has to use typing.ByteString (because
+   # bytes == str, and we map str to STRING).
+   if not IS_PYTHON_3:
+     all_nonoptional_primitives.remove(
+         schema_pb2.FieldType(atomic_type=schema_pb2.BYTES))
+
+   all_optional_primitives = [
+       schema_pb2.FieldType(nullable=True, atomic_type=typ)
+       for typ in schema_pb2.AtomicType.values()
+       if typ != schema_pb2.UNSPECIFIED
+   ]
+
+   all_primitives = all_nonoptional_primitives + all_optional_primitives
+
+   basic_array_types = [
+       schema_pb2.FieldType(array_type=schema_pb2.ArrayType(element_type=typ))
+       for typ in all_primitives
+   ]
+
+   basic_map_types = [
+       schema_pb2.FieldType(
+           map_type=schema_pb2.MapType(
+               key_type=key_type, value_type=value_type))
+       for key_type, value_type in itertools.product(
+           all_primitives, all_primitives)
+   ]
+
+   # Row types carry explicit ids so the conversion goes through the
+   # id-keyed schema registry.
+   selected_schemas = [
+       schema_pb2.FieldType(
+           row_type=schema_pb2.RowType(
+               schema=schema_pb2.Schema(
+                   id='32497414-85e8-46b7-9c90-9a9cc62fe390',
+                   fields=[
+                       schema_pb2.Field(name='field%d' % i, type=typ)
+                       for i, typ in enumerate(all_primitives)
+                   ]))),
+       schema_pb2.FieldType(
+           row_type=schema_pb2.RowType(
+               schema=schema_pb2.Schema(
+                   id='dead1637-3204-4bcb-acf8-99675f338600',
+                   fields=[
+                       schema_pb2.Field(
+                           name='id',
+                           type=schema_pb2.FieldType(
+                               atomic_type=schema_pb2.INT64)),
+                       schema_pb2.Field(
+                           name='name',
+                           type=schema_pb2.FieldType(
+                               atomic_type=schema_pb2.STRING)),
+                       schema_pb2.Field(
+                           name='optional_map',
+                           type=schema_pb2.FieldType(
+                               nullable=True,
+                               map_type=schema_pb2.MapType(
+                                   key_type=schema_pb2.FieldType(
+                                       atomic_type=schema_pb2.STRING
+                                   ),
+                                   value_type=schema_pb2.FieldType(
+                                       atomic_type=schema_pb2.DOUBLE
+                                   )))),
+                       schema_pb2.Field(
+                           name='optional_array',
+                           type=schema_pb2.FieldType(
+                               nullable=True,
+                               array_type=schema_pb2.ArrayType(
+                                   element_type=schema_pb2.FieldType(
+                                       atomic_type=schema_pb2.FLOAT)
+                               ))),
+                       schema_pb2.Field(
+                           name='array_optional',
+                           type=schema_pb2.FieldType(
+                               array_type=schema_pb2.ArrayType(
+                                   element_type=schema_pb2.FieldType(
+                                       nullable=True,
+                                       atomic_type=schema_pb2.BYTES)
+                               ))),
+                   ]))),
+   ]
+
+   test_cases = all_primitives + \
+                basic_array_types + \
+                basic_map_types + \
+                selected_schemas
+
+   for test_case in test_cases:
+     self.assertEqual(test_case,
+                      typing_to_runner_api(typing_from_runner_api(test_case)))
+
+ def test_unknown_primitive_raise_valueerror(self):
+   """Checks that an unmapped numpy type (np.uint32) raises ValueError."""
+   with self.assertRaises(ValueError):
+     typing_to_runner_api(np.uint32)
+
+ def test_unknown_atomic_raise_valueerror(self):
+   """Checks that the UNSPECIFIED atomic type raises ValueError."""
+   with self.assertRaises(ValueError):
+     typing_from_runner_api(
+         schema_pb2.FieldType(atomic_type=schema_pb2.UNSPECIFIED))
+
+ @unittest.skipIf(IS_PYTHON_3, 'str is acceptable in python 3')
+ def test_str_raises_error_py2(self):
+   """Checks that ``str`` is rejected with ValueError on Python 2.
+
+   Covers both a bare ``str`` and a NamedTuple with a ``str`` field.
+   """
+   # assertRaises needs the expected exception class as its first argument.
+   # Without it the lambda itself is taken as the exception class and is
+   # never called, so the test would pass without checking anything.
+   self.assertRaises(ValueError, lambda: typing_to_runner_api(str))
+   self.assertRaises(
+       ValueError,
+       lambda: typing_to_runner_api(
+           NamedTuple('Test', [('int', int), ('str', str)])))
+
+ def test_int_maps_to_int64(self):
+   """Checks that the native ``int`` converts to the INT64 atomic type."""
+   expected = schema_pb2.FieldType(atomic_type=schema_pb2.INT64)
+   actual = typing_to_runner_api(int)
+   self.assertEqual(expected, actual)
+
+ def test_float_maps_to_float64(self):
+   """Checks that the native ``float`` converts to the DOUBLE atomic type."""
+   expected = schema_pb2.FieldType(atomic_type=schema_pb2.DOUBLE)
+   actual = typing_to_runner_api(float)
+   self.assertEqual(expected, actual)
+
+ def test_trivial_example(self):
+   """Checks the fields generated for a small hand-written NamedTuple."""
+   MyCuteClass = NamedTuple('MyCuteClass', [
+       ('name', unicode),
+       ('age', Optional[int]),
+       ('interests', List[unicode]),
+       ('height', float),
+       ('blob', ByteString),
+   ])
+
+   string_type = schema_pb2.FieldType(atomic_type=schema_pb2.STRING)
+   nullable_int64_type = schema_pb2.FieldType(
+       nullable=True, atomic_type=schema_pb2.INT64)
+   string_array_type = schema_pb2.FieldType(
+       array_type=schema_pb2.ArrayType(
+           element_type=schema_pb2.FieldType(
+               atomic_type=schema_pb2.STRING)))
+   double_type = schema_pb2.FieldType(atomic_type=schema_pb2.DOUBLE)
+   bytes_type = schema_pb2.FieldType(atomic_type=schema_pb2.BYTES)
+
+   expected_schema = schema_pb2.Schema(fields=[
+       schema_pb2.Field(name='name', type=string_type),
+       schema_pb2.Field(name='age', type=nullable_int64_type),
+       schema_pb2.Field(name='interests', type=string_array_type),
+       schema_pb2.Field(name='height', type=double_type),
+       schema_pb2.Field(name='blob', type=bytes_type),
+   ])
+
+   # Only test that the fields are equal. If we attempt to test the entire type
+   # or the entire schema, the generated id will break equality.
+   actual_schema = typing_to_runner_api(MyCuteClass).row_type.schema
+   self.assertEqual(expected_schema.fields, actual_schema.fields)
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh
index eab8aad..e3794ba 100755
--- a/sdks/python/scripts/generate_pydoc.sh
+++ b/sdks/python/scripts/generate_pydoc.sh
@@ -157,6 +157,7 @@
'apache_beam.metrics.metric.MetricResults',
'apache_beam.pipeline.PipelineVisitor',
'apache_beam.pipeline.PTransformOverride',
+ 'apache_beam.portability.api.schema_pb2.Schema',
'apache_beam.pvalue.AsSideInput',
'apache_beam.pvalue.DoOutputsTuple',
'apache_beam.pvalue.PValue',
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 1cbc27f..d8d5c7d 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -116,6 +116,7 @@
'hdfs>=2.1.0,<3.0.0',
'httplib2>=0.8,<=0.12.0',
'mock>=1.0.1,<3.0.0',
+ 'numpy>=1.14.3,<2',
'pymongo>=3.8.0,<4.0.0',
'oauth2client>=2.0.1,<4',
'protobuf>=3.5.0.post1,<4',
@@ -139,7 +140,6 @@
REQUIRED_TEST_PACKAGES = [
'nose>=1.3.7',
'nose_xunitmp>=0.4.1',
- 'numpy>=1.14.3,<2',
'pandas>=0.23.4,<0.25',
'parameterized>=0.6.0,<0.7.0',
'pyhamcrest>=1.9,<2.0',