Merge pull request #15117 from ajamato/bq_java_read_metrics
[BEAM-11994] Update BigQueryStorageStreamSource and BigQueryServicesImpl to capture API_REQUEST_COUNT metrics/errors for storage API reads
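The subject line describes counting API requests per RPC method and outcome for BigQuery Storage API reads. As a rough illustration of the idea only (this is not Beam's actual `ServiceCallMetric` API; every name below is invented for the sketch), a process-wide counter keyed by (method, status) could look like:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of an API_REQUEST_COUNT-style metric: one counter
// per (RPC method, response status) pair, incremented on every call.
// All class and method names here are illustrative, not Beam's API.
public class ApiRequestCounter {
  private static final Map<String, AtomicLong> COUNTS = new ConcurrentHashMap<>();

  public static void record(String method, String status) {
    COUNTS.computeIfAbsent(method + "/" + status, k -> new AtomicLong())
        .incrementAndGet();
  }

  public static long count(String method, String status) {
    AtomicLong c = COUNTS.get(method + "/" + status);
    return c == null ? 0 : c.get();
  }

  public static void main(String[] args) {
    // A read path would record one event per ReadRows/SplitReadStream call,
    // tagging failures with their error status instead of "ok".
    record("BigQueryStorage.ReadRows", "ok");
    record("BigQueryStorage.ReadRows", "ok");
    record("BigQueryStorage.SplitReadStream", "not_found");
    System.out.println(count("BigQueryStorage.ReadRows", "ok"));
  }
}
```

Keying by both method and status is what lets a single counter surface error rates as well as raw request volume.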
diff --git a/.test-infra/jenkins/job_PostCommit_Python.groovy b/.test-infra/jenkins/job_PostCommit_Python.groovy
index f0e4e58..2f3b1d8 100644
--- a/.test-infra/jenkins/job_PostCommit_Python.groovy
+++ b/.test-infra/jenkins/job_PostCommit_Python.groovy
@@ -33,7 +33,7 @@
commonJobProperties.setTopLevelMainJobProperties(delegate, 'master', 120)
publishers {
- archiveJunit('**/nosetests*.xml')
+ archiveJunit('**/pytest*.xml')
}
// Execute shell command to test Python SDK.
diff --git a/LICENSE b/LICENSE
index 44c35bb..3b335e3 100644
--- a/LICENSE
+++ b/LICENSE
@@ -403,261 +403,5 @@
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE
-################################################################################
-CPython LICENSE. Source:
-https://github.com/python/cpython/blob/81574b80e92554adf75c13fa42415beb8be383cb/LICENSE
-
-A. HISTORY OF THE SOFTWARE
-==========================
-
-Python was created in the early 1990s by Guido van Rossum at Stichting
-Mathematisch Centrum (CWI, see http://www.cwi.nl) in the Netherlands
-as a successor of a language called ABC. Guido remains Python's
-principal author, although it includes many contributions from others.
-
-In 1995, Guido continued his work on Python at the Corporation for
-National Research Initiatives (CNRI, see http://www.cnri.reston.va.us)
-in Reston, Virginia where he released several versions of the
-software.
-
-In May 2000, Guido and the Python core development team moved to
-BeOpen.com to form the BeOpen PythonLabs team. In October of the same
-year, the PythonLabs team moved to Digital Creations, which became
-Zope Corporation. In 2001, the Python Software Foundation (PSF, see
-https://www.python.org/psf/) was formed, a non-profit organization
-created specifically to own Python-related Intellectual Property.
-Zope Corporation was a sponsoring member of the PSF.
-
-All Python releases are Open Source (see http://www.opensource.org for
-the Open Source Definition). Historically, most, but not all, Python
-releases have also been GPL-compatible; the table below summarizes
-the various releases.
-
- Release Derived Year Owner GPL-
- from compatible? (1)
-
- 0.9.0 thru 1.2 1991-1995 CWI yes
- 1.3 thru 1.5.2 1.2 1995-1999 CNRI yes
- 1.6 1.5.2 2000 CNRI no
- 2.0 1.6 2000 BeOpen.com no
- 1.6.1 1.6 2001 CNRI yes (2)
- 2.1 2.0+1.6.1 2001 PSF no
- 2.0.1 2.0+1.6.1 2001 PSF yes
- 2.1.1 2.1+2.0.1 2001 PSF yes
- 2.1.2 2.1.1 2002 PSF yes
- 2.1.3 2.1.2 2002 PSF yes
- 2.2 and above 2.1.1 2001-now PSF yes
-
-Footnotes:
-
-(1) GPL-compatible doesn't mean that we're distributing Python under
- the GPL. All Python licenses, unlike the GPL, let you distribute
- a modified version without making your changes open source. The
- GPL-compatible licenses make it possible to combine Python with
- other software that is released under the GPL; the others don't.
-
-(2) According to Richard Stallman, 1.6.1 is not GPL-compatible,
- because its license has a choice of law clause. According to
- CNRI, however, Stallman's lawyer has told CNRI's lawyer that 1.6.1
- is "not incompatible" with the GPL.
-
-Thanks to the many outside volunteers who have worked under Guido's
-direction to make these releases possible.
-
-
-B. TERMS AND CONDITIONS FOR ACCESSING OR OTHERWISE USING PYTHON
-===============================================================
-
-PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2
---------------------------------------------
-
-1. This LICENSE AGREEMENT is between the Python Software Foundation
-("PSF"), and the Individual or Organization ("Licensee") accessing and
-otherwise using this software ("Python") in source or binary form and
-its associated documentation.
-
-2. Subject to the terms and conditions of this License Agreement, PSF hereby
-grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce,
-analyze, test, perform and/or display publicly, prepare derivative works,
-distribute, and otherwise use Python alone or in any derivative version,
-provided, however, that PSF's License Agreement and PSF's notice of copyright,
-i.e., "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
-2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018 Python Software Foundation; All
-Rights Reserved" are retained in Python alone or in any derivative version
-prepared by Licensee.
-
-3. In the event Licensee prepares a derivative work that is based on
-or incorporates Python or any part thereof, and wants to make
-the derivative work available to others as provided herein, then
-Licensee hereby agrees to include in any such work a brief summary of
-the changes made to Python.
-
-4. PSF is making Python available to Licensee on an "AS IS"
-basis. PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
-IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND
-DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
-FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL NOT
-INFRINGE ANY THIRD PARTY RIGHTS.
-
-5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
-FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS
-A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON,
-OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
-
-6. This License Agreement will automatically terminate upon a material
-breach of its terms and conditions.
-
-7. Nothing in this License Agreement shall be deemed to create any
-relationship of agency, partnership, or joint venture between PSF and
-Licensee. This License Agreement does not grant permission to use PSF
-trademarks or trade name in a trademark sense to endorse or promote
-products or services of Licensee, or any third party.
-
-8. By copying, installing or otherwise using Python, Licensee
-agrees to be bound by the terms and conditions of this License
-Agreement.
-
-
-BEOPEN.COM LICENSE AGREEMENT FOR PYTHON 2.0
--------------------------------------------
-
-BEOPEN PYTHON OPEN SOURCE LICENSE AGREEMENT VERSION 1
-
-1. This LICENSE AGREEMENT is between BeOpen.com ("BeOpen"), having an
-office at 160 Saratoga Avenue, Santa Clara, CA 95051, and the
-Individual or Organization ("Licensee") accessing and otherwise using
-this software in source or binary form and its associated
-documentation ("the Software").
-
-2. Subject to the terms and conditions of this BeOpen Python License
-Agreement, BeOpen hereby grants Licensee a non-exclusive,
-royalty-free, world-wide license to reproduce, analyze, test, perform
-and/or display publicly, prepare derivative works, distribute, and
-otherwise use the Software alone or in any derivative version,
-provided, however, that the BeOpen Python License is retained in the
-Software, alone or in any derivative version prepared by Licensee.
-
-3. BeOpen is making the Software available to Licensee on an "AS IS"
-basis. BEOPEN MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
-IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, BEOPEN MAKES NO AND
-DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
-FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF THE SOFTWARE WILL NOT
-INFRINGE ANY THIRD PARTY RIGHTS.
-
-4. BEOPEN SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF THE
-SOFTWARE FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS
-AS A RESULT OF USING, MODIFYING OR DISTRIBUTING THE SOFTWARE, OR ANY
-DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
-
-5. This License Agreement will automatically terminate upon a material
-breach of its terms and conditions.
-
-6. This License Agreement shall be governed by and interpreted in all
-respects by the law of the State of California, excluding conflict of
-law provisions. Nothing in this License Agreement shall be deemed to
-create any relationship of agency, partnership, or joint venture
-between BeOpen and Licensee. This License Agreement does not grant
-permission to use BeOpen trademarks or trade names in a trademark
-sense to endorse or promote products or services of Licensee, or any
-third party. As an exception, the "BeOpen Python" logos available at
-http://www.pythonlabs.com/logos.html may be used according to the
-permissions granted on that web page.
-
-7. By copying, installing or otherwise using the software, Licensee
-agrees to be bound by the terms and conditions of this License
-Agreement.
-
-
-CNRI LICENSE AGREEMENT FOR PYTHON 1.6.1
----------------------------------------
-
-1. This LICENSE AGREEMENT is between the Corporation for National
-Research Initiatives, having an office at 1895 Preston White Drive,
-Reston, VA 20191 ("CNRI"), and the Individual or Organization
-("Licensee") accessing and otherwise using Python 1.6.1 software in
-source or binary form and its associated documentation.
-
-2. Subject to the terms and conditions of this License Agreement, CNRI
-hereby grants Licensee a nonexclusive, royalty-free, world-wide
-license to reproduce, analyze, test, perform and/or display publicly,
-prepare derivative works, distribute, and otherwise use Python 1.6.1
-alone or in any derivative version, provided, however, that CNRI's
-License Agreement and CNRI's notice of copyright, i.e., "Copyright (c)
-1995-2001 Corporation for National Research Initiatives; All Rights
-Reserved" are retained in Python 1.6.1 alone or in any derivative
-version prepared by Licensee. Alternately, in lieu of CNRI's License
-Agreement, Licensee may substitute the following text (omitting the
-quotes): "Python 1.6.1 is made available subject to the terms and
-conditions in CNRI's License Agreement. This Agreement together with
-Python 1.6.1 may be located on the Internet using the following
-unique, persistent identifier (known as a handle): 1895.22/1013. This
-Agreement may also be obtained from a proxy server on the Internet
-using the following URL: http://hdl.handle.net/1895.22/1013".
-
-3. In the event Licensee prepares a derivative work that is based on
-or incorporates Python 1.6.1 or any part thereof, and wants to make
-the derivative work available to others as provided herein, then
-Licensee hereby agrees to include in any such work a brief summary of
-the changes made to Python 1.6.1.
-
-4. CNRI is making Python 1.6.1 available to Licensee on an "AS IS"
-basis. CNRI MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
-IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, CNRI MAKES NO AND
-DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
-FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON 1.6.1 WILL NOT
-INFRINGE ANY THIRD PARTY RIGHTS.
-
-5. CNRI SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
-1.6.1 FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS
-A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON 1.6.1,
-OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
-
-6. This License Agreement will automatically terminate upon a material
-breach of its terms and conditions.
-
-7. This License Agreement shall be governed by the federal
-intellectual property law of the United States, including without
-limitation the federal copyright law, and, to the extent such
-U.S. federal law does not apply, by the law of the Commonwealth of
-Virginia, excluding Virginia's conflict of law provisions.
-Notwithstanding the foregoing, with regard to derivative works based
-on Python 1.6.1 that incorporate non-separable material that was
-previously distributed under the GNU General Public License (GPL), the
-law of the Commonwealth of Virginia shall govern this License
-Agreement only as to issues arising under or with respect to
-Paragraphs 4, 5, and 7 of this License Agreement. Nothing in this
-License Agreement shall be deemed to create any relationship of
-agency, partnership, or joint venture between CNRI and Licensee. This
-License Agreement does not grant permission to use CNRI trademarks or
-trade name in a trademark sense to endorse or promote products or
-services of Licensee, or any third party.
-
-8. By clicking on the "ACCEPT" button where indicated, or by copying,
-installing or otherwise using Python 1.6.1, Licensee agrees to be
-bound by the terms and conditions of this License Agreement.
-
- ACCEPT
-
-
-CWI LICENSE AGREEMENT FOR PYTHON 0.9.0 THROUGH 1.2
---------------------------------------------------
-
-Copyright (c) 1991 - 1995, Stichting Mathematisch Centrum Amsterdam,
-The Netherlands. All rights reserved.
-
-Permission to use, copy, modify, and distribute this software and its
-documentation for any purpose and without fee is hereby granted,
-provided that the above copyright notice appear in all copies and that
-both that copyright notice and this permission notice appear in
-supporting documentation, and that the name of Stichting Mathematisch
-Centrum or CWI not be used in advertising or publicity pertaining to
-distribution of the software without specific, written prior
-permission.
-
-STICHTING MATHEMATISCH CENTRUM DISCLAIMS ALL WARRANTIES WITH REGARD TO
-THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
-FITNESS, IN NO EVENT SHALL STICHTING MATHEMATISCH CENTRUM BE LIABLE
-FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
-WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
-ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
-OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+See the adjacent LICENSE.python file, if present, for additional licenses that
+apply to parts of Apache Beam Python.
\ No newline at end of file
diff --git a/LICENSE.python b/LICENSE.python
new file mode 100644
index 0000000..f2f028b
--- /dev/null
+++ b/LICENSE.python
@@ -0,0 +1,258 @@
+
+################################################################################
+CPython LICENSE. Source:
+https://github.com/python/cpython/blob/81574b80e92554adf75c13fa42415beb8be383cb/LICENSE
+
+A. HISTORY OF THE SOFTWARE
+==========================
+
+Python was created in the early 1990s by Guido van Rossum at Stichting
+Mathematisch Centrum (CWI, see http://www.cwi.nl) in the Netherlands
+as a successor of a language called ABC. Guido remains Python's
+principal author, although it includes many contributions from others.
+
+In 1995, Guido continued his work on Python at the Corporation for
+National Research Initiatives (CNRI, see http://www.cnri.reston.va.us)
+in Reston, Virginia where he released several versions of the
+software.
+
+In May 2000, Guido and the Python core development team moved to
+BeOpen.com to form the BeOpen PythonLabs team. In October of the same
+year, the PythonLabs team moved to Digital Creations, which became
+Zope Corporation. In 2001, the Python Software Foundation (PSF, see
+https://www.python.org/psf/) was formed, a non-profit organization
+created specifically to own Python-related Intellectual Property.
+Zope Corporation was a sponsoring member of the PSF.
+
+All Python releases are Open Source (see http://www.opensource.org for
+the Open Source Definition). Historically, most, but not all, Python
+releases have also been GPL-compatible; the table below summarizes
+the various releases.
+
+ Release Derived Year Owner GPL-
+ from compatible? (1)
+
+ 0.9.0 thru 1.2 1991-1995 CWI yes
+ 1.3 thru 1.5.2 1.2 1995-1999 CNRI yes
+ 1.6 1.5.2 2000 CNRI no
+ 2.0 1.6 2000 BeOpen.com no
+ 1.6.1 1.6 2001 CNRI yes (2)
+ 2.1 2.0+1.6.1 2001 PSF no
+ 2.0.1 2.0+1.6.1 2001 PSF yes
+ 2.1.1 2.1+2.0.1 2001 PSF yes
+ 2.1.2 2.1.1 2002 PSF yes
+ 2.1.3 2.1.2 2002 PSF yes
+ 2.2 and above 2.1.1 2001-now PSF yes
+
+Footnotes:
+
+(1) GPL-compatible doesn't mean that we're distributing Python under
+ the GPL. All Python licenses, unlike the GPL, let you distribute
+ a modified version without making your changes open source. The
+ GPL-compatible licenses make it possible to combine Python with
+ other software that is released under the GPL; the others don't.
+
+(2) According to Richard Stallman, 1.6.1 is not GPL-compatible,
+ because its license has a choice of law clause. According to
+ CNRI, however, Stallman's lawyer has told CNRI's lawyer that 1.6.1
+ is "not incompatible" with the GPL.
+
+Thanks to the many outside volunteers who have worked under Guido's
+direction to make these releases possible.
+
+B. TERMS AND CONDITIONS FOR ACCESSING OR OTHERWISE USING PYTHON
+===============================================================
+
+PYTHON SOFTWARE FOUNDATION LICENSE VERSION 2
+--------------------------------------------
+
+1. This LICENSE AGREEMENT is between the Python Software Foundation
+("PSF"), and the Individual or Organization ("Licensee") accessing and
+otherwise using this software ("Python") in source or binary form and
+its associated documentation.
+
+2. Subject to the terms and conditions of this License Agreement, PSF hereby
+grants Licensee a nonexclusive, royalty-free, world-wide license to reproduce,
+analyze, test, perform and/or display publicly, prepare derivative works,
+distribute, and otherwise use Python alone or in any derivative version,
+provided, however, that PSF's License Agreement and PSF's notice of copyright,
+i.e., "Copyright (c) 2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
+2011, 2012, 2013, 2014, 2015, 2016, 2017, 2018 Python Software Foundation; All
+Rights Reserved" are retained in Python alone or in any derivative version
+prepared by Licensee.
+
+3. In the event Licensee prepares a derivative work that is based on
+or incorporates Python or any part thereof, and wants to make
+the derivative work available to others as provided herein, then
+Licensee hereby agrees to include in any such work a brief summary of
+the changes made to Python.
+
+4. PSF is making Python available to Licensee on an "AS IS"
+basis. PSF MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
+IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND
+DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
+FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON WILL NOT
+INFRINGE ANY THIRD PARTY RIGHTS.
+
+5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
+FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS
+A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON,
+OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
+
+6. This License Agreement will automatically terminate upon a material
+breach of its terms and conditions.
+
+7. Nothing in this License Agreement shall be deemed to create any
+relationship of agency, partnership, or joint venture between PSF and
+Licensee. This License Agreement does not grant permission to use PSF
+trademarks or trade name in a trademark sense to endorse or promote
+products or services of Licensee, or any third party.
+
+8. By copying, installing or otherwise using Python, Licensee
+agrees to be bound by the terms and conditions of this License
+Agreement.
+
+BEOPEN.COM LICENSE AGREEMENT FOR PYTHON 2.0
+-------------------------------------------
+
+BEOPEN PYTHON OPEN SOURCE LICENSE AGREEMENT VERSION 1
+
+1. This LICENSE AGREEMENT is between BeOpen.com ("BeOpen"), having an
+office at 160 Saratoga Avenue, Santa Clara, CA 95051, and the
+Individual or Organization ("Licensee") accessing and otherwise using
+this software in source or binary form and its associated
+documentation ("the Software").
+
+2. Subject to the terms and conditions of this BeOpen Python License
+Agreement, BeOpen hereby grants Licensee a non-exclusive,
+royalty-free, world-wide license to reproduce, analyze, test, perform
+and/or display publicly, prepare derivative works, distribute, and
+otherwise use the Software alone or in any derivative version,
+provided, however, that the BeOpen Python License is retained in the
+Software, alone or in any derivative version prepared by Licensee.
+
+3. BeOpen is making the Software available to Licensee on an "AS IS"
+basis. BEOPEN MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
+IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, BEOPEN MAKES NO AND
+DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
+FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF THE SOFTWARE WILL NOT
+INFRINGE ANY THIRD PARTY RIGHTS.
+
+4. BEOPEN SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF THE
+SOFTWARE FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS
+AS A RESULT OF USING, MODIFYING OR DISTRIBUTING THE SOFTWARE, OR ANY
+DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
+
+5. This License Agreement will automatically terminate upon a material
+breach of its terms and conditions.
+
+6. This License Agreement shall be governed by and interpreted in all
+respects by the law of the State of California, excluding conflict of
+law provisions. Nothing in this License Agreement shall be deemed to
+create any relationship of agency, partnership, or joint venture
+between BeOpen and Licensee. This License Agreement does not grant
+permission to use BeOpen trademarks or trade names in a trademark
+sense to endorse or promote products or services of Licensee, or any
+third party. As an exception, the "BeOpen Python" logos available at
+http://www.pythonlabs.com/logos.html may be used according to the
+permissions granted on that web page.
+
+7. By copying, installing or otherwise using the software, Licensee
+agrees to be bound by the terms and conditions of this License
+Agreement.
+
+
+CNRI LICENSE AGREEMENT FOR PYTHON 1.6.1
+---------------------------------------
+
+1. This LICENSE AGREEMENT is between the Corporation for National
+Research Initiatives, having an office at 1895 Preston White Drive,
+Reston, VA 20191 ("CNRI"), and the Individual or Organization
+("Licensee") accessing and otherwise using Python 1.6.1 software in
+source or binary form and its associated documentation.
+
+2. Subject to the terms and conditions of this License Agreement, CNRI
+hereby grants Licensee a nonexclusive, royalty-free, world-wide
+license to reproduce, analyze, test, perform and/or display publicly,
+prepare derivative works, distribute, and otherwise use Python 1.6.1
+alone or in any derivative version, provided, however, that CNRI's
+License Agreement and CNRI's notice of copyright, i.e., "Copyright (c)
+1995-2001 Corporation for National Research Initiatives; All Rights
+Reserved" are retained in Python 1.6.1 alone or in any derivative
+version prepared by Licensee. Alternately, in lieu of CNRI's License
+Agreement, Licensee may substitute the following text (omitting the
+quotes): "Python 1.6.1 is made available subject to the terms and
+conditions in CNRI's License Agreement. This Agreement together with
+Python 1.6.1 may be located on the Internet using the following
+unique, persistent identifier (known as a handle): 1895.22/1013. This
+Agreement may also be obtained from a proxy server on the Internet
+using the following URL: http://hdl.handle.net/1895.22/1013".
+
+3. In the event Licensee prepares a derivative work that is based on
+or incorporates Python 1.6.1 or any part thereof, and wants to make
+the derivative work available to others as provided herein, then
+Licensee hereby agrees to include in any such work a brief summary of
+the changes made to Python 1.6.1.
+
+4. CNRI is making Python 1.6.1 available to Licensee on an "AS IS"
+basis. CNRI MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR
+IMPLIED. BY WAY OF EXAMPLE, BUT NOT LIMITATION, CNRI MAKES NO AND
+DISCLAIMS ANY REPRESENTATION OR WARRANTY OF MERCHANTABILITY OR FITNESS
+FOR ANY PARTICULAR PURPOSE OR THAT THE USE OF PYTHON 1.6.1 WILL NOT
+INFRINGE ANY THIRD PARTY RIGHTS.
+
+5. CNRI SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON
+1.6.1 FOR ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS
+A RESULT OF MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON 1.6.1,
+OR ANY DERIVATIVE THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
+
+6. This License Agreement will automatically terminate upon a material
+breach of its terms and conditions.
+
+7. This License Agreement shall be governed by the federal
+intellectual property law of the United States, including without
+limitation the federal copyright law, and, to the extent such
+U.S. federal law does not apply, by the law of the Commonwealth of
+Virginia, excluding Virginia's conflict of law provisions.
+Notwithstanding the foregoing, with regard to derivative works based
+on Python 1.6.1 that incorporate non-separable material that was
+previously distributed under the GNU General Public License (GPL), the
+law of the Commonwealth of Virginia shall govern this License
+Agreement only as to issues arising under or with respect to
+Paragraphs 4, 5, and 7 of this License Agreement. Nothing in this
+License Agreement shall be deemed to create any relationship of
+agency, partnership, or joint venture between CNRI and Licensee. This
+License Agreement does not grant permission to use CNRI trademarks or
+trade name in a trademark sense to endorse or promote products or
+services of Licensee, or any third party.
+
+8. By clicking on the "ACCEPT" button where indicated, or by copying,
+installing or otherwise using Python 1.6.1, Licensee agrees to be
+bound by the terms and conditions of this License Agreement.
+
+ ACCEPT
+
+
+CWI LICENSE AGREEMENT FOR PYTHON 0.9.0 THROUGH 1.2
+--------------------------------------------------
+
+Copyright (c) 1991 - 1995, Stichting Mathematisch Centrum Amsterdam,
+The Netherlands. All rights reserved.
+
+Permission to use, copy, modify, and distribute this software and its
+documentation for any purpose and without fee is hereby granted,
+provided that the above copyright notice appear in all copies and that
+both that copyright notice and this permission notice appear in
+supporting documentation, and that the name of Stichting Mathematisch
+Centrum or CWI not be used in advertising or publicity pertaining to
+distribution of the software without specific, written prior
+permission.
+
+STICHTING MATHEMATISCH CENTRUM DISCLAIMS ALL WARRANTIES WITH REGARD TO
+THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
+FITNESS, IN NO EVENT SHALL STICHTING MATHEMATISCH CENTRUM BE LIABLE
+FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
diff --git a/build.gradle.kts b/build.gradle.kts
index 849f1f6..2d37828 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -91,6 +91,9 @@
"ownership/**/*",
"**/OWNERS",
+ // Ignore CPython LICENSE file
+ "LICENSE.python",
+
// Json doesn't support comments.
"**/*.json",
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index a078fe9..2404691 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -130,6 +130,17 @@
boolean validateShadowJar = true
/**
+ * Controls whether the 'jmh' source set is enabled for JMH benchmarks.
+ *
+ * Add additional dependencies to the jmhCompile and jmhRuntime dependency
+ * sets.
+ *
+ * Note that the JMH annotation processor is enabled by default and that
+ * a 'jmh' task is created which executes JMH.
+ */
+ boolean enableJmh = false
+
+ /**
* The set of excludes that should be used during validation of the shadow jar. Projects should override
* the default with the most specific set of excludes that is valid for the contents of its shaded jar.
*
@@ -462,6 +473,7 @@
def spotbugs_version = "4.0.6"
def testcontainers_version = "1.15.1"
def arrow_version = "4.0.0"
+ def jmh_version = "1.32"
// A map of maps containing common libraries used per language. To use:
// dependencies {
@@ -628,7 +640,7 @@
proto_google_cloud_firestore_v1 : "com.google.api.grpc:proto-google-cloud-firestore-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_pubsublite_v1 : "com.google.api.grpc:proto-google-cloud-pubsublite-v1:$google_cloud_pubsublite_version",
- proto_google_cloud_spanner_v1: "com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version
+ proto_google_cloud_spanner_v1 : "com.google.api.grpc:proto-google-cloud-spanner-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_spanner_admin_database_v1: "com.google.api.grpc:proto-google-cloud-spanner-admin-database-v1", // google_cloud_platform_libraries_bom sets version
proto_google_common_protos : "com.google.api.grpc:proto-google-common-protos", // google_cloud_platform_libraries_bom sets version
slf4j_api : "org.slf4j:slf4j-api:$slf4j_version",
@@ -848,6 +860,7 @@
List<String> skipDefRegexes = []
skipDefRegexes << "AutoValue_.*"
skipDefRegexes << "AutoOneOf_.*"
+ skipDefRegexes << ".*\\.jmh_generated\\..*"
skipDefRegexes += configuration.generatedClassPatterns
skipDefRegexes += configuration.classesTriggerCheckerBugs.keySet()
String skipDefCombinedRegex = skipDefRegexes.collect({ regex -> "(${regex})"}).join("|")
@@ -1240,6 +1253,45 @@
project.artifacts.testRuntime project.testJar
}
+ if (configuration.enableJmh) {
+ // We specifically use a separate source set for JMH to ensure that it does not
+ // become a required artifact
+ project.sourceSets {
+ jmh {
+ java {
+ srcDir "src/jmh/java"
+ }
+ resources {
+ srcDir "src/jmh/resources"
+ }
+ }
+ }
+
+ project.dependencies {
+ jmhAnnotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:$jmh_version"
+ jmhCompile "org.openjdk.jmh:jmh-core:$jmh_version"
+ }
+
+ project.task("jmh", type: JavaExec, dependsOn: project.jmhClasses, {
+ main = "org.openjdk.jmh.Main"
+ classpath = project.sourceSets.jmh.compileClasspath + project.sourceSets.jmh.runtimeClasspath
+ // For a list of arguments, see
+ // https://github.com/guozheng/jmh-tutorial/blob/master/README.md
+ //
+ // Filter for a specific benchmark to run (uncomment below)
+ // Note that multiple regex are supported each as a separate argument.
+ // args 'BeamFnLoggingClientBenchmark.testLoggingWithAllOptionalParameters'
+ // args 'additional regexp...'
+ //
+ // Enumerate available benchmarks and exit (uncomment below)
+ // args '-l'
+ //
+ // Enable connecting a debugger by disabling forking (uncomment below)
+ // Useful for debugging via an IDE such as Intellij
+ // args '-f0'
+ })
+ }
+
project.ext.includeInJavaBom = configuration.publish
project.ext.exportJavadoc = configuration.exportJavadoc
@@ -1738,6 +1790,7 @@
project.docker { noCache true }
project.tasks.create(name: "copyLicenses", type: Copy) {
from "${project.rootProject.projectDir}/LICENSE"
+ from "${project.rootProject.projectDir}/LICENSE.python"
from "${project.rootProject.projectDir}/NOTICE"
into "build/target"
}
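The `enableJmh` plumbing above wires up a `jmh` source set, the JMH annotation processor, and a `jmh` JavaExec task. A minimal benchmark class that a module might place under `src/jmh/java` could look like the sketch below (it assumes `jmh-core` on the `jmhCompile` classpath, as the dependency block adds; the class name and the benchmarked operation are illustrative only and will not compile without the JMH jars):

```java
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

// Illustrative benchmark only. The annotation processor registered via
// jmhAnnotationProcessor generates the harness classes into a
// *.jmh_generated.* package, which is why the plugin change above adds
// that package to the checker-framework skip list.
@State(Scope.Benchmark)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class StringConcatBenchmark {
  private final String prefix = "api_request_count_";

  @Benchmark
  public String concat() {
    // Returning the result lets JMH consume it, preventing dead-code
    // elimination from optimizing the measured work away.
    return prefix + System.nanoTime();
  }
}
```

With a class like this in place, `./gradlew :<module>:jmh` would run it via `org.openjdk.jmh.Main`, filtered or listed using the commented-out `args` shown in the task definition above.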
diff --git a/release/go-licenses/Dockerfile b/release/go-licenses/Dockerfile
index 43e7870..6004f45 100644
--- a/release/go-licenses/Dockerfile
+++ b/release/go-licenses/Dockerfile
@@ -16,7 +16,7 @@
# limitations under the License.
###############################################################################
-FROM golang:1.15.0-buster
+FROM golang:1.16.0-buster
RUN go get github.com/google/go-licenses
COPY get-licenses.sh /opt/apache/beam/
ARG sdk_location
diff --git a/release/go-licenses/get-licenses.sh b/release/go-licenses/get-licenses.sh
index 727fb9b..be1e01d 100755
--- a/release/go-licenses/get-licenses.sh
+++ b/release/go-licenses/get-licenses.sh
@@ -19,6 +19,7 @@
set -ex
rm -rf /output/*
+export GO111MODULE=off
go get $sdk_location
go-licenses save $sdk_location --save_path=/output/licenses
diff --git a/runners/google-cloud-dataflow-java/build.gradle b/runners/google-cloud-dataflow-java/build.gradle
index e01e441..4d79285 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -167,6 +167,7 @@
'org.apache.beam.sdk.testing.UsesParDoLifecycle',
'org.apache.beam.sdk.testing.UsesMetricsPusher',
'org.apache.beam.sdk.testing.UsesBundleFinalizer',
+ 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering',
]
def commonRunnerV2ExcludeCategories = [
@@ -178,6 +179,7 @@
'org.apache.beam.sdk.testing.UsesTestStream',
'org.apache.beam.sdk.testing.UsesTestStreamWithProcessingTime',
'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput',
+ 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering',
]
// For the following test tasks using legacy worker, set workerHarnessContainerImage to empty to
@@ -277,7 +279,10 @@
commandLine "docker", "rmi", "--force", "${dockerImageName}"
}
exec {
- commandLine "gcloud", "--quiet", "container", "images", "delete", "--force-delete-tags", "${dockerImageName}"
+ commandLine "gcloud", "--quiet", "container", "images", "untag", "${dockerImageName}"
+ }
+ exec {
+ commandLine "./scripts/cleanup_untagged_gcr_images.sh", "${dockerImageContainer}"
}
}
}
@@ -319,7 +324,6 @@
'org.apache.beam.sdk.testing.UsesMapState',
'org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput',
'org.apache.beam.sdk.testing.UsesSetState',
- 'org.apache.beam.sdk.testing.UsesStrictTimerOrdering',
],
))
}
diff --git a/runners/google-cloud-dataflow-java/scripts/cleanup_untagged_gcr_images.sh b/runners/google-cloud-dataflow-java/scripts/cleanup_untagged_gcr_images.sh
new file mode 100755
index 0000000..5bf8197
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/scripts/cleanup_untagged_gcr_images.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -e
+
+IMAGE_NAME=$1
+
+# Find all untagged images
+DIGESTS=$(gcloud container images list-tags "${IMAGE_NAME}" --filter='-tags:*' --format="get(digest)")
+
+# Delete each untagged image
+echo "${DIGESTS}" | while read -r digest; do
+  if [[ -n "${digest}" ]]; then
+ img="${IMAGE_NAME}@${digest}"
+ echo "Removing untagged image ${img}"
+ gcloud container images delete --quiet "${img}"
+ fi
+done
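The digest-filtering loop in the new script can be sanity-checked locally without `gcloud`; the image name and digests below are made-up stand-ins for what `gcloud container images list-tags ... --format="get(digest)"` would return, and the delete is replaced by an echo so the sketch is safe to run anywhere:

```shell
#!/bin/bash
# Hypothetical sample input standing in for the gcloud list-tags output.
IMAGE_NAME="gcr.io/example-project/example-image"
DIGESTS=$'sha256:aaa111\n\nsha256:bbb222'

# Same loop shape as cleanup_untagged_gcr_images.sh: skip blank lines,
# reconstruct the full image reference, and act on each digest.
echo "${DIGESTS}" | while read -r digest; do
  if [[ -n "${digest}" ]]; then
    echo "Would remove ${IMAGE_NAME}@${digest}"
  fi
done
```

Note the blank line in the sample input is skipped by the `-n` guard, mirroring how the real script tolerates empty lines from `gcloud`.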
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index 81ae092..3c06ee9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -1525,8 +1525,8 @@
try {
blockedStartMs.set(Instant.now().getMillis());
- current = queue.take();
- if (current != POISON_PILL) {
+ current = queue.poll(180, TimeUnit.SECONDS);
+ if (current != null && current != POISON_PILL) {
return true;
}
if (cancelled.get()) {
@@ -1535,7 +1535,8 @@
if (complete.get()) {
return false;
}
- throw new IllegalStateException("Got poison pill but stream is not done.");
+ throw new IllegalStateException(
+ "Got poison pill or timeout but stream is not done.");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new CancellationException();
diff --git a/sdks/go/BUILD.md b/sdks/go/BUILD.md
index 8d99b93..d4c02d7 100644
--- a/sdks/go/BUILD.md
+++ b/sdks/go/BUILD.md
@@ -64,3 +64,13 @@
If you make changes to .proto files, you will need to rebuild the generated code.
Consult `pkg/beam/model/PROTOBUF.md`.
+
+If you make changes to .tmpl files, you will need the specialize tool on your path.
+You can install specialize using:
+```
+go get github.com/apache/beam/sdks/go/cmd/specialize
+```
+Add it to your path:
+```
+export PATH=$PATH:$GOROOT/bin:$GOPATH/bin
+```
\ No newline at end of file
diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine.go b/sdks/go/pkg/beam/core/runtime/exec/combine.go
index c7576fe..299edb2 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine.go
@@ -286,8 +286,17 @@
func (n *Combine) fail(err error) error {
n.status = Broken
- n.err.TrySetError(err)
- return err
+ if err2, ok := err.(*doFnError); ok {
+ return err2
+ }
+ combineError := &doFnError{
+ doFn: n.Fn.Name(),
+ err: err,
+ uid: n.UID,
+ pid: n.PID,
+ }
+ n.err.TrySetError(combineError)
+ return combineError
}
func (n *Combine) String() string {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index 099763f..f7a9f32 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -332,8 +332,18 @@
func (n *ParDo) fail(err error) error {
n.status = Broken
- n.err.TrySetError(err)
- return err
+ if err2, ok := err.(*doFnError); ok {
+ return err2
+ }
+
+ parDoError := &doFnError{
+ doFn: n.Fn.Name(),
+ err: err,
+ uid: n.UID,
+ pid: n.PID,
+ }
+ n.err.TrySetError(parDoError)
+ return parDoError
}
func (n *ParDo) String() string {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/util.go b/sdks/go/pkg/beam/core/runtime/exec/util.go
index dc0608b..273c5c8 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/util.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/util.go
@@ -17,6 +17,7 @@
import (
"context"
+ "fmt"
"runtime/debug"
"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
@@ -33,13 +34,29 @@
return UnitID(g.last)
}
+type doFnError struct {
+ doFn string
+ err error
+ uid UnitID
+ pid string
+}
+
+func (e *doFnError) Error() string {
+ return fmt.Sprintf("DoFn[UID:%v, PID:%v, Name: %v] failed:\n%v", e.uid, e.pid, e.doFn, e.err)
+}
+
// callNoPanic calls the given function and catches any panic.
func callNoPanic(ctx context.Context, fn func(context.Context) error) (err error) {
defer func() {
if r := recover(); r != nil {
- // Top level error is the panic itself, but also include the stack trace as the original error.
- // Higher levels can then add appropriate context without getting pushed down by the stack trace.
- err = errors.SetTopLevelMsgf(errors.Errorf("panic: %v %s", r, debug.Stack()), "panic: %v", r)
+ // Check if the panic value is from a failed DoFn, and return it without a panic trace.
+ if e, ok := r.(*doFnError); ok {
+ err = e
+ } else {
+ // Top level error is the panic itself, but also include the stack trace as the original error.
+ // Higher levels can then add appropriate context without getting pushed down by the stack trace.
+ err = errors.SetTopLevelMsgf(errors.Errorf("panic: %v %s", r, debug.Stack()), "panic: %v", r)
+ }
}
}()
return fn(ctx)
diff --git a/sdks/go/pkg/beam/core/runtime/exec/util_test.go b/sdks/go/pkg/beam/core/runtime/exec/util_test.go
new file mode 100644
index 0000000..0009c40
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/exec/util_test.go
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+ "context"
+ "strings"
+ "testing"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+ "github.com/apache/beam/sdks/go/pkg/beam/util/errorx"
+)
+
+// TestCallNoPanic_simple tests the simple case that does not panic.
+func TestCallNoPanic_simple(t *testing.T) {
+ ctx := context.Background()
+ want := errors.New("Simple error.")
+ got := callNoPanic(ctx, func(c context.Context) error { return errors.New("Simple error.") })
+
+ if got.Error() != want.Error() {
+ t.Errorf("callNoPanic(<func that returns error>) = %v, want %v", got, want)
+ }
+}
+
+// TestCallNoPanic_panic tests the case in which a plain value is passed to panic, resulting in a panic trace.
+func TestCallNoPanic_panic(t *testing.T) {
+ ctx := context.Background()
+ got := callNoPanic(ctx, func(c context.Context) error { panic("Panic error") })
+ if !strings.Contains(got.Error(), "panic:") {
+ t.Errorf("callNoPanic(<func that panics with a string>) didn't panic, got = %v", got)
+ }
+}
+
+// TestCallNoPanic_wrappedPanic tests the case in which a *doFnError is passed to
+// panic from a DoFn, resulting in a formatted DoFn error message without a panic trace.
+func TestCallNoPanic_wrappedPanic(t *testing.T) {
+ ctx := context.Background()
+ errs := errors.New("SumFn error")
+ parDoError := &doFnError{
+ doFn: "sumFn",
+ err: errs,
+ uid: 1,
+ pid: "Plan ID",
+ }
+ want := "DoFn[UID:1, PID:Plan ID, Name: sumFn] failed:\nSumFn error"
+ var err errorx.GuardedError
+ err.TrySetError(parDoError)
+
+ got := callNoPanic(ctx, func(c context.Context) error { panic(parDoError) })
+
+ if strings.Contains(got.Error(), "panic:") {
+ t.Errorf("callNoPanic(<func that panics with a wrapped known error>) did not filter panic, want %v, got %v", want, got)
+ }
+}
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
index 2e3ea3f..d8ed074 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/schema/schema.go
@@ -106,7 +106,14 @@
}
// reconcileRegistrations actually finishes the registration process.
-func (r *Registry) reconcileRegistrations() error {
+func (r *Registry) reconcileRegistrations() (deferredErr error) {
+ var ut reflect.Type
+ defer func() {
+ if r := recover(); r != nil {
+ deferredErr = errors.Errorf("panicked: %v", r)
+ deferredErr = errors.WithContextf(deferredErr, "reconciling schema registration for type %v", ut)
+ }
+ }()
for _, ut := range r.toReconcile {
check := func(ut reflect.Type) bool {
return coder.LookupCustomCoder(ut) != nil
diff --git a/sdks/go/pkg/beam/core/util/reflectx/call.go b/sdks/go/pkg/beam/core/util/reflectx/call.go
index 44f0602..dde9a8a 100644
--- a/sdks/go/pkg/beam/core/util/reflectx/call.go
+++ b/sdks/go/pkg/beam/core/util/reflectx/call.go
@@ -19,8 +19,9 @@
"reflect"
"sync"
- "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
"runtime/debug"
+
+ "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
)
//go:generate specialize --input=calls.tmpl
diff --git a/sdks/go/pkg/beam/core/util/reflectx/calls.go b/sdks/go/pkg/beam/core/util/reflectx/calls.go
index a1bfe6b..3297a49 100644
--- a/sdks/go/pkg/beam/core/util/reflectx/calls.go
+++ b/sdks/go/pkg/beam/core/util/reflectx/calls.go
@@ -17,7 +17,10 @@
package reflectx
-import "reflect"
+import (
+ "fmt"
+ "reflect"
+)
// Generated arity-specialized Func implementations to avoid runtime temporary
// slices. Code that knows the arity can potentially avoid that overhead in
@@ -57,7 +60,7 @@
func ToFunc0x0(c Func) Func0x0 {
if c.Type().NumIn() != 0 || c.Type().NumOut() != 0 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 0 inputs and 0 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func0x0); ok {
return sc
@@ -98,7 +101,7 @@
func ToFunc0x1(c Func) Func0x1 {
if c.Type().NumIn() != 0 || c.Type().NumOut() != 1 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 0 inputs and 1 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func0x1); ok {
return sc
@@ -139,7 +142,7 @@
func ToFunc0x2(c Func) Func0x2 {
if c.Type().NumIn() != 0 || c.Type().NumOut() != 2 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 0 inputs and 2 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func0x2); ok {
return sc
@@ -180,7 +183,7 @@
func ToFunc0x3(c Func) Func0x3 {
if c.Type().NumIn() != 0 || c.Type().NumOut() != 3 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 0 inputs and 3 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func0x3); ok {
return sc
@@ -221,7 +224,7 @@
func ToFunc1x0(c Func) Func1x0 {
if c.Type().NumIn() != 1 || c.Type().NumOut() != 0 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 1 inputs and 0 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func1x0); ok {
return sc
@@ -262,7 +265,7 @@
func ToFunc1x1(c Func) Func1x1 {
if c.Type().NumIn() != 1 || c.Type().NumOut() != 1 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 1 inputs and 1 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func1x1); ok {
return sc
@@ -303,7 +306,7 @@
func ToFunc1x2(c Func) Func1x2 {
if c.Type().NumIn() != 1 || c.Type().NumOut() != 2 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 1 inputs and 2 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func1x2); ok {
return sc
@@ -344,7 +347,7 @@
func ToFunc1x3(c Func) Func1x3 {
if c.Type().NumIn() != 1 || c.Type().NumOut() != 3 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 1 inputs and 3 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func1x3); ok {
return sc
@@ -385,7 +388,7 @@
func ToFunc2x0(c Func) Func2x0 {
if c.Type().NumIn() != 2 || c.Type().NumOut() != 0 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 2 inputs and 0 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func2x0); ok {
return sc
@@ -426,7 +429,7 @@
func ToFunc2x1(c Func) Func2x1 {
if c.Type().NumIn() != 2 || c.Type().NumOut() != 1 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 2 inputs and 1 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func2x1); ok {
return sc
@@ -467,7 +470,7 @@
func ToFunc2x2(c Func) Func2x2 {
if c.Type().NumIn() != 2 || c.Type().NumOut() != 2 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 2 inputs and 2 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func2x2); ok {
return sc
@@ -508,7 +511,7 @@
func ToFunc2x3(c Func) Func2x3 {
if c.Type().NumIn() != 2 || c.Type().NumOut() != 3 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 2 inputs and 3 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func2x3); ok {
return sc
@@ -549,7 +552,7 @@
func ToFunc3x0(c Func) Func3x0 {
if c.Type().NumIn() != 3 || c.Type().NumOut() != 0 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 3 inputs and 0 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func3x0); ok {
return sc
@@ -590,7 +593,7 @@
func ToFunc3x1(c Func) Func3x1 {
if c.Type().NumIn() != 3 || c.Type().NumOut() != 1 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 3 inputs and 1 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func3x1); ok {
return sc
@@ -631,7 +634,7 @@
func ToFunc3x2(c Func) Func3x2 {
if c.Type().NumIn() != 3 || c.Type().NumOut() != 2 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 3 inputs and 2 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func3x2); ok {
return sc
@@ -672,7 +675,7 @@
func ToFunc3x3(c Func) Func3x3 {
if c.Type().NumIn() != 3 || c.Type().NumOut() != 3 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 3 inputs and 3 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func3x3); ok {
return sc
@@ -713,7 +716,7 @@
func ToFunc4x0(c Func) Func4x0 {
if c.Type().NumIn() != 4 || c.Type().NumOut() != 0 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 4 inputs and 0 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func4x0); ok {
return sc
@@ -754,7 +757,7 @@
func ToFunc4x1(c Func) Func4x1 {
if c.Type().NumIn() != 4 || c.Type().NumOut() != 1 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 4 inputs and 1 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func4x1); ok {
return sc
@@ -795,7 +798,7 @@
func ToFunc4x2(c Func) Func4x2 {
if c.Type().NumIn() != 4 || c.Type().NumOut() != 2 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 4 inputs and 2 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func4x2); ok {
return sc
@@ -836,7 +839,7 @@
func ToFunc4x3(c Func) Func4x3 {
if c.Type().NumIn() != 4 || c.Type().NumOut() != 3 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 4 inputs and 3 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func4x3); ok {
return sc
@@ -877,7 +880,7 @@
func ToFunc5x0(c Func) Func5x0 {
if c.Type().NumIn() != 5 || c.Type().NumOut() != 0 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 5 inputs and 0 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func5x0); ok {
return sc
@@ -918,7 +921,7 @@
func ToFunc5x1(c Func) Func5x1 {
if c.Type().NumIn() != 5 || c.Type().NumOut() != 1 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 5 inputs and 1 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func5x1); ok {
return sc
@@ -959,7 +962,7 @@
func ToFunc5x2(c Func) Func5x2 {
if c.Type().NumIn() != 5 || c.Type().NumOut() != 2 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 5 inputs and 2 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func5x2); ok {
return sc
@@ -1000,7 +1003,7 @@
func ToFunc5x3(c Func) Func5x3 {
if c.Type().NumIn() != 5 || c.Type().NumOut() != 3 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 5 inputs and 3 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func5x3); ok {
return sc
@@ -1041,7 +1044,7 @@
func ToFunc6x0(c Func) Func6x0 {
if c.Type().NumIn() != 6 || c.Type().NumOut() != 0 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 6 inputs and 0 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func6x0); ok {
return sc
@@ -1082,7 +1085,7 @@
func ToFunc6x1(c Func) Func6x1 {
if c.Type().NumIn() != 6 || c.Type().NumOut() != 1 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 6 inputs and 1 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func6x1); ok {
return sc
@@ -1123,7 +1126,7 @@
func ToFunc6x2(c Func) Func6x2 {
if c.Type().NumIn() != 6 || c.Type().NumOut() != 2 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 6 inputs and 2 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func6x2); ok {
return sc
@@ -1164,7 +1167,7 @@
func ToFunc6x3(c Func) Func6x3 {
if c.Type().NumIn() != 6 || c.Type().NumOut() != 3 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 6 inputs and 3 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func6x3); ok {
return sc
@@ -1205,7 +1208,7 @@
func ToFunc7x0(c Func) Func7x0 {
if c.Type().NumIn() != 7 || c.Type().NumOut() != 0 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 7 inputs and 0 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func7x0); ok {
return sc
@@ -1246,7 +1249,7 @@
func ToFunc7x1(c Func) Func7x1 {
if c.Type().NumIn() != 7 || c.Type().NumOut() != 1 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 7 inputs and 1 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func7x1); ok {
return sc
@@ -1287,7 +1290,7 @@
func ToFunc7x2(c Func) Func7x2 {
if c.Type().NumIn() != 7 || c.Type().NumOut() != 2 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 7 inputs and 2 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func7x2); ok {
return sc
@@ -1328,7 +1331,7 @@
func ToFunc7x3(c Func) Func7x3 {
if c.Type().NumIn() != 7 || c.Type().NumOut() != 3 {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want 7 inputs and 3 outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func7x3); ok {
return sc
diff --git a/sdks/go/pkg/beam/core/util/reflectx/calls.tmpl b/sdks/go/pkg/beam/core/util/reflectx/calls.tmpl
index 60b6107..75b80e2 100644
--- a/sdks/go/pkg/beam/core/util/reflectx/calls.tmpl
+++ b/sdks/go/pkg/beam/core/util/reflectx/calls.tmpl
@@ -15,7 +15,10 @@
package reflectx
-import "reflect"
+import (
+ "reflect"
+ "fmt"
+)
// Generated arity-specialized Func implementations to avoid runtime temporary
// slices. Code that knows the arity can potentially avoid that overhead in
@@ -57,7 +60,7 @@
func ToFunc{{$in}}x{{$out}}(c Func) Func{{$in}}x{{$out}} {
if c.Type().NumIn() != {{$in}} || c.Type().NumOut() != {{$out}} {
- panic("incompatible func type")
+ panic(fmt.Sprintf("Incompatible func type: got func %v with %v inputs and %v outputs, want {{$in}} inputs and {{$out}} outputs", c.Type(), c.Type().NumIn(), c.Type().NumOut()))
}
if sc, ok := c.(Func{{$in}}x{{$out}}); ok {
return sc
diff --git a/sdks/go/pkg/beam/testing/passert/equals_test.go b/sdks/go/pkg/beam/testing/passert/equals_test.go
index 0af7d43..3138705 100644
--- a/sdks/go/pkg/beam/testing/passert/equals_test.go
+++ b/sdks/go/pkg/beam/testing/passert/equals_test.go
@@ -185,6 +185,7 @@
fmt.Println(err)
// Output:
+ // DoFn[UID:1, PID:passert.failIfBadEntries, Name: github.com/apache/beam/sdks/go/pkg/beam/testing/passert.failIfBadEntries] failed:
// actual PCollection does not match expected values
// =========
// 2 correct entries (present in both)
diff --git a/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml
index 71f2172..b28371c 100644
--- a/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/spotbugs-filter.xml
@@ -64,6 +64,9 @@
<Class name="~.*AutoValue_.*"/>
</Match>
<Match>
+ <Package name="~.*jmh_generated.*"/>
+ </Match>
+ <Match>
<Package name="org.apache.beam.sdk.extensions.sql.impl.parser.impl"/>
</Match>
<Match>
diff --git a/sdks/java/harness/build.gradle b/sdks/java/harness/build.gradle
index 50ea8c1..3c859ae 100644
--- a/sdks/java/harness/build.gradle
+++ b/sdks/java/harness/build.gradle
@@ -33,6 +33,7 @@
],
automaticModuleName: 'org.apache.beam.fn.harness',
validateShadowJar: false,
+ enableJmh: true,
testShadowJar: true,
shadowClosure:
// Create an uber jar without repackaging for the SDK harness
@@ -72,4 +73,6 @@
testCompile project(path: ":sdks:java:core", configuration: "shadowTest")
testCompile project(":runners:core-construction-java")
shadowTestRuntimeClasspath library.java.slf4j_jdk14
+ jmhCompile project(path: ":sdks:java:harness", configuration: "shadowTest")
+ jmhRuntime library.java.slf4j_jdk14
}
diff --git a/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientBenchmark.java b/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientBenchmark.java
new file mode 100644
index 0000000..9e009c3
--- /dev/null
+++ b/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClientBenchmark.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness.logging;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.SimpleExecutionState;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.fn.test.InProcessManagedChannelFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.inprocess.InProcessServerBuilder;
+import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.StreamObserver;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Benchmarks for {@link BeamFnLoggingClient}. */
+public class BeamFnLoggingClientBenchmark {
+ private static final Logger LOG = LoggerFactory.getLogger(BeamFnLoggingClientBenchmark.class);
+
+ /** A logging service which counts the number of calls it received. */
+ public static class CallCountLoggingService extends BeamFnLoggingGrpc.BeamFnLoggingImplBase {
+ private AtomicInteger callCount = new AtomicInteger();
+
+ @Override
+ public StreamObserver<BeamFnApi.LogEntry.List> logging(
+ StreamObserver<BeamFnApi.LogControl> outboundObserver) {
+ return new StreamObserver<BeamFnApi.LogEntry.List>() {
+
+ @Override
+ public void onNext(BeamFnApi.LogEntry.List list) {
+ callCount.incrementAndGet();
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ outboundObserver.onError(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ outboundObserver.onCompleted();
+ }
+ };
+ }
+ }
+
+ /** Setup a simple logging service and configure the {@link BeamFnLoggingClient}. */
+ @State(Scope.Benchmark)
+ public static class ManageLoggingClientAndService {
+ public final BeamFnLoggingClient loggingClient;
+ public final CallCountLoggingService loggingService;
+ public final Server server;
+
+ public ManageLoggingClientAndService() {
+ try {
+ ApiServiceDescriptor apiServiceDescriptor =
+ ApiServiceDescriptor.newBuilder()
+ .setUrl(BeamFnLoggingClientBenchmark.class.getName() + "#" + UUID.randomUUID())
+ .build();
+ ManagedChannelFactory managedChannelFactory = InProcessManagedChannelFactory.create();
+ loggingService = new CallCountLoggingService();
+ server =
+ InProcessServerBuilder.forName(apiServiceDescriptor.getUrl())
+ .addService(loggingService)
+ .build();
+ server.start();
+ loggingClient =
+ new BeamFnLoggingClient(
+ PipelineOptionsFactory.create(),
+ apiServiceDescriptor,
+ managedChannelFactory::forDescriptor);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @TearDown(Level.Trial)
+ public void tearDown() throws Exception {
+ loggingClient.close();
+ server.shutdown();
+ if (!server.awaitTermination(30, TimeUnit.SECONDS)) {
+ server.shutdownNow();
+ }
+ }
+ }
+
+ /**
+ * A {@link ManageLoggingClientAndService} which validates that more than zero calls made it to
+ * the service.
+ */
+ @State(Scope.Benchmark)
+ public static class ManyExpectedCallsLoggingClientAndService
+ extends ManageLoggingClientAndService {
+ @Override
+ @TearDown
+ public void tearDown() throws Exception {
+ super.tearDown();
+ if (loggingService.callCount.get() <= 0) {
+ throw new IllegalStateException(
+ "Server expected greater than zero calls. Benchmark misconfigured?");
+ }
+ }
+ }
+
+ /**
+ * A {@link ManageLoggingClientAndService} which validates that exactly zero calls made it to the
+ * service.
+ */
+ @State(Scope.Benchmark)
+ public static class ZeroExpectedCallsLoggingClientAndService
+ extends ManageLoggingClientAndService {
+ @Override
+ @TearDown
+ public void tearDown() throws Exception {
+ super.tearDown();
+ if (loggingService.callCount.get() != 0) {
+ throw new IllegalStateException("Server expected zero calls. Benchmark misconfigured?");
+ }
+ }
+ }
+
+ /** Sets up the {@link ExecutionStateTracker} and an execution state. */
+ @State(Scope.Benchmark)
+ public static class ManageExecutionState {
+ private final ExecutionStateTracker executionStateTracker;
+ private final SimpleExecutionState simpleExecutionState;
+
+ public ManageExecutionState() {
+ executionStateTracker = ExecutionStateTracker.newForTest();
+ HashMap<String, String> labelsMetadata = new HashMap<>();
+ labelsMetadata.put(MonitoringInfoConstants.Labels.PTRANSFORM, "ptransformId");
+ simpleExecutionState =
+ new SimpleExecutionState(
+ ExecutionStateTracker.PROCESS_STATE_NAME,
+ MonitoringInfoConstants.Urns.PROCESS_BUNDLE_MSECS,
+ labelsMetadata);
+ }
+
+ @TearDown
+ public void tearDown() throws Exception {
+ executionStateTracker.reset();
+ }
+ }
+
+ @Benchmark
+ @Threads(16) // Use several threads since we expect contention during logging
+ public void testLogging(ManyExpectedCallsLoggingClientAndService client) {
+ LOG.warn("log me");
+ }
+
+ @Benchmark
+ @Threads(16) // Use several threads since we expect contention during logging
+ public void testLoggingWithAllOptionalParameters(
+ ManyExpectedCallsLoggingClientAndService client, ManageExecutionState executionState)
+ throws Exception {
+ BeamFnLoggingMDC.setInstructionId("instruction id");
+ try (Closeable state =
+ executionState.executionStateTracker.enterState(executionState.simpleExecutionState)) {
+ LOG.warn("log me");
+ }
+ BeamFnLoggingMDC.setInstructionId(null);
+ }
+
+ @Benchmark
+ @Threads(16) // Use several threads since we expect contention during logging
+ public void testSkippedLogging(ZeroExpectedCallsLoggingClientAndService client) {
+ LOG.trace("no log");
+ }
+}
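The benchmark state classes above validate call counts at teardown so a misconfigured benchmark fails loudly instead of silently measuring nothing. A minimal JDK-only sketch of that validation pattern (hypothetical class name, no JMH or gRPC dependencies):

```java
import java.util.concurrent.atomic.AtomicLong;

/** Sketch: a service stub that counts calls and validates expectations at teardown. */
public class CallCountingService {
    final AtomicLong callCount = new AtomicLong();

    void handleCall() {
        callCount.incrementAndGet();
    }

    /** Mirrors ManyExpectedCallsLoggingClientAndService: fail if nothing reached the service. */
    void expectSomeCalls() {
        if (callCount.get() <= 0) {
            throw new IllegalStateException("Expected greater than zero calls.");
        }
    }

    /** Mirrors ZeroExpectedCallsLoggingClientAndService: fail if anything reached the service. */
    void expectZeroCalls() {
        if (callCount.get() != 0) {
            throw new IllegalStateException("Expected zero calls.");
        }
    }

    public static void main(String[] args) {
        CallCountingService s = new CallCountingService();
        s.handleCall();
        s.expectSomeCalls(); // passes: one call was recorded
        new CallCountingService().expectZeroCalls(); // passes: nothing was recorded
        System.out.println("ok");
    }
}
```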
diff --git a/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/package-info.java b/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/package-info.java
new file mode 100644
index 0000000..304238e
--- /dev/null
+++ b/sdks/java/harness/src/jmh/java/org/apache/beam/fn/harness/logging/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Benchmarks for logging. */
+package org.apache.beam.fn.harness.logging;
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index 6cd9116..fe42f1a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -57,7 +57,6 @@
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ClientCallStreamObserver;
import org.apache.beam.vendor.grpc.v1p36p0.io.grpc.stub.ClientResponseObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -230,8 +229,8 @@
String transformId =
((SimpleExecutionState) state)
.getLabels()
- .getOrDefault(MonitoringInfoConstants.Labels.PTRANSFORM, "");
- if (!Strings.isNullOrEmpty(transformId)) {
+ .get(MonitoringInfoConstants.Labels.PTRANSFORM);
+ if (transformId != null) {
builder.setTransformId(transformId);
}
}
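The hunk above replaces a `getOrDefault(..., "")` plus `Strings.isNullOrEmpty` check with a plain `get` and null check, dropping the vendored Guava import. A small illustration of why the two are equivalent here, assuming the label is never stored as the empty string (hypothetical class and map contents):

```java
import java.util.HashMap;
import java.util.Map;

public class LabelLookup {
    /** New style: a single lookup, returning null when the label is absent. */
    static String transformId(Map<String, String> labels) {
        return labels.get("PTRANSFORM");
    }

    public static void main(String[] args) {
        Map<String, String> labels = new HashMap<>();
        labels.put("PTRANSFORM", "ptransformId");

        // Old style: substitute "" for a missing key, then test for null-or-empty.
        String old = labels.getOrDefault("PTRANSFORM", "");
        boolean oldPresent = !old.isEmpty();

        // New style: one lookup and a null check. Equivalent as long as the
        // label value, when present, is never the empty string.
        String id = transformId(labels);
        System.out.println(oldPresent == (id != null)); // prints "true"
    }
}
```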
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
index 15ea5c0..16b96bf 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java
@@ -30,10 +30,8 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
@@ -147,8 +145,8 @@
private ValueProvider<String> loadJobProjectId;
private final Coder<ElementT> elementCoder;
private final RowWriterFactory<ElementT, DestinationT> rowWriterFactory;
- private String kmsKey;
- private boolean clusteringEnabled;
+ private final String kmsKey;
+ private final boolean clusteringEnabled;
// The maximum number of times to retry failed load or copy jobs.
private int maxRetryJobs = DEFAULT_MAX_RETRY_JOBS;
@@ -274,6 +272,8 @@
private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input) {
Pipeline p = input.getPipeline();
final PCollectionView<String> loadJobIdPrefixView = createJobIdPrefixView(p, JobType.LOAD);
+ final PCollectionView<String> tempLoadJobIdPrefixView =
+ createJobIdPrefixView(p, JobType.TEMP_TABLE_LOAD);
final PCollectionView<String> copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY);
final PCollectionView<String> tempFilePrefixView =
createTempFilePrefixView(p, loadJobIdPrefixView);
@@ -321,9 +321,9 @@
.plusDelayOf(triggeringFrequency)))
.discardingFiredPanes());
- TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
+ TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> multiPartitionsTag =
new TupleTag<>("multiPartitionsTag");
- TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
+ TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> singlePartitionTag =
new TupleTag<>("singlePartitionTag");
// If we have non-default triggered output, we can't use the side-input technique used in
@@ -331,10 +331,10 @@
// determinism.
PCollectionTuple partitions =
results
- .apply("AttachSingletonKey", WithKeys.of((Void) null))
+ .apply("AttachDestinationKey", WithKeys.of(result -> result.destination))
.setCoder(
- KvCoder.of(VoidCoder.of(), WriteBundlesToFiles.ResultCoder.of(destinationCoder)))
- .apply("GroupOntoSingleton", GroupByKey.create())
+ KvCoder.of(destinationCoder, WriteBundlesToFiles.ResultCoder.of(destinationCoder)))
+ .apply("GroupFilesByDestination", GroupByKey.create())
.apply("ExtractResultValues", Values.create())
.apply(
"WritePartitionTriggered",
@@ -350,14 +350,14 @@
rowWriterFactory))
.withSideInputs(tempFilePrefixView)
.withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
- PCollection<KV<TableDestination, String>> tempTables =
- writeTempTables(partitions.get(multiPartitionsTag), loadJobIdPrefixView);
+ PCollection<KV<TableDestination, WriteTables.Result>> tempTables =
+ writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView);
tempTables
// Now that the load job has happened, we want the rename to happen immediately.
.apply(
"Window Into Global Windows",
- Window.<KV<TableDestination, String>>into(new GlobalWindows())
+ Window.<KV<TableDestination, WriteTables.Result>>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
.apply("Add Void Key", WithKeys.of((Void) null))
.setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder()))
@@ -382,6 +382,9 @@
public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> input) {
Pipeline p = input.getPipeline();
final PCollectionView<String> loadJobIdPrefixView = createJobIdPrefixView(p, JobType.LOAD);
+ final PCollectionView<String> tempLoadJobIdPrefixView =
+ createJobIdPrefixView(p, JobType.TEMP_TABLE_LOAD);
+ final PCollectionView<String> copyJobIdPrefixView = createJobIdPrefixView(p, JobType.COPY);
final PCollectionView<String> tempFilePrefixView =
createTempFilePrefixView(p, loadJobIdPrefixView);
PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
@@ -395,10 +398,10 @@
? writeDynamicallyShardedFilesUntriggered(inputInGlobalWindow, tempFilePrefixView)
: writeStaticallyShardedFiles(inputInGlobalWindow, tempFilePrefixView);
- TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
- new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {};
- TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
- new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag") {};
+ TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> multiPartitionsTag =
+ new TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>>("multiPartitionsTag") {};
+ TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> singlePartitionTag =
+ new TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>>("singlePartitionTag") {};
// This transform will look at the set of files written for each table, and if any table has
// too many files or bytes, will partition that table's files into multiple partitions for
@@ -421,8 +424,8 @@
rowWriterFactory))
.withSideInputs(tempFilePrefixView)
.withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
- PCollection<KV<TableDestination, String>> tempTables =
- writeTempTables(partitions.get(multiPartitionsTag), loadJobIdPrefixView);
+ PCollection<KV<TableDestination, WriteTables.Result>> tempTables =
+ writeTempTables(partitions.get(multiPartitionsTag), tempLoadJobIdPrefixView);
tempTables
.apply("ReifyRenameInput", new ReifyAsIterable<>())
@@ -431,7 +434,7 @@
ParDo.of(
new WriteRename(
bigQueryServices,
- loadJobIdPrefixView,
+ copyJobIdPrefixView,
writeDisposition,
createDisposition,
maxRetryJobs,
@@ -637,23 +640,22 @@
.apply(
"WriteGroupedRecords",
ParDo.of(
- new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
- tempFilePrefix, maxFileSize, rowWriterFactory))
+ new WriteGroupedRecordsToFiles<>(tempFilePrefix, maxFileSize, rowWriterFactory))
.withSideInputs(tempFilePrefix))
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
}
// Take in a list of files and write them to temporary tables.
- private PCollection<KV<TableDestination, String>> writeTempTables(
- PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
+ private PCollection<KV<TableDestination, WriteTables.Result>> writeTempTables(
+ PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>> input,
PCollectionView<String> jobIdTokenView) {
List<PCollectionView<?>> sideInputs = Lists.newArrayList(jobIdTokenView);
sideInputs.addAll(dynamicDestinations.getSideInputs());
- Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
+ Coder<KV<ShardedKey<DestinationT>, WritePartition.Result>> partitionsCoder =
KvCoder.of(
ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
- ListCoder.of(StringUtf8Coder.of()));
+ WritePartition.ResultCoder.INSTANCE);
// If the final destination table exists already (and we're appending to it), then the temp
// tables must exactly match schema, partitioning, etc. Wrap the DynamicDestinations object
@@ -695,20 +697,24 @@
rowWriterFactory.getSourceFormat(),
useAvroLogicalTypes,
schemaUpdateOptions))
- .setCoder(KvCoder.of(tableDestinationCoder, StringUtf8Coder.of()));
+ .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE));
}
// In the case where the files fit into a single load job, there's no need to write temporary
// tables and rename. We can load these files directly into the target BigQuery table.
void writeSinglePartition(
- PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
+ PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>> input,
PCollectionView<String> loadJobIdPrefixView) {
List<PCollectionView<?>> sideInputs = Lists.newArrayList(loadJobIdPrefixView);
sideInputs.addAll(dynamicDestinations.getSideInputs());
- Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
+
+ Coder<TableDestination> tableDestinationCoder =
+ clusteringEnabled ? TableDestinationCoderV3.of() : TableDestinationCoderV2.of();
+
+ Coder<KV<ShardedKey<DestinationT>, WritePartition.Result>> partitionsCoder =
KvCoder.of(
ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
- ListCoder.of(StringUtf8Coder.of()));
+ WritePartition.ResultCoder.INSTANCE);
// Write single partition to final table
input
.setCoder(partitionsCoder)
@@ -731,7 +737,8 @@
kmsKey,
rowWriterFactory.getSourceFormat(),
useAvroLogicalTypes,
- schemaUpdateOptions));
+ schemaUpdateOptions))
+ .setCoder(KvCoder.of(tableDestinationCoder, WriteTables.ResultCoder.INSTANCE));
}
private WriteResult writeResult(Pipeline p) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java
index 7e800fd0..7eae6fe 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryResourceNaming.java
@@ -69,6 +69,7 @@
public enum JobType {
LOAD,
+ TEMP_TABLE_LOAD,
COPY,
EXPORT,
QUERY,
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
index cd4f163..e1e0566 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WritePartition.java
@@ -17,8 +17,17 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.List;
import java.util.Map;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
@@ -39,7 +48,32 @@
class WritePartition<DestinationT>
extends DoFn<
Iterable<WriteBundlesToFiles.Result<DestinationT>>,
- KV<ShardedKey<DestinationT>, List<String>>> {
+ KV<ShardedKey<DestinationT>, WritePartition.Result>> {
+ @AutoValue
+ abstract static class Result {
+ public abstract List<String> getFilenames();
+
+ abstract Boolean isFirstPane();
+ }
+
+ static class ResultCoder extends AtomicCoder<Result> {
+ private static final Coder<List<String>> FILENAMES_CODER = ListCoder.of(StringUtf8Coder.of());
+ private static final Coder<Boolean> FIRST_PANE_CODER = BooleanCoder.of();
+ static final ResultCoder INSTANCE = new ResultCoder();
+
+ @Override
+ public void encode(Result value, OutputStream outStream) throws IOException {
+ FILENAMES_CODER.encode(value.getFilenames(), outStream);
+ FIRST_PANE_CODER.encode(value.isFirstPane(), outStream);
+ }
+
+ @Override
+ public Result decode(InputStream inStream) throws IOException {
+ return new AutoValue_WritePartition_Result(
+ FILENAMES_CODER.decode(inStream), FIRST_PANE_CODER.decode(inStream));
+ }
+ }
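`ResultCoder` above simply concatenates the encodings of its two fields, so `decode` must read them back in the same order. A JDK-only sketch of that field-by-field round-trip pattern for a (filenames, isFirstPane) pair, without Beam's coder classes (hypothetical class name and file paths):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PairCodec {
    /** Encode the filename list followed by the boolean flag, field by field. */
    static byte[] encode(List<String> filenames, boolean isFirstPane) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(filenames.size());
        for (String f : filenames) {
            out.writeUTF(f);
        }
        out.writeBoolean(isFirstPane);
        out.flush();
        return bytes.toByteArray();
    }

    /** Decode in the same field order; here we return just the filenames. */
    static List<String> decodeFilenames(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int n = in.readInt();
        List<String> filenames = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            filenames.add(in.readUTF());
        }
        return filenames;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = encode(Arrays.asList("gs://tmp/file1", "gs://tmp/file2"), true);
        System.out.println(decodeFilenames(data)); // prints "[gs://tmp/file1, gs://tmp/file2]"
    }
}
```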
+
private final boolean singletonTable;
private final DynamicDestinations<?, DestinationT> dynamicDestinations;
private final PCollectionView<String> tempFilePrefix;
@@ -47,8 +81,9 @@
private final long maxSizeBytes;
private final RowWriterFactory<?, DestinationT> rowWriterFactory;
- private @Nullable TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag;
- private TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag;
+ private @Nullable TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>>
+ multiPartitionsTag;
+ private TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> singlePartitionTag;
private static class PartitionData {
private int numFiles = 0;
@@ -131,8 +166,8 @@
PCollectionView<String> tempFilePrefix,
int maxNumFiles,
long maxSizeBytes,
- TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag,
- TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag,
+ TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> multiPartitionsTag,
+ TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> singlePartitionTag,
RowWriterFactory<?, DestinationT> rowWriterFactory) {
this.singletonTable = singletonTable;
this.dynamicDestinations = dynamicDestinations;
@@ -147,7 +182,6 @@
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
List<WriteBundlesToFiles.Result<DestinationT>> results = Lists.newArrayList(c.element());
-
// If there are no elements to write _and_ the user specified a constant output table, then
// generate an empty table of that name.
if (results.isEmpty() && singletonTable) {
@@ -161,7 +195,8 @@
BigQueryRowWriter.Result writerResult = writer.getResult();
results.add(
- new Result<>(writerResult.resourceId.toString(), writerResult.byteSize, destination));
+ new WriteBundlesToFiles.Result<>(
+ writerResult.resourceId.toString(), writerResult.byteSize, destination));
}
Map<DestinationT, DestinationData> currentResults = Maps.newHashMap();
@@ -190,11 +225,16 @@
// In the fast-path case where we only output one table, the transform loads it directly
// to the final table. In this case, we output on a special TupleTag so the enclosing
// transform knows to skip the rename step.
- TupleTag<KV<ShardedKey<DestinationT>, List<String>>> outputTag =
+ TupleTag<KV<ShardedKey<DestinationT>, WritePartition.Result>> outputTag =
(destinationData.getPartitions().size() == 1) ? singlePartitionTag : multiPartitionsTag;
for (int i = 0; i < destinationData.getPartitions().size(); ++i) {
PartitionData partitionData = destinationData.getPartitions().get(i);
- c.output(outputTag, KV.of(ShardedKey.of(destination, i + 1), partitionData.getFilenames()));
+ c.output(
+ outputTag,
+ KV.of(
+ ShardedKey.of(destination, i + 1),
+ new AutoValue_WritePartition_Result(
+ partitionData.getFilenames(), c.pane().isFirst())));
}
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
index a45f6f8..80201ff 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteRename.java
@@ -39,6 +39,7 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
import org.slf4j.Logger;
@@ -51,7 +52,7 @@
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
-class WriteRename extends DoFn<Iterable<KV<TableDestination, String>>, Void> {
+class WriteRename extends DoFn<Iterable<KV<TableDestination, WriteTables.Result>>, Void> {
private static final Logger LOG = LoggerFactory.getLogger(WriteRename.class);
private final BigQueryServices bqServices;
@@ -116,12 +117,15 @@
}
@ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- Multimap<TableDestination, String> tempTables = ArrayListMultimap.create();
- for (KV<TableDestination, String> entry : c.element()) {
+ public void processElement(
+ @Element Iterable<KV<TableDestination, WriteTables.Result>> element, ProcessContext c)
+ throws Exception {
+ Multimap<TableDestination, WriteTables.Result> tempTables = ArrayListMultimap.create();
+ for (KV<TableDestination, WriteTables.Result> entry : element) {
tempTables.put(entry.getKey(), entry.getValue());
}
- for (Map.Entry<TableDestination, Collection<String>> entry : tempTables.asMap().entrySet()) {
+ for (Map.Entry<TableDestination, Collection<WriteTables.Result>> entry :
+ tempTables.asMap().entrySet()) {
// Process each destination table.
// Do not copy if no temp tables are provided.
if (!entry.getValue().isEmpty()) {
@@ -165,17 +169,27 @@
}
private PendingJobData startWriteRename(
- TableDestination finalTableDestination, Iterable<String> tempTableNames, ProcessContext c)
+ TableDestination finalTableDestination,
+ Iterable<WriteTables.Result> tempTableNames,
+ ProcessContext c)
throws Exception {
+ // The pane may have advanced either here due to triggering or due to an upstream trigger. We
+ // check the upstream trigger to handle the case where an earlier pane triggered the
+ // single-partition path. If this happened, then the table will already exist, so we want to
+ // append to the table.
+ boolean isFirstPane =
+ Iterables.getFirst(tempTableNames, null).isFirstPane() && c.pane().isFirst();
WriteDisposition writeDisposition =
- (c.pane().getIndex() == 0) ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
+ isFirstPane ? firstPaneWriteDisposition : WriteDisposition.WRITE_APPEND;
CreateDisposition createDisposition =
- (c.pane().getIndex() == 0) ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
+ isFirstPane ? firstPaneCreateDisposition : CreateDisposition.CREATE_NEVER;
List<TableReference> tempTables =
StreamSupport.stream(tempTableNames.spliterator(), false)
- .map(table -> BigQueryHelpers.fromJsonString(table, TableReference.class))
+ .map(
+ result ->
+ BigQueryHelpers.fromJsonString(result.getTableName(), TableReference.class))
.collect(Collectors.toList());
- ;
// Make sure each destination table gets a unique job id.
String jobIdPrefix =
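The first-pane check in the hunk above now requires both the upstream producer's pane and the current pane to be first before the configured dispositions apply; otherwise the destination table already exists and the copy must append. A plain-Java sketch of that decision, with hypothetical enum values standing in for Beam's `WriteDisposition`/`CreateDisposition`:

```java
public class DispositionChoice {
    enum WriteDisposition { WRITE_TRUNCATE, WRITE_APPEND }

    /**
     * Mirrors WriteRename: the configured first-pane disposition applies only when
     * both the upstream result and the current pane are the first pane; otherwise
     * an earlier pane may already have created the table, so we append to it.
     */
    static WriteDisposition chooseWrite(
            boolean upstreamFirstPane, boolean currentPaneFirst, WriteDisposition firstPane) {
        boolean isFirstPane = upstreamFirstPane && currentPaneFirst;
        return isFirstPane ? firstPane : WriteDisposition.WRITE_APPEND;
    }

    public static void main(String[] args) {
        System.out.println(
            chooseWrite(true, true, WriteDisposition.WRITE_TRUNCATE)); // prints "WRITE_TRUNCATE"
        System.out.println(
            chooseWrite(true, false, WriteDisposition.WRITE_TRUNCATE)); // prints "WRITE_APPEND"
    }
}
```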
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 465c924..32ed1fe 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -26,11 +26,17 @@
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
+import com.google.auto.value.AutoValue;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
@@ -68,7 +74,10 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.checkerframework.checker.initialization.qual.Initialized;
+import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,8 +98,35 @@
})
class WriteTables<DestinationT>
extends PTransform<
- PCollection<KV<ShardedKey<DestinationT>, List<String>>>,
- PCollection<KV<TableDestination, String>>> {
+ PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>>,
+ PCollection<KV<TableDestination, WriteTables.Result>>> {
+ @AutoValue
+ abstract static class Result {
+ abstract String getTableName();
+
+ abstract Boolean isFirstPane();
+ }
+
+ static class ResultCoder extends AtomicCoder<WriteTables.Result> {
+ static final ResultCoder INSTANCE = new ResultCoder();
+
+ @Override
+ public void encode(Result value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream)
+ throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull
+ @Initialized IOException {
+ StringUtf8Coder.of().encode(value.getTableName(), outStream);
+ BooleanCoder.of().encode(value.isFirstPane(), outStream);
+ }
+
+ @Override
+ public Result decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream)
+ throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull
+ @Initialized IOException {
+ return new AutoValue_WriteTables_Result(
+ StringUtf8Coder.of().decode(inStream), BooleanCoder.of().decode(inStream));
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(WriteTables.class);
private final boolean tempTable;
@@ -101,7 +137,7 @@
private final Set<SchemaUpdateOption> schemaUpdateOptions;
private final DynamicDestinations<?, DestinationT> dynamicDestinations;
private final List<PCollectionView<?>> sideInputs;
- private final TupleTag<KV<TableDestination, String>> mainOutputTag;
+ private final TupleTag<KV<TableDestination, WriteTables.Result>> mainOutputTag;
private final TupleTag<String> temporaryFilesTag;
private final ValueProvider<String> loadJobProjectId;
private final int maxRetryJobs;
@@ -113,7 +149,9 @@
private @Nullable JobService jobService;
private class WriteTablesDoFn
- extends DoFn<KV<ShardedKey<DestinationT>, List<String>>, KV<TableDestination, String>> {
+ extends DoFn<
+ KV<ShardedKey<DestinationT>, WritePartition.Result>, KV<TableDestination, Result>> {
+
private Map<DestinationT, String> jsonSchemas = Maps.newHashMap();
// Represents a pending BigQuery load job.
@@ -123,18 +161,21 @@
final List<String> partitionFiles;
final TableDestination tableDestination;
final TableReference tableReference;
+ final boolean isFirstPane;
public PendingJobData(
BoundedWindow window,
BigQueryHelpers.PendingJob retryJob,
List<String> partitionFiles,
TableDestination tableDestination,
- TableReference tableReference) {
+ TableReference tableReference,
+ boolean isFirstPane) {
this.window = window;
this.retryJob = retryJob;
this.partitionFiles = partitionFiles;
this.tableDestination = tableDestination;
this.tableReference = tableReference;
+ this.isFirstPane = isFirstPane;
}
}
// All pending load jobs.
@@ -149,7 +190,11 @@
}
@ProcessElement
- public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+ public void processElement(
+ @Element KV<ShardedKey<DestinationT>, WritePartition.Result> element,
+ ProcessContext c,
+ BoundedWindow window)
+ throws Exception {
dynamicDestinations.setSideInputAccessorFromProcessContext(c);
DestinationT destination = c.element().getKey().getKey();
TableSchema tableSchema;
@@ -199,8 +244,8 @@
tableDestination = tableDestination.withTableReference(tableReference);
}
- Integer partition = c.element().getKey().getShardNumber();
- List<String> partitionFiles = Lists.newArrayList(c.element().getValue());
+ Integer partition = element.getKey().getShardNumber();
+ List<String> partitionFiles = Lists.newArrayList(element.getValue().getFilenames());
String jobIdPrefix =
BigQueryResourceNaming.createJobIdWithDestination(
c.sideInput(loadJobIdPrefixView), tableDestination, partition, c.pane().getIndex());
@@ -212,7 +257,7 @@
WriteDisposition writeDisposition = firstPaneWriteDisposition;
CreateDisposition createDisposition = firstPaneCreateDisposition;
- if (c.pane().getIndex() > 0 && !tempTable) {
+ if (!element.getValue().isFirstPane() && !tempTable) {
// If writing directly to the destination, then the table is created on the first write
// and we should change the disposition for subsequent writes.
writeDisposition = WriteDisposition.WRITE_APPEND;
@@ -238,7 +283,13 @@
createDisposition,
schemaUpdateOptions);
pendingJobs.add(
- new PendingJobData(window, retryJob, partitionFiles, tableDestination, tableReference));
+ new PendingJobData(
+ window,
+ retryJob,
+ partitionFiles,
+ tableDestination,
+ tableReference,
+ element.getValue().isFirstPane()));
}
@Teardown
@@ -284,7 +335,7 @@
bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
PendingJobManager jobManager = new PendingJobManager();
- for (PendingJobData pendingJob : pendingJobs) {
+ for (final PendingJobData pendingJob : pendingJobs) {
jobManager =
jobManager.addPendingJob(
pendingJob.retryJob,
@@ -299,11 +350,14 @@
BigQueryHelpers.stripPartitionDecorator(ref.getTableId())),
pendingJob.tableDestination.getTableDescription());
}
+
+ Result result =
+ new AutoValue_WriteTables_Result(
+ BigQueryHelpers.toJsonString(pendingJob.tableReference),
+ pendingJob.isFirstPane);
c.output(
mainOutputTag,
- KV.of(
- pendingJob.tableDestination,
- BigQueryHelpers.toJsonString(pendingJob.tableReference)),
+ KV.of(pendingJob.tableDestination, result),
pendingJob.window.maxTimestamp(),
pendingJob.window);
for (String file : pendingJob.partitionFiles) {
@@ -365,8 +419,8 @@
}
@Override
- public PCollection<KV<TableDestination, String>> expand(
- PCollection<KV<ShardedKey<DestinationT>, List<String>>> input) {
+ public PCollection<KV<TableDestination, Result>> expand(
+ PCollection<KV<ShardedKey<DestinationT>, WritePartition.Result>> input) {
PCollectionTuple writeTablesOutputs =
input.apply(
ParDo.of(new WriteTablesDoFn())
@@ -391,7 +445,6 @@
.apply(GroupByKey.create())
.apply(Values.create())
.apply(ParDo.of(new GarbageCollectTemporaryFiles()));
-
return writeTablesOutputs.get(mainOutputTag);
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index bde803d..6799c67 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -65,6 +65,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
@@ -72,13 +73,17 @@
import org.apache.avro.io.Encoder;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.SchemaUpdateOption;
+import org.apache.beam.sdk.io.gcp.bigquery.WritePartition.ResultCoder;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteTables.Result;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
@@ -1758,10 +1763,12 @@
}
}
- TupleTag<KV<ShardedKey<TableDestination>, List<String>>> multiPartitionsTag =
- new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("multiPartitionsTag") {};
- TupleTag<KV<ShardedKey<TableDestination>, List<String>>> singlePartitionTag =
- new TupleTag<KV<ShardedKey<TableDestination>, List<String>>>("singlePartitionTag") {};
+ TupleTag<KV<ShardedKey<TableDestination>, WritePartition.Result>> multiPartitionsTag =
+ new TupleTag<KV<ShardedKey<TableDestination>, WritePartition.Result>>(
+ "multiPartitionsTag") {};
+ TupleTag<KV<ShardedKey<TableDestination>, WritePartition.Result>> singlePartitionTag =
+ new TupleTag<KV<ShardedKey<TableDestination>, WritePartition.Result>>(
+ "singlePartitionTag") {};
String tempFilePrefix = testFolder.newFolder("BigQueryIOTest").getAbsolutePath();
PCollectionView<String> tempFilePrefixView =
@@ -1781,12 +1788,12 @@
DoFnTester<
Iterable<WriteBundlesToFiles.Result<TableDestination>>,
- KV<ShardedKey<TableDestination>, List<String>>>
+ KV<ShardedKey<TableDestination>, WritePartition.Result>>
tester = DoFnTester.of(writePartition);
tester.setSideInput(tempFilePrefixView, GlobalWindow.INSTANCE, tempFilePrefix);
tester.processElement(files);
- List<KV<ShardedKey<TableDestination>, List<String>>> partitions;
+ List<KV<ShardedKey<TableDestination>, WritePartition.Result>> partitions;
if (expectedNumPartitionsPerTable > 1) {
partitions = tester.takeOutputElements(multiPartitionsTag);
} else {
@@ -1795,12 +1802,12 @@
List<ShardedKey<TableDestination>> partitionsResult = Lists.newArrayList();
Map<String, List<String>> filesPerTableResult = Maps.newHashMap();
- for (KV<ShardedKey<TableDestination>, List<String>> partition : partitions) {
+ for (KV<ShardedKey<TableDestination>, WritePartition.Result> partition : partitions) {
String table = partition.getKey().getKey().getTableSpec();
partitionsResult.add(partition.getKey());
List<String> tableFilesResult =
filesPerTableResult.computeIfAbsent(table, k -> Lists.newArrayList());
- tableFilesResult.addAll(partition.getValue());
+ tableFilesResult.addAll(partition.getValue().getFilenames());
}
assertThat(
@@ -1847,7 +1854,7 @@
String jobIdToken = "jobId";
final Multimap<TableDestination, String> expectedTempTables = ArrayListMultimap.create();
- List<KV<ShardedKey<String>, List<String>>> partitions = Lists.newArrayList();
+ List<KV<ShardedKey<String>, WritePartition.Result>> partitions = Lists.newArrayList();
for (int i = 0; i < numTables; ++i) {
String tableName = String.format("project-id:dataset-id.table%05d", i);
TableDestination tableDestination = new TableDestination(tableName, tableName);
@@ -1869,7 +1876,10 @@
}
filesPerPartition.add(writer.getResult().resourceId.toString());
}
- partitions.add(KV.of(ShardedKey.of(tableDestination.getTableSpec(), j), filesPerPartition));
+ partitions.add(
+ KV.of(
+ ShardedKey.of(tableDestination.getTableSpec(), j),
+ new AutoValue_WritePartition_Result(filesPerPartition, true)));
String json =
String.format(
@@ -1879,8 +1889,11 @@
}
}
- PCollection<KV<ShardedKey<String>, List<String>>> writeTablesInput =
- p.apply(Create.of(partitions));
+ PCollection<KV<ShardedKey<String>, WritePartition.Result>> writeTablesInput =
+ p.apply(
+ Create.of(partitions)
+ .withCoder(
+ KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), ResultCoder.INSTANCE)));
PCollectionView<String> jobIdTokenView =
p.apply("CreateJobId", Create.of("jobId")).apply(View.asSingleton());
List<PCollectionView<?>> sideInputs = ImmutableList.of(jobIdTokenView);
@@ -1903,18 +1916,25 @@
false,
Collections.emptySet());
- PCollection<KV<TableDestination, String>> writeTablesOutput =
- writeTablesInput.apply(writeTables);
+ PCollection<KV<TableDestination, WriteTables.Result>> writeTablesOutput =
+ writeTablesInput
+ .apply(writeTables)
+ .setCoder(KvCoder.of(TableDestinationCoderV3.of(), WriteTables.ResultCoder.INSTANCE));
PAssert.thatMultimap(writeTablesOutput)
.satisfies(
input -> {
assertEquals(input.keySet(), expectedTempTables.keySet());
- for (Map.Entry<TableDestination, Iterable<String>> entry : input.entrySet()) {
+ for (Map.Entry<TableDestination, Iterable<WriteTables.Result>> entry :
+ input.entrySet()) {
+ Iterable<String> tableNames =
+ StreamSupport.stream(entry.getValue().spliterator(), false)
+ .map(Result::getTableName)
+ .collect(Collectors.toList());
@SuppressWarnings("unchecked")
String[] expectedValues =
Iterables.toArray(expectedTempTables.get(entry.getKey()), String.class);
- assertThat(entry.getValue(), containsInAnyOrder(expectedValues));
+ assertThat(tableNames, containsInAnyOrder(expectedValues));
}
return null;
});
@@ -1951,7 +1971,7 @@
Multimap<TableDestination, TableRow> expectedRowsPerTable = ArrayListMultimap.create();
String jobIdToken = "jobIdToken";
Multimap<TableDestination, String> tempTables = ArrayListMultimap.create();
- List<KV<TableDestination, String>> tempTablesElement = Lists.newArrayList();
+ List<KV<TableDestination, WriteTables.Result>> tempTablesElement = Lists.newArrayList();
for (int i = 0; i < numFinalTables; ++i) {
String tableName = "project-id:dataset-id.table_" + i;
TableDestination tableDestination = new TableDestination(tableName, "table_" + i + "_desc");
@@ -1971,7 +1991,8 @@
expectedRowsPerTable.putAll(tableDestination, rows);
String tableJson = toJsonString(tempTable);
tempTables.put(tableDestination, tableJson);
- tempTablesElement.add(KV.of(tableDestination, tableJson));
+ tempTablesElement.add(
+ KV.of(tableDestination, new AutoValue_WriteTables_Result(tableJson, true)));
}
}
@@ -1987,7 +2008,8 @@
3,
"kms_key");
- DoFnTester<Iterable<KV<TableDestination, String>>, Void> tester = DoFnTester.of(writeRename);
+ DoFnTester<Iterable<KV<TableDestination, WriteTables.Result>>, Void> tester =
+ DoFnTester.of(writeRename);
tester.setSideInput(jobIdTokenView, GlobalWindow.INSTANCE, jobIdToken);
tester.processElement(tempTablesElement);
tester.finishBundle();
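The Java test assertion a few hunks above maps each `WriteTables.Result` to its table name and then compares the collection order-insensitively (`containsInAnyOrder`). As a minimal sketch of that comparison pattern, outside any Beam API (the helper name is ours, not from the patch):

```python
# Order-insensitive multiset comparison, mirroring Hamcrest's
# containsInAnyOrder used in the Java test above.
from collections import Counter

def assert_contains_in_any_order(actual, expected):
    # Counter treats the inputs as multisets: duplicates matter, order does not.
    assert Counter(actual) == Counter(expected), (actual, expected)

# Same elements in a different order pass; a missing element would fail.
assert_contains_in_any_order(["temp_table_2", "temp_table_1"],
                             ["temp_table_1", "temp_table_2"])
```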
diff --git a/sdks/python/MANIFEST.in b/sdks/python/MANIFEST.in
index c97e57a..60b0989 100644
--- a/sdks/python/MANIFEST.in
+++ b/sdks/python/MANIFEST.in
@@ -19,3 +19,4 @@
include README.md
include NOTICE
include LICENSE
+include LICENSE.python
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 4d1d892..2048742 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -21,7 +21,7 @@
import unittest
-from nose.plugins.attrib import attr
+import pytest
import apache_beam as beam
from apache_beam.examples.complete import autocomplete
@@ -55,7 +55,7 @@
('that', ((1, 'that'), )),
]))
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_autocomplete_it(self):
with TestPipeline(is_integration_test=True) as p:
words = p | beam.io.ReadFromText(self.KINGLEAR_INPUT)
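The change above is the pattern repeated across the rest of this patch: nose's `@attr('IT')` decorator is replaced by a pytest marker. A minimal sketch of the resulting test shape (the test body here is a stand-in, not taken from the patch; the `it_postcommit` marker is assumed to be registered in the project's pytest configuration, which this hunk does not show):

```python
# nose:   @attr('IT')          -> selected with --attr=IT
# pytest: @pytest.mark.it_postcommit -> selected with -m it_postcommit
import pytest

@pytest.mark.it_postcommit
def test_example_it():
    # A real Beam test would construct a TestPipeline(is_integration_test=True)
    # here; this body only demonstrates where the marker attaches.
    assert True
```

Unregistered markers only warn by default; with `--strict-markers`, pytest fails collection unless `it_postcommit` is declared under the `markers` ini option.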
diff --git a/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
index 91388a4..ea7ec28 100644
--- a/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/game_stats_it_test.py
@@ -20,7 +20,7 @@
Code: beam/sdks/python/apache_beam/examples/complete/game/game_stats.py
Usage:
- python setup.py nosetests --test-pipeline-options=" \
+ pytest --test-pipeline-options=" \
--runner=TestDataflowRunner \
--project=... \
--region=... \
@@ -38,8 +38,8 @@
import unittest
import uuid
+import pytest
from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
from apache_beam.examples.complete.game import game_stats
from apache_beam.io.gcp.tests import utils
@@ -103,7 +103,7 @@
test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub])
test_utils.cleanup_topics(self.pub_client, [self.input_topic])
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_game_stats_it(self):
state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py
index f21abac..a9729bc 100644
--- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py
@@ -20,7 +20,7 @@
Code: beam/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py
Usage:
- python setup.py nosetests --test-pipeline-options=" \
+ pytest --test-pipeline-options=" \
--runner=TestDataflowRunner \
--project=... \
--region=... \
@@ -36,8 +36,8 @@
import logging
import unittest
+import pytest
from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
from apache_beam.examples.complete.game import hourly_team_score
from apache_beam.io.gcp.tests import utils
@@ -63,7 +63,7 @@
self.dataset_ref = utils.create_bq_dataset(
self.project, self.OUTPUT_DATASET)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_hourly_team_score_it(self):
state_verifier = PipelineStateMatcher(PipelineState.DONE)
query = (
diff --git a/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
index 8f5f91c..8b82c64 100644
--- a/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/leader_board_it_test.py
@@ -20,7 +20,7 @@
Code: beam/sdks/python/apache_beam/examples/complete/game/leader_board.py
Usage:
- python setup.py nosetests --test-pipeline-options=" \
+ pytest --test-pipeline-options=" \
--runner=TestDataflowRunner \
--project=... \
--region=... \
@@ -38,8 +38,8 @@
import unittest
import uuid
+import pytest
from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
from apache_beam.examples.complete.game import leader_board
from apache_beam.io.gcp.tests import utils
@@ -104,7 +104,7 @@
test_utils.cleanup_subscriptions(self.sub_client, [self.input_sub])
test_utils.cleanup_topics(self.pub_client, [self.input_topic])
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_leader_board_it(self):
state_verifier = PipelineStateMatcher(PipelineState.RUNNING)
diff --git a/sdks/python/apache_beam/examples/complete/game/user_score_it_test.py b/sdks/python/apache_beam/examples/complete/game/user_score_it_test.py
index a2b3a17..d26565e 100644
--- a/sdks/python/apache_beam/examples/complete/game/user_score_it_test.py
+++ b/sdks/python/apache_beam/examples/complete/game/user_score_it_test.py
@@ -20,7 +20,7 @@
Code: beam/sdks/python/apache_beam/examples/complete/game/user_score.py
Usage:
- python setup.py nosetests --test-pipeline-options=" \
+ pytest --test-pipeline-options=" \
--runner=TestDataflowRunner \
--project=... \
--region=... \
@@ -37,8 +37,8 @@
import unittest
import uuid
+import pytest
from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
from apache_beam.examples.complete.game import user_score
from apache_beam.runners.runner import PipelineState
@@ -60,7 +60,7 @@
self.output = '/'.join(
[self.test_pipeline.get_option('output'), self.uuid, 'results'])
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_user_score_it(self):
state_verifier = PipelineStateMatcher(PipelineState.DONE)
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test_it.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test_it.py
index 9b6097b..a2a3262 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test_it.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset_test_it.py
@@ -24,8 +24,8 @@
import unittest
import uuid
+import pytest
from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
from apache_beam.examples.complete.juliaset.juliaset import juliaset
from apache_beam.io.filesystems import FileSystems
@@ -34,7 +34,7 @@
from apache_beam.testing.test_pipeline import TestPipeline
-@attr('IT')
+@pytest.mark.it_postcommit
class JuliaSetTestIT(unittest.TestCase):
GRID_SIZE = 1000
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
index cfec86b..fa5f12c 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_it_test.py
@@ -23,8 +23,8 @@
import time
import unittest
+import pytest
from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
from apache_beam.examples.cookbook import bigquery_tornadoes
from apache_beam.io.gcp.tests import utils
@@ -42,7 +42,7 @@
# from expected Bigquery table.
DEFAULT_CHECKSUM = 'd860e636050c559a16a791aff40d6ad809d4daf0'
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_bigquery_tornadoes_it(self):
test_pipeline = TestPipeline(is_integration_test=True)
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount_it_test.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount_it_test.py
index 36f81bd..388caf6 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount_it_test.py
@@ -23,8 +23,8 @@
import time
import unittest
+import pytest
from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
@@ -42,7 +42,7 @@
DATASTORE_WORDCOUNT_KIND = "DatastoreWordCount"
EXPECTED_CHECKSUM = '826f69ed0275858c2e098f1e8407d4e3ba5a4b3f'
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_datastore_wordcount_it(self):
test_pipeline = TestPipeline(is_integration_test=True)
kind = self.DATASTORE_WORDCOUNT_KIND
diff --git a/sdks/python/apache_beam/examples/dataframe/flight_delays_it_test.py b/sdks/python/apache_beam/examples/dataframe/flight_delays_it_test.py
index e1a0ac4..6ed376e 100644
--- a/sdks/python/apache_beam/examples/dataframe/flight_delays_it_test.py
+++ b/sdks/python/apache_beam/examples/dataframe/flight_delays_it_test.py
@@ -28,7 +28,7 @@
import uuid
import pandas as pd
-from nose.plugins.attrib import attr
+import pytest
from apache_beam.examples.dataframe import flight_delays
from apache_beam.io.filesystems import FileSystems
@@ -100,7 +100,7 @@
def tearDown(self):
FileSystems.delete([self.outdir + '/'])
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_flight_delays(self):
flight_delays.run_flight_delay_pipeline(
self.test_pipeline,
diff --git a/sdks/python/apache_beam/examples/dataframe/taxiride_it_test.py b/sdks/python/apache_beam/examples/dataframe/taxiride_it_test.py
index 4fedfa8..f81b7d8 100644
--- a/sdks/python/apache_beam/examples/dataframe/taxiride_it_test.py
+++ b/sdks/python/apache_beam/examples/dataframe/taxiride_it_test.py
@@ -25,7 +25,7 @@
import uuid
import pandas as pd
-from nose.plugins.attrib import attr
+import pytest
from apache_beam.examples.dataframe import taxiride
from apache_beam.io.filesystems import FileSystems
@@ -44,7 +44,7 @@
def tearDown(self):
FileSystems.delete([self.outdir + '/'])
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_aggregation(self):
taxiride.run_aggregation_pipeline(
self.test_pipeline,
@@ -71,7 +71,7 @@
pd.testing.assert_frame_equal(expected, result)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_enrich(self):
# Standard workers OOM with the enrich pipeline
self.test_pipeline.get_pipeline_options().view_as(
diff --git a/sdks/python/apache_beam/examples/fastavro_it_test.py b/sdks/python/apache_beam/examples/fastavro_it_test.py
index 9ac8051..c9bb988 100644
--- a/sdks/python/apache_beam/examples/fastavro_it_test.py
+++ b/sdks/python/apache_beam/examples/fastavro_it_test.py
@@ -24,7 +24,7 @@
Usage:
DataFlowRunner:
- python setup.py nosetests --tests apache_beam.examples.fastavro_it_test \
+ pytest apache_beam/examples/fastavro_it_test.py \
--test-pipeline-options="
--runner=TestDataflowRunner
--project=...
@@ -36,7 +36,7 @@
"
DirectRunner:
- python setup.py nosetests --tests apache_beam.examples.fastavro_it_test \
+ pytest apache_beam/examples/fastavro_it_test.py \
--test-pipeline-options="
--output=/tmp
--records=5000
@@ -50,9 +50,9 @@
import unittest
import uuid
+import pytest
from avro.schema import Parse
from fastavro import parse_schema
-from nose.plugins.attrib import attr
from apache_beam.io.avroio import ReadAllFromAvro
from apache_beam.io.avroio import WriteToAvro
@@ -98,7 +98,7 @@
self.uuid = str(uuid.uuid4())
self.output = '/'.join([self.test_pipeline.get_option('output'), self.uuid])
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_avro_it(self):
num_records = self.test_pipeline.get_option('records')
num_records = int(num_records) if num_records else 1000000
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_debugging_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_debugging_it_test.py
index 367fc82..9101ff8 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_debugging_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_debugging_it_test.py
@@ -23,8 +23,8 @@
import unittest
import uuid
+import pytest
from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
from apache_beam.examples import streaming_wordcount_debugging
from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
@@ -94,7 +94,7 @@
test_utils.cleanup_topics(
self.pub_client, [self.input_topic, self.output_topic])
- @attr('IT')
+ @pytest.mark.it_postcommit
@unittest.skip(
"Skipped due to [BEAM-3377]: assert_that not working for streaming")
def test_streaming_wordcount_debugging_it(self):
diff --git a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
index 7812283..8beae5e 100644
--- a/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/streaming_wordcount_it_test.py
@@ -23,8 +23,8 @@
import unittest
import uuid
+import pytest
from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
from apache_beam.examples import streaming_wordcount
from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
@@ -77,7 +77,7 @@
test_utils.cleanup_topics(
self.pub_client, [self.input_topic, self.output_topic])
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_streaming_wordcount_it(self):
# Build expected dataset.
expected_msg = [('%d: 1' % num).encode('utf-8')
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index 242dcff..8ee49c7 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -26,7 +26,6 @@
import pytest
from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
from apache_beam.examples import wordcount
from apache_beam.testing.load_tests.load_test_metrics_utils import InfluxDBMetricsPublisherOptions
@@ -39,19 +38,16 @@
class WordCountIT(unittest.TestCase):
- # Enable nose tests running in parallel
- _multiprocess_can_split_ = True
-
# The default checksum is a SHA-1 hash generated from a sorted list of
# lines read from expected output. This value corresponds to the default
# input of WordCount example.
DEFAULT_CHECKSUM = '33535a832b7db6d78389759577d4ff495980b9c0'
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_wordcount_it(self):
self._run_wordcount_it(wordcount.run)
- @attr('IT')
+ @pytest.mark.it_postcommit
@pytest.mark.it_validatescontainer
def test_wordcount_fnapi_it(self):
self._run_wordcount_it(wordcount.run, experiment='beam_fn_api')
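The hunk above describes `DEFAULT_CHECKSUM` as "a SHA-1 hash generated from a sorted list of lines read from expected output". A hedged sketch of that kind of derivation (the exact canonicalization Beam's `FileChecksumMatcher` applies — joining, encoding, trailing newline — is assumed here, not confirmed by this diff):

```python
# Assumed canonicalization: sort the output lines, join with newlines,
# and take the SHA-1 hex digest of the UTF-8 bytes.
import hashlib

def compute_checksum(lines):
    canonical = "\n".join(sorted(lines))
    return hashlib.sha1(canonical.encode("utf-8")).hexdigest()

# The digest is independent of the order lines were read in.
print(compute_checksum(["the: 2", "king: 1"]))
```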
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index a39584a..90086c9 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -28,8 +28,8 @@
import uuid
import warnings
+import pytest
from hamcrest.library.text import stringmatches
-from nose.plugins.attrib import attr
import apache_beam as beam
from apache_beam.io import fileio
@@ -291,7 +291,7 @@
def setUp(self):
self.test_pipeline = TestPipeline(is_integration_test=True)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_transform_on_gcs(self):
args = self.test_pipeline.get_full_options_as_args()
diff --git a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
index 0a30462..699dfa4 100644
--- a/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/big_query_query_to_table_it_test.py
@@ -28,8 +28,8 @@
import time
import unittest
+import pytest
from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
from apache_beam.io.gcp import big_query_query_to_table_pipeline
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
@@ -154,7 +154,7 @@
self.project, self.dataset_id, NEW_TYPES_INPUT_TABLE, table_data)
self.assertTrue(passed, 'Error in BQ setup: %s' % errors)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_big_query_legacy_sql(self):
verify_query = DIALECT_OUTPUT_VERIFY_QUERY % self.output_table
expected_checksum = test_utils.compute_hash(DIALECT_OUTPUT_EXPECTED)
@@ -177,7 +177,7 @@
options = self.test_pipeline.get_full_options_as_args(**extra_opts)
big_query_query_to_table_pipeline.run_bq_pipeline(options)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_big_query_standard_sql(self):
verify_query = DIALECT_OUTPUT_VERIFY_QUERY % self.output_table
expected_checksum = test_utils.compute_hash(DIALECT_OUTPUT_EXPECTED)
@@ -200,7 +200,7 @@
options = self.test_pipeline.get_full_options_as_args(**extra_opts)
big_query_query_to_table_pipeline.run_bq_pipeline(options)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_big_query_standard_sql_kms_key_native(self):
if isinstance(self.test_pipeline.runner, TestDirectRunner):
self.skipTest("This test doesn't work on DirectRunner.")
@@ -236,7 +236,7 @@
'No encryption configuration found: %s' % table)
self.assertEqual(kms_key, table.encryptionConfiguration.kmsKeyName)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_big_query_new_types(self):
expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
verify_query = NEW_TYPES_OUTPUT_VERIFY_QUERY % self.output_table
@@ -260,7 +260,7 @@
options = self.test_pipeline.get_full_options_as_args(**extra_opts)
big_query_query_to_table_pipeline.run_bq_pipeline(options)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_big_query_new_types_avro(self):
expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
verify_query = NEW_TYPES_OUTPUT_VERIFY_QUERY % self.output_table
@@ -283,7 +283,7 @@
options = self.test_pipeline.get_full_options_as_args(**extra_opts)
big_query_query_to_table_pipeline.run_bq_pipeline(options)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_big_query_new_types_native(self):
expected_checksum = test_utils.compute_hash(NEW_TYPES_OUTPUT_EXPECTED)
verify_query = NEW_TYPES_OUTPUT_VERIFY_QUERY % self.output_table
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 9672431..1e227f8 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -26,10 +26,10 @@
import unittest
import mock
+import pytest
from hamcrest.core import assert_that as hamcrest_assert
from hamcrest.core.core.allof import all_of
from hamcrest.core.core.is_ import is_
-from nose.plugins.attrib import attr
from parameterized import param
from parameterized import parameterized
@@ -744,7 +744,7 @@
_LOGGER.info(
"Created dataset %s in project %s", self.dataset_id, self.project)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_multiple_destinations_transform(self):
output_table_1 = '%s%s' % (self.output_table, 1)
output_table_2 = '%s%s' % (self.output_table, 2)
@@ -824,7 +824,7 @@
max_file_size=20,
max_files_per_bundle=-1))
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_bqfl_streaming(self):
if isinstance(self.test_pipeline.runner, TestDataflowRunner):
self.skipTest("TestStream is not supported on TestDataflowRunner")
@@ -862,7 +862,7 @@
.Method.FILE_LOADS,
triggering_frequency=100))
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_one_job_fails_all_jobs_fail(self):
# If one of the import jobs fails, then other jobs must not be performed.
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_io_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_io_read_it_test.py
index 6652b4e..63cc445 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_io_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_io_read_it_test.py
@@ -25,8 +25,8 @@
import logging
import unittest
+import pytest
from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
from apache_beam.io.gcp import bigquery_io_read_pipeline
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
@@ -59,11 +59,11 @@
bigquery_io_read_pipeline.run(
test_pipeline.get_full_options_as_args(**extra_opts))
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_bigquery_read_custom_1M_python(self):
self.run_bigquery_io_read_pipeline('1M', True)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_bigquery_read_1M_python(self):
self.run_bigquery_io_read_pipeline('1M')
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index 4040a60..472b521 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -29,7 +29,7 @@
from decimal import Decimal
from functools import wraps
-from nose.plugins.attrib import attr
+import pytest
import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
@@ -157,7 +157,7 @@
cls.project, cls.dataset_id, table_name, cls.TABLE_DATA)
@skip(['PortableRunner', 'FlinkRunner'])
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_native_source(self):
with beam.Pipeline(argv=self.args) as p:
result = (
@@ -165,7 +165,7 @@
beam.io.BigQuerySource(query=self.query, use_standard_sql=True)))
assert_that(result, equal_to(self.TABLE_DATA))
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_iobase_source(self):
query = StaticValueProvider(str, self.query)
with beam.Pipeline(argv=self.args) as p:
@@ -272,7 +272,7 @@
return expected_data
@skip(['PortableRunner', 'FlinkRunner'])
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_native_source(self):
with beam.Pipeline(argv=self.args) as p:
result = (
@@ -281,7 +281,7 @@
beam.io.BigQuerySource(query=self.query, use_standard_sql=True)))
assert_that(result, equal_to(self.get_expected_data()))
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_iobase_source(self):
with beam.Pipeline(argv=self.args) as p:
result = (
@@ -378,7 +378,7 @@
return table_schema
@skip(['PortableRunner', 'FlinkRunner'])
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_read_queries(self):
# TODO(BEAM-11311): Remove experiment when tests run on r_v2.
args = self.args + ["--experiments=use_runner_v2"]
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 19550b5..ce5874c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -32,8 +32,8 @@
import hamcrest as hc
import mock
+import pytest
import pytz
-from nose.plugins.attrib import attr
from parameterized import param
from parameterized import parameterized
@@ -1047,13 +1047,6 @@
class BigQueryStreamingInsertTransformIntegrationTests(unittest.TestCase):
BIG_QUERY_DATASET_ID = 'python_bq_streaming_inserts_'
- # Prevent nose from finding and running tests that were not
- # specified in the Gradle file.
- # See "More tests may be found" in:
- # https://nose.readthedocs.io/en/latest/doc_tests/test_multiprocess
- # /multiprocess.html#other-differences-in-test-running
- _multiprocess_can_split_ = True
-
def setUp(self):
self.test_pipeline = TestPipeline(is_integration_test=True)
self.runner_name = type(self.test_pipeline.runner).__name__
@@ -1069,7 +1062,7 @@
_LOGGER.info(
"Created dataset %s in project %s", self.dataset_id, self.project)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_value_provider_transform(self):
output_table_1 = '%s%s' % (self.output_table, 1)
output_table_2 = '%s%s' % (self.output_table, 2)
@@ -1139,7 +1132,7 @@
additional_bq_parameters=lambda _: additional_bq_parameters,
method='FILE_LOADS'))
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_multiple_destinations_transform(self):
streaming = self.test_pipeline.options.view_as(StandardOptions).streaming
if streaming and isinstance(self.test_pipeline.runner, TestDataflowRunner):
@@ -1333,11 +1326,11 @@
method=method,
triggering_frequency=triggering_frequency)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_streaming_inserts(self):
self._run_pubsub_bq_pipeline(WriteToBigQuery.Method.STREAMING_INSERTS)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_file_loads(self):
self._run_pubsub_bq_pipeline(
WriteToBigQuery.Method.FILE_LOADS, triggering_frequency=20)
@@ -1362,7 +1355,7 @@
_LOGGER.info(
'Created dataset %s in project %s', self.dataset_id, self.project)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_avro_file_load(self):
# Construct elements such that they can be written via Avro but not via
# JSON. See BEAM-8841.
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index c8285a1..c7f1d442 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -30,8 +30,8 @@
import hamcrest as hc
import mock
+import pytest
import pytz
-from nose.plugins.attrib import attr
import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
@@ -105,7 +105,7 @@
projectId=self.project, datasetId=self.dataset_id, table=table)
self.bigquery_client.client.tables.Insert(request)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_big_query_write(self):
table_name = 'python_write_table'
table_id = '{}.{}'.format(self.dataset_id, table_name)
@@ -164,7 +164,7 @@
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_big_query_write_schema_autodetect(self):
if self.runner_name == 'TestDataflowRunner':
self.skipTest('DataflowRunner does not support schema autodetection')
@@ -209,7 +209,7 @@
write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
temp_file_format=FileFormat.JSON))
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_big_query_write_new_types(self):
table_name = 'python_new_types_table'
table_id = '{}.{}'.format(self.dataset_id, table_name)
@@ -290,7 +290,7 @@
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_big_query_write_without_schema(self):
table_name = 'python_no_schema_table'
self.create_table(table_name)
@@ -352,7 +352,7 @@
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
temp_file_format=FileFormat.JSON))
- @attr('IT')
+ @pytest.mark.it_postcommit
@mock.patch(
"apache_beam.io.gcp.bigquery_file_loads._MAXIMUM_SOURCE_URIS", new=1)
def test_big_query_write_temp_table_append_schema_update(self):
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py
index f5b68a6..abecd5b 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py
@@ -32,8 +32,8 @@
import unittest
from datetime import datetime
+import pytest
from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher
from apache_beam.testing.test_pipeline import TestPipeline
@@ -66,7 +66,7 @@
datastore_write_it_pipeline.run(
test_pipeline.get_full_options_as_args(**extra_opts))
- @attr('IT')
+ @pytest.mark.it_postcommit
@unittest.skipIf(
datastore_write_it_pipeline is None, 'GCP dependencies are not installed')
def test_datastore_write_limit(self):
diff --git a/sdks/python/apache_beam/io/gcp/dicomio_integration_test.py b/sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
index 16e976c..7970dd5 100644
--- a/sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
@@ -29,7 +29,7 @@
import string
import unittest
-from nose.plugins.attrib import attr
+import pytest
import apache_beam as beam
from apache_beam.io import fileio
@@ -133,7 +133,7 @@
# clean up the temp Dicom store
delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_dicom_search_instances(self):
# Search and compare the metadata of a persistent DICOM store.
# Both refine and comprehensive search will be tested.
@@ -183,7 +183,7 @@
equal_to([expected_dict_refine]),
label='refine search assert')
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_dicom_store_instance_from_gcs(self):
# Store DICOM files to an empty DICOM store from a GCS bucket,
# then check that the store metadata matches.
diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py
index d87e7be..0272dac 100644
--- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_it_test.py
@@ -21,7 +21,7 @@
import unittest
import uuid
-from nose.plugins.attrib import attr
+import pytest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
@@ -101,7 +101,7 @@
cls._add_dummy_entries()
_LOGGER.info("Spanner Read IT Setup Complete...")
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_read_via_table(self):
_LOGGER.info("Spanner Read via table")
with beam.Pipeline(argv=self.args) as p:
@@ -113,7 +113,7 @@
columns=["UserId", "Key"])
assert_that(r, equal_to(self._data))
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_read_via_sql(self):
_LOGGER.info("Running Spanner via sql")
with beam.Pipeline(argv=self.args) as p:
diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py
index 1701cbb..7f2c8e3 100644
--- a/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_it_test.py
@@ -20,7 +20,7 @@
import unittest
import uuid
-from nose.plugins.attrib import attr
+import pytest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
@@ -103,7 +103,7 @@
cls._create_database()
_LOGGER.info('Spanner Write IT Setup Complete...')
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_write_batches(self):
_prefex = 'test_write_batches'
mutations = [
@@ -129,7 +129,7 @@
res.wait_until_finish()
self.assertEqual(self._count_data(_prefex), len(mutations))
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_spanner_update(self):
_prefex = 'test_update'
@@ -165,7 +165,7 @@
res.wait_until_finish()
self.assertEqual(self._count_data(_prefex), 2)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_spanner_error(self):
mutations_update = [
WriteMutation.update(
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
index 5aedb6c..b06e374 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_integration_test.py
@@ -43,7 +43,7 @@
import unittest
import uuid
-from nose.plugins.attrib import attr
+import pytest
from apache_beam.io.filesystems import FileSystems
from apache_beam.testing.test_pipeline import TestPipeline
@@ -111,17 +111,17 @@
self.gcsio.copy(src, dst, kms_key_name, **extra_kwargs)
self._verify_copy(src, dst, kms_key_name)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_copy(self):
self._test_copy("test_copy")
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_copy_kms(self):
if self.kms_key_name is None:
raise unittest.SkipTest('--kms_key_name not specified')
self._test_copy("test_copy_kms", self.kms_key_name)
- @attr('IT')
+ @pytest.mark.it_postcommit
@unittest.skip('BEAM-12352: enable once maxBytesRewrittenPerCall works again')
def test_copy_rewrite_token(self):
# Tests a multi-part copy (rewrite) operation. This is triggered by a
@@ -165,17 +165,17 @@
for _src, _dst in src_dst_pairs:
self._verify_copy(_src, _dst, kms_key_name)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_copy_batch(self):
self._test_copy_batch("test_copy_batch")
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_copy_batch_kms(self):
if self.kms_key_name is None:
raise unittest.SkipTest('--kms_key_name not specified')
self._test_copy_batch("test_copy_batch_kms", self.kms_key_name)
- @attr('IT')
+ @pytest.mark.it_postcommit
@unittest.skip('BEAM-12352: enable once maxBytesRewrittenPerCall works again')
def test_copy_batch_rewrite_token(self):
# Tests a multi-part copy (rewrite) operation. This is triggered by a
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
index 7e20be3..541bb52 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
@@ -24,8 +24,8 @@
import unittest
import uuid
+import pytest
from hamcrest.core.core.allof import all_of
-from nose.plugins.attrib import attr
from apache_beam.io.gcp import pubsub_it_pipeline
from apache_beam.io.gcp.pubsub import PubsubMessage
@@ -204,11 +204,11 @@
id_label=self.ID_LABEL,
timestamp_attribute=self.TIMESTAMP_ATTRIBUTE)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_streaming_data_only(self):
self._test_streaming(with_attributes=False)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_streaming_with_attributes(self):
self._test_streaming(with_attributes=True)
diff --git a/sdks/python/apache_beam/io/parquetio_it_test.py b/sdks/python/apache_beam/io/parquetio_it_test.py
index ee4e0f0..0d3cd0d 100644
--- a/sdks/python/apache_beam/io/parquetio_it_test.py
+++ b/sdks/python/apache_beam/io/parquetio_it_test.py
@@ -21,7 +21,7 @@
import unittest
from collections import Counter
-from nose.plugins.attrib import attr
+import pytest
from apache_beam import Create
from apache_beam import DoFn
@@ -52,7 +52,7 @@
def tearDown(self):
pass
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_parquetio_it(self):
file_prefix = "parquet_it_test"
init_size = 10
diff --git a/sdks/python/apache_beam/ml/gcp/cloud_dlp_it_test.py b/sdks/python/apache_beam/ml/gcp/cloud_dlp_it_test.py
index 4ada679..a699aaa 100644
--- a/sdks/python/apache_beam/ml/gcp/cloud_dlp_it_test.py
+++ b/sdks/python/apache_beam/ml/gcp/cloud_dlp_it_test.py
@@ -20,7 +20,7 @@
import logging
import unittest
-from nose.plugins.attrib import attr
+import pytest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
@@ -66,7 +66,7 @@
self.runner_name = type(self.test_pipeline.runner).__name__
self.project = self.test_pipeline.get_option('project')
- @attr("IT")
+ @pytest.mark.it_postcommit
def test_deidentification(self):
with TestPipeline(is_integration_test=True) as p:
output = (
@@ -77,7 +77,7 @@
inspection_config=INSPECT_CONFIG))
assert_that(output, equal_to(['####################']))
- @attr("IT")
+ @pytest.mark.it_postcommit
def test_inspection(self):
with TestPipeline(is_integration_test=True) as p:
output = (
diff --git a/sdks/python/apache_beam/ml/gcp/naturallanguageml_test_it.py b/sdks/python/apache_beam/ml/gcp/naturallanguageml_test_it.py
index 932bc685..9adf56a 100644
--- a/sdks/python/apache_beam/ml/gcp/naturallanguageml_test_it.py
+++ b/sdks/python/apache_beam/ml/gcp/naturallanguageml_test_it.py
@@ -18,7 +18,7 @@
import unittest
-from nose.plugins.attrib import attr
+import pytest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
@@ -46,7 +46,7 @@
])
-@attr('IT')
+@pytest.mark.it_postcommit
@unittest.skipIf(AnnotateText is None, 'GCP dependencies are not installed')
class NaturalLanguageMlTestIT(unittest.TestCase):
def test_analyzing_syntax(self):
diff --git a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py b/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py
index 8c94a29..535a29f 100644
--- a/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py
+++ b/sdks/python/apache_beam/ml/gcp/recommendations_ai_test_it.py
@@ -22,7 +22,7 @@
import random
import unittest
-from nose.plugins.attrib import attr
+import pytest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
@@ -53,7 +53,7 @@
yield response[0]["results"]
-@attr('IT')
+@pytest.mark.it_postcommit
@unittest.skipIf(
recommendationengine is None,
"Recommendations AI dependencies not installed.")
diff --git a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test_it.py b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test_it.py
index d934520..03f79d1 100644
--- a/sdks/python/apache_beam/ml/gcp/videointelligenceml_test_it.py
+++ b/sdks/python/apache_beam/ml/gcp/videointelligenceml_test_it.py
@@ -22,7 +22,7 @@
import unittest
import hamcrest as hc
-from nose.plugins.attrib import attr
+import pytest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
@@ -45,7 +45,7 @@
yield segment.entity.description
-@attr('IT')
+@pytest.mark.it_postcommit
@unittest.skipIf(
AnnotateVideoWithContext is None, 'GCP dependencies are not installed')
class VideoIntelligenceMlTestIT(unittest.TestCase):
diff --git a/sdks/python/apache_beam/ml/gcp/visionml_test_it.py b/sdks/python/apache_beam/ml/gcp/visionml_test_it.py
index 14af3cb8..4413266 100644
--- a/sdks/python/apache_beam/ml/gcp/visionml_test_it.py
+++ b/sdks/python/apache_beam/ml/gcp/visionml_test_it.py
@@ -18,7 +18,7 @@
import unittest
-from nose.plugins.attrib import attr
+import pytest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
@@ -40,7 +40,7 @@
yield text_annotation.description
-@attr('IT')
+@pytest.mark.it_postcommit
@unittest.skipIf(vision is None, 'GCP dependencies are not installed')
class VisionMlTestIT(unittest.TestCase):
def test_text_detection_with_language_hint(self):
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 69362a4..d0aaa2f 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -61,6 +61,7 @@
from typing import FrozenSet
from typing import Iterable
from typing import List
+from typing import Mapping
from typing import Optional
from typing import Sequence
from typing import Set
@@ -271,7 +272,7 @@
output_replacements = {
} # type: Dict[AppliedPTransform, List[Tuple[pvalue.PValue, Optional[str]]]]
input_replacements = {
- } # type: Dict[AppliedPTransform, Sequence[Union[pvalue.PBegin, pvalue.PCollection]]]
+ } # type: Dict[AppliedPTransform, Mapping[str, Union[pvalue.PBegin, pvalue.PCollection]]]
side_input_replacements = {
} # type: Dict[AppliedPTransform, List[pvalue.AsSideInput]]
@@ -297,7 +298,7 @@
original_transform_node.parent,
replacement_transform,
original_transform_node.full_label,
- original_transform_node.inputs)
+ original_transform_node.main_inputs)
replacement_transform_node.resource_hints = (
original_transform_node.resource_hints)
@@ -437,11 +438,11 @@
output_replacements[transform_node].append((tag, replacement))
if replace_input:
- new_input = [
- input if not input in output_map else output_map[input]
- for input in transform_node.inputs
- ]
- input_replacements[transform_node] = new_input
+ new_inputs = {
+ tag: input if not input in output_map else output_map[input]
+ for (tag, input) in transform_node.main_inputs.items()
+ }
+ input_replacements[transform_node] = new_inputs
if replace_side_inputs:
new_side_inputs = []
@@ -670,15 +671,18 @@
pvalueish, inputs = transform._extract_input_pvalues(pvalueish)
try:
- inputs = tuple(inputs)
- for leaf_input in inputs:
- if not isinstance(leaf_input, pvalue.PValue):
- raise TypeError
+ if not isinstance(inputs, dict):
+ inputs = {str(ix): input for (ix, input) in enumerate(inputs)}
except TypeError:
raise NotImplementedError(
'Unable to extract PValue inputs from %s; either %s does not accept '
'inputs of this format, or it does not properly override '
'_extract_input_pvalues' % (pvalueish, transform))
+ for t, leaf_input in inputs.items():
+ if not isinstance(leaf_input, pvalue.PValue) or not isinstance(t, str):
+ raise NotImplementedError(
+ '%s does not properly override _extract_input_pvalues, '
+ 'returned %s from %s' % (transform, inputs, pvalueish))
current = AppliedPTransform(
self._current_transform(), transform, full_label, inputs)
@@ -705,7 +709,8 @@
if result.producer is None:
result.producer = current
- self._infer_result_type(transform, inputs, result)
+ # TODO(BEAM-1833): Pass full tuples dict.
+ self._infer_result_type(transform, tuple(inputs.values()), result)
assert isinstance(result.producer.inputs, tuple)
# The DoOutputsTuple adds the PCollection to the outputs when accessed
@@ -940,7 +945,7 @@
for id in proto.components.transforms:
transform = context.transforms.get_by_id(id)
if not transform.inputs and transform.transform.__class__ in has_pbegin:
- transform.inputs = (pvalue.PBegin(p), )
+ transform.main_inputs = {'None': pvalue.PBegin(p)}
if return_context:
return p, context # type: ignore # too complicated for now
@@ -1030,7 +1035,7 @@
parent, # type: Optional[AppliedPTransform]
transform, # type: Optional[ptransform.PTransform]
full_label, # type: str
- inputs, # type: Optional[Sequence[Union[pvalue.PBegin, pvalue.PCollection]]]
+ main_inputs, # type: Optional[Mapping[str, Union[pvalue.PBegin, pvalue.PCollection]]]
environment_id=None, # type: Optional[str]
annotations=None, # type: Optional[Dict[str, bytes]]
):
@@ -1043,7 +1048,7 @@
# reusing PTransform instances in different contexts (apply() calls) without
# any interference. This is particularly useful for composite transforms.
self.full_label = full_label
- self.inputs = inputs or ()
+ self.main_inputs = dict(main_inputs or {})
self.side_inputs = tuple() if transform is None else transform.side_inputs
self.outputs = {} # type: Dict[Union[str, int, None], pvalue.PValue]
@@ -1076,6 +1081,10 @@
}
self.annotations = annotations
+ @property
+ def inputs(self):
+ return tuple(self.main_inputs.values())
+
def __repr__(self):
# type: () -> str
return "%s(%s, %s)" % (
@@ -1109,8 +1118,8 @@
if isinstance(self.transform, external.ExternalTransform):
self.transform.replace_named_outputs(self.named_outputs())
- def replace_inputs(self, inputs):
- self.inputs = inputs
+ def replace_inputs(self, main_inputs):
+ self.main_inputs = main_inputs
# Importing locally to prevent circular dependency issues.
from apache_beam.transforms import external
@@ -1215,12 +1224,11 @@
def named_inputs(self):
# type: () -> Dict[str, pvalue.PValue]
- # TODO(BEAM-1833): Push names up into the sdk construction.
if self.transform is None:
- assert not self.inputs and not self.side_inputs
+ assert not self.main_inputs and not self.side_inputs
return {}
else:
- return self.transform._named_inputs(self.inputs, self.side_inputs)
+ return self.transform._named_inputs(self.main_inputs, self.side_inputs)
def named_outputs(self):
# type: () -> Dict[str, pvalue.PCollection]
@@ -1309,10 +1317,10 @@
pardo_payload = None
side_input_tags = []
- main_inputs = [
- context.pcollections.get_by_id(id) for tag,
- id in proto.inputs.items() if tag not in side_input_tags
- ]
+ main_inputs = {
+ tag: context.pcollections.get_by_id(id)
+ for (tag, id) in proto.inputs.items() if tag not in side_input_tags
+ }
transform = ptransform.PTransform.from_runner_api(proto, context)
if transform and proto.environment_id:
@@ -1334,7 +1342,7 @@
parent=None,
transform=transform,
full_label=proto.unique_name,
- inputs=main_inputs,
+ main_inputs=main_inputs,
environment_id=None,
annotations=proto.annotations)
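The `pipeline.py` changes above convert `AppliedPTransform` from a positional `inputs` tuple to a tagged `main_inputs` mapping, keeping `inputs` as a derived property for backward compatibility. A simplified, standalone sketch of that pattern (the class here is illustrative, not the Beam class):

```python
class TaggedNode:
    """Stores main inputs as a tag -> value mapping; the legacy
    positional `inputs` tuple is derived from it on demand."""
    def __init__(self, main_inputs=None):
        if main_inputs is None:
            main_inputs = {}
        elif not isinstance(main_inputs, dict):
            # Untagged sequences get stringified indices, mirroring
            # `{str(ix): input for ix, input in enumerate(inputs)}`
            # in the diff.
            main_inputs = {str(ix): v for ix, v in enumerate(main_inputs)}
        self.main_inputs = dict(main_inputs)

    @property
    def inputs(self):
        # Same shape the diff adds: tuple(self.main_inputs.values()).
        return tuple(self.main_inputs.values())


node = TaggedNode(['a', 'b'])
assert node.main_inputs == {'0': 'a', '1': 'b'}
assert node.inputs == ('a', 'b')

tagged = TaggedNode({'left': 1, 'right': 2})
assert tagged.inputs == (1, 2)  # dict insertion order is preserved
```

Deriving `inputs` rather than storing it twice keeps the two views from drifting apart, which is why callers that only read positionally keep working.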
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index c2a23fd..731d5e3 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -972,6 +972,24 @@
for transform_id in runner_api_proto.components.transforms:
self.assertRegex(transform_id, r'[a-zA-Z0-9-_]+')
+ def test_input_names(self):
+ class MyPTransform(beam.PTransform):
+ def expand(self, pcolls):
+ return pcolls.values() | beam.Flatten()
+
+ p = beam.Pipeline()
+ input_names = set('ABC')
+ inputs = {x: p | x >> beam.Create([x]) for x in input_names}
+ inputs | MyPTransform() # pylint: disable=expression-not-assigned
+ runner_api_proto = Pipeline.to_runner_api(p)
+
+ for transform_proto in runner_api_proto.components.transforms.values():
+ if transform_proto.unique_name == 'MyPTransform':
+ self.assertEqual(set(transform_proto.inputs.keys()), input_names)
+ break
+ else:
+ self.fail('Unable to find transform.')
+
def test_display_data(self):
class MyParentTransform(beam.PTransform):
def expand(self, p):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py
index 0ee0e0f..934dfe5 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_exercise_metrics_pipeline_test.py
@@ -23,7 +23,6 @@
import unittest
import pytest
-from nose.plugins.attrib import attr
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
@@ -43,7 +42,7 @@
p = beam.Pipeline(options=pipeline_options)
return dataflow_exercise_metrics_pipeline.apply_and_run(p)
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_metrics_it(self):
result = self.run_pipeline()
errors = metric_result_matchers.verify_all(
@@ -51,7 +50,7 @@
dataflow_exercise_metrics_pipeline.metric_matchers())
self.assertFalse(errors, str(errors))
- @attr('IT')
+ @pytest.mark.it_postcommit
@pytest.mark.it_validatescontainer
def test_metrics_fnapi_it(self):
result = self.run_pipeline(experiment='beam_fn_api')
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 0ac8c5f..bbaf52c 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -305,7 +305,7 @@
parent,
beam.Map(lambda x: (b'', x)),
transform_node.full_label + '/MapToVoidKey%s' % ix,
- (side_input.pvalue, ))
+ {'input': side_input.pvalue})
new_side_input.pvalue.producer = map_to_void_key
map_to_void_key.add_output(new_side_input.pvalue, None)
parent.add_part(map_to_void_key)
@@ -594,15 +594,9 @@
return result
def _maybe_add_unified_worker_missing_options(self, options):
- debug_options = options.view_as(DebugOptions)
- # Streaming is always portable, default to runner v2.
- if options.view_as(StandardOptions).streaming:
- if not debug_options.lookup_experiment('disable_runner_v2'):
- debug_options.add_experiment('beam_fn_api')
- debug_options.add_experiment('use_runner_v2')
- debug_options.add_experiment('use_portable_job_submission')
# Set the default beam_fn_api experiment if the unified worker
# experiment flag exists; no-op otherwise.
+ debug_options = options.view_as(DebugOptions)
from apache_beam.runners.dataflow.internal import apiclient
if apiclient._use_unified_worker(options):
if not debug_options.lookup_experiment('beam_fn_api'):
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 9c7546d..e7ce71b 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -256,7 +256,6 @@
def test_streaming_create_translation(self):
remote_runner = DataflowRunner()
self.default_properties.append("--streaming")
- self.default_properties.append("--experiments=disable_runner_v2")
with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p:
p | ptransform.Create([1]) # pylint: disable=expression-not-assigned
job_dict = json.loads(str(remote_runner.job))
@@ -349,7 +348,8 @@
pcoll2.element_type = typehints.Any
pcoll3.element_type = typehints.KV[typehints.Any, typehints.Any]
for pcoll in [pcoll1, pcoll2, pcoll3]:
- applied = AppliedPTransform(None, beam.GroupByKey(), "label", [pcoll])
+ applied = AppliedPTransform(
+ None, beam.GroupByKey(), "label", {'pcoll': pcoll})
applied.outputs[None] = PCollection(None)
common.group_by_key_input_visitor().visit_transform(applied)
self.assertEqual(
@@ -368,7 +368,7 @@
for pcoll in [pcoll1, pcoll2]:
with self.assertRaisesRegex(ValueError, err_msg):
common.group_by_key_input_visitor().visit_transform(
- AppliedPTransform(None, beam.GroupByKey(), "label", [pcoll]))
+ AppliedPTransform(None, beam.GroupByKey(), "label", {'in': pcoll}))
def test_group_by_key_input_visitor_for_non_gbk_transforms(self):
p = TestPipeline()
@@ -376,7 +376,7 @@
for transform in [beam.Flatten(), beam.Map(lambda x: x)]:
pcoll.element_type = typehints.Any
common.group_by_key_input_visitor().visit_transform(
- AppliedPTransform(None, transform, "label", [pcoll]))
+ AppliedPTransform(None, transform, "label", {'in': pcoll}))
self.assertEqual(pcoll.element_type, typehints.Any)
def test_flatten_input_with_visitor_with_single_input(self):
@@ -388,11 +388,11 @@
def _test_flatten_input_visitor(self, input_type, output_type, num_inputs):
p = TestPipeline()
- inputs = []
- for _ in range(num_inputs):
+ inputs = {}
+ for ix in range(num_inputs):
input_pcoll = PCollection(p)
input_pcoll.element_type = input_type
- inputs.append(input_pcoll)
+ inputs[str(ix)] = input_pcoll
output_pcoll = PCollection(p)
output_pcoll.element_type = output_type
@@ -400,7 +400,7 @@
flatten.add_output(output_pcoll, None)
DataflowRunner.flatten_input_visitor().visit_transform(flatten)
for _ in range(num_inputs):
- self.assertEqual(inputs[0].element_type, output_type)
+ self.assertEqual(inputs['0'].element_type, output_type)
def test_gbk_then_flatten_input_visitor(self):
p = TestPipeline(
@@ -443,7 +443,7 @@
z: (x, y, z),
beam.pvalue.AsSingleton(pc),
beam.pvalue.AsMultiMap(pc))
- applied_transform = AppliedPTransform(None, transform, "label", [pc])
+ applied_transform = AppliedPTransform(None, transform, "label", {'pc': pc})
DataflowRunner.side_input_visitor(
use_fn_api=True).visit_transform(applied_transform)
self.assertEqual(2, len(applied_transform.side_inputs))
@@ -839,8 +839,7 @@
'Runner determined sharding not available in Dataflow for '
'GroupIntoBatches for jobs not using Runner V2'):
_ = self._run_group_into_batches_and_get_step_properties(
- True,
- ['--enable_streaming_engine', '--experiments=disable_runner_v2'])
+ True, ['--enable_streaming_engine'])
# JRH
with self.assertRaisesRegex(
@@ -848,12 +847,7 @@
'Runner determined sharding not available in Dataflow for '
'GroupIntoBatches for jobs not using Runner V2'):
_ = self._run_group_into_batches_and_get_step_properties(
- True,
- [
- '--enable_streaming_engine',
- '--experiments=beam_fn_api',
- '--experiments=disable_runner_v2'
- ])
+ True, ['--enable_streaming_engine', '--experiments=beam_fn_api'])
def test_pack_combiners(self):
class PackableCombines(beam.PTransform):
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
index 456636c..448fca7 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
@@ -685,8 +685,8 @@
def visit_transform(self, transform_node):
if transform_node.inputs:
- input_list = list(transform_node.inputs)
- for i, input_pcoll in enumerate(input_list):
+ main_inputs = dict(transform_node.main_inputs)
+ for tag, input_pcoll in main_inputs.items():
key = self._pin.cache_key(input_pcoll)
# Replace the input pcollection with the cached pcollection (if it
@@ -694,9 +694,9 @@
if key in self._pin._cached_pcoll_read:
# Ignore this pcoll in the final pruned instrumented pipeline.
self._pin._ignored_targets.add(input_pcoll)
- input_list[i] = self._pin._cached_pcoll_read[key]
+ main_inputs[tag] = self._pin._cached_pcoll_read[key]
# Update the transform with its new inputs.
- transform_node.inputs = tuple(input_list)
+ transform_node.main_inputs = main_inputs
v = ReadCacheWireVisitor(self)
pipeline.visit(v)
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
index 40d19a7..a3f91c0 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
@@ -297,11 +297,11 @@
def visit_transform(self, transform_node):
if transform_node.inputs:
- input_list = list(transform_node.inputs)
- for i in range(len(input_list)):
- if input_list[i] == init_pcoll:
- input_list[i] = cached_init_pcoll
- transform_node.inputs = tuple(input_list)
+ main_inputs = dict(transform_node.main_inputs)
+ for tag in main_inputs.keys():
+ if main_inputs[tag] == init_pcoll:
+ main_inputs[tag] = cached_init_pcoll
+ transform_node.main_inputs = main_inputs
v = TestReadCacheWireVisitor()
p_origin.visit(v)
diff --git a/sdks/python/apache_beam/testing/test_stream.py b/sdks/python/apache_beam/testing/test_stream.py
index b3b4a96..d655a90 100644
--- a/sdks/python/apache_beam/testing/test_stream.py
+++ b/sdks/python/apache_beam/testing/test_stream.py
@@ -207,15 +207,56 @@
return 'ProcessingTimeEvent: <{}>'.format(self.advance_by)
-class WindowedValueHolder:
+class WindowedValueHolderMeta(type):
+ """A metaclass that overrides the isinstance check for WindowedValueHolder.
+
+ Python does a quick test for exact match. If an instance is exactly of
+ type WindowedValueHolder, the overridden isinstance check is omitted.
+ The override is needed because WindowedValueHolder elements encoded then
+ decoded become Row elements.
+ """
+ def __instancecheck__(cls, other):
+ """Checks if a beam.Row typed instance is a WindowedValueHolder.
+ """
+ return (
+ isinstance(other, beam.Row) and hasattr(other, 'windowed_value') and
+ hasattr(other, 'urn') and
+ isinstance(other.windowed_value, WindowedValue) and
+ other.urn == common_urns.coders.ROW.urn)
+
+
+class WindowedValueHolder(beam.Row, metaclass=WindowedValueHolderMeta):
"""A class that holds a WindowedValue.
This is a special class that can be used by the runner that implements the
TestStream as a signal that the underlying value should be unreified to the
specified window.
"""
+ # Register WindowedValueHolder to always use RowCoder.
+ coders.registry.register_coder(WindowedValueHolderMeta, coders.RowCoder)
+
def __init__(self, windowed_value):
- self.windowed_value = windowed_value
+ assert isinstance(windowed_value, WindowedValue), (
+      'WindowedValueHolder can only hold a %s. Instead, %s was given.') % (
+ WindowedValue, windowed_value)
+ super().__init__(
+ **{
+ 'windowed_value': windowed_value, 'urn': common_urns.coders.ROW.urn
+ })
+
+ @classmethod
+ def from_row(cls, row):
+ """Converts a beam.Row typed instance to WindowedValueHolder.
+ """
+ if isinstance(row, WindowedValueHolder):
+ return WindowedValueHolder(row.windowed_value)
+ assert isinstance(row, beam.Row), 'The given row %s must be a %s type' % (
+ row, beam.Row)
+ assert hasattr(row, 'windowed_value'), (
+ 'The given %s must have a windowed_value attribute.') % row
+    assert isinstance(row.windowed_value, WindowedValue), (
+        'The windowed_value attribute of %s must be a %s type') % (
+            row, WindowedValue)
+    return WindowedValueHolder(row.windowed_value)
class TestStream(PTransform):
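The metaclass `__instancecheck__` trick used by `WindowedValueHolderMeta` — making `isinstance` succeed for structurally compatible objects that were decoded into plain rows — can be demonstrated generically (the `Holder`/`Record` names are illustrative, not Beam APIs):

```python
class HolderMeta(type):
    """Metaclass that makes isinstance() accept any object carrying
    the expected attribute, not just direct instances.

    Note: CPython short-circuits isinstance() when type(obj) is
    exactly the class, so __instancecheck__ is only consulted for
    other types -- the same caveat the diff's docstring points out."""
    def __instancecheck__(cls, other):
        return hasattr(other, 'payload') and other.payload is not None


class Holder(metaclass=HolderMeta):
    def __init__(self, payload):
        self.payload = payload


class Record:
    """Stand-in for a decoded row-like object."""
    def __init__(self, **kw):
        self.__dict__.update(kw)


assert isinstance(Holder('x'), Holder)        # exact type: fast path
assert isinstance(Record(payload=1), Holder)  # duck-typed via metaclass
assert not isinstance(Record(other=1), Holder)
```

This is why a `WindowedValueHolder` that round-trips through a `RowCoder` and comes back as a plain `beam.Row` still passes the runner's `isinstance` check.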
diff --git a/sdks/python/apache_beam/testing/test_stream_it_test.py b/sdks/python/apache_beam/testing/test_stream_it_test.py
index 65a6aeb..0e293ed 100644
--- a/sdks/python/apache_beam/testing/test_stream_it_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_it_test.py
@@ -22,7 +22,7 @@
import unittest
from functools import wraps
-from nose.plugins.attrib import attr
+import pytest
import apache_beam as beam
from apache_beam.options.pipeline_options import StandardOptions
@@ -67,7 +67,7 @@
cls.project = cls.test_pipeline.get_option('project')
@supported(['DirectRunner', 'SwitchingDirectRunner'])
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_basic_execution(self):
test_stream = (
TestStream().advance_watermark_to(10).add_elements([
@@ -103,7 +103,7 @@
]))
@supported(['DirectRunner', 'SwitchingDirectRunner'])
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_multiple_outputs(self):
"""Tests that the TestStream supports emitting to multiple PCollections."""
letters_elements = [
@@ -151,7 +151,7 @@
p.run()
@supported(['DirectRunner', 'SwitchingDirectRunner'])
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_multiple_outputs_with_watermark_advancement(self):
"""Tests that the TestStream can independently control output watermarks."""
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
index 94445dd..a4580b7 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -332,6 +332,27 @@
('a', timestamp.Timestamp(5), beam.window.IntervalWindow(5, 10)),
]))
+ def test_instance_check_windowed_value_holder(self):
+ windowed_value = WindowedValue(
+ 'a',
+ Timestamp(5), [beam.window.IntervalWindow(5, 10)],
+ PaneInfo(True, True, PaneInfoTiming.ON_TIME, 0, 0))
+ self.assertTrue(
+ isinstance(WindowedValueHolder(windowed_value), WindowedValueHolder))
+ self.assertTrue(
+ isinstance(
+ beam.Row(
+ windowed_value=windowed_value, urn=common_urns.coders.ROW.urn),
+ WindowedValueHolder))
+ self.assertFalse(
+ isinstance(
+ beam.Row(windowed_value=windowed_value), WindowedValueHolder))
+ self.assertFalse(isinstance(windowed_value, WindowedValueHolder))
+ self.assertFalse(
+ isinstance(beam.Row(x=windowed_value), WindowedValueHolder))
+ self.assertFalse(
+ isinstance(beam.Row(windowed_value=1), WindowedValueHolder))
+
def test_gbk_execution_no_triggers(self):
test_stream = (
TestStream().advance_watermark_to(10).add_elements([
diff --git a/sdks/python/apache_beam/transforms/external_it_test.py b/sdks/python/apache_beam/transforms/external_it_test.py
index b1eda0a..e24b70f 100644
--- a/sdks/python/apache_beam/transforms/external_it_test.py
+++ b/sdks/python/apache_beam/transforms/external_it_test.py
@@ -21,7 +21,7 @@
import unittest
-from nose.plugins.attrib import attr
+import pytest
import apache_beam as beam
from apache_beam import Pipeline
@@ -33,7 +33,7 @@
class ExternalTransformIT(unittest.TestCase):
- @attr('IT')
+ @pytest.mark.it_postcommit
def test_job_python_from_python_it(self):
@ptransform.PTransform.register_urn('simple', None)
class SimpleTransform(ptransform.PTransform):
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 788485cd..50f7715 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -50,6 +50,7 @@
from typing import Callable
from typing import Dict
from typing import List
+from typing import Mapping
from typing import Optional
from typing import Sequence
from typing import Tuple
@@ -253,7 +254,7 @@
return self.visit_nested(node)
-def get_named_nested_pvalues(pvalueish):
+def get_named_nested_pvalues(pvalueish, as_inputs=False):
if isinstance(pvalueish, tuple):
# Check to see if it's a named tuple.
fields = getattr(pvalueish, '_fields', None)
@@ -262,16 +263,22 @@
else:
tagged_values = enumerate(pvalueish)
elif isinstance(pvalueish, list):
+ if as_inputs:
+ # A full list is treated as a single list of values for eager evaluation.
+ yield None, pvalueish
+ return
tagged_values = enumerate(pvalueish)
elif isinstance(pvalueish, dict):
tagged_values = pvalueish.items()
else:
- if isinstance(pvalueish, (pvalue.PValue, pvalue.DoOutputsTuple)):
+ if as_inputs or isinstance(pvalueish,
+ (pvalue.PValue, pvalue.DoOutputsTuple)):
yield None, pvalueish
return
for tag, subvalue in tagged_values:
- for subtag, subsubvalue in get_named_nested_pvalues(subvalue):
+ for subtag, subsubvalue in get_named_nested_pvalues(
+ subvalue, as_inputs=as_inputs):
if subtag is None:
yield tag, subsubvalue
else:
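The `as_inputs` flag added above changes how nested inputs are flattened into named leaves: with `as_inputs=True`, whole lists are kept as single values for eager evaluation instead of being recursed into. A stand-alone simplified sketch of this recursive tagging logic (plain values instead of PValues; the function name is illustrative):

```python
def named_nested_values(valueish, as_inputs=False):
    """Yield (tag, leaf) pairs from nested tuples/lists/dicts.

    Simplified sketch of get_named_nested_pvalues: with as_inputs=True,
    a list is yielded whole rather than being flattened element-wise.
    """
    if isinstance(valueish, tuple):
        fields = getattr(valueish, '_fields', None)  # named tuple?
        tagged = zip(fields, valueish) if fields else enumerate(valueish)
    elif isinstance(valueish, list):
        if as_inputs:
            yield None, valueish
            return
        tagged = enumerate(valueish)
    elif isinstance(valueish, dict):
        tagged = valueish.items()
    else:
        yield None, valueish
        return
    for tag, sub in tagged:
        for subtag, leaf in named_nested_values(sub, as_inputs=as_inputs):
            # A None subtag means `sub` was itself a leaf.
            yield tag if subtag is None else '%s.%s' % (tag, subtag), leaf

# With as_inputs=True a list stays whole; dict keys still become names.
assert dict(named_nested_values({'a': [1, 2], 'b': 3}, as_inputs=True)) == \
    {'a': [1, 2], 'b': 3}
assert dict(named_nested_values({'a': [1, 2]})) == {'a.0': 1, 'a.1': 2}
```

This is also why `_extract_input_pvalues` further down can return a `{str(tag): value}` mapping instead of an anonymous tuple of leaves.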
@@ -569,6 +576,8 @@
def __ror__(self, left, label=None):
"""Used to apply this PTransform to non-PValues, e.g., a tuple."""
pvalueish, pvalues = self._extract_input_pvalues(left)
+ if isinstance(pvalues, dict):
+ pvalues = tuple(pvalues.values())
pipelines = [v.pipeline for v in pvalues if isinstance(v, pvalue.PValue)]
if pvalues and not pipelines:
deferred = False
@@ -597,8 +606,7 @@
# pylint: enable=wrong-import-order, wrong-import-position
replacements = {
id(v): p | 'CreatePInput%s' % ix >> Create(v, reshuffle=False)
- for ix,
- v in enumerate(pvalues)
+ for (ix, v) in enumerate(pvalues)
if not isinstance(v, pvalue.PValue) and v is not None
}
pvalueish = _SetInputPValues().visit(pvalueish, replacements)
@@ -628,19 +636,11 @@
if isinstance(pvalueish, pipeline.Pipeline):
pvalueish = pvalue.PBegin(pvalueish)
- def _dict_tuple_leaves(pvalueish):
- if isinstance(pvalueish, tuple):
- for a in pvalueish:
- for p in _dict_tuple_leaves(a):
- yield p
- elif isinstance(pvalueish, dict):
- for a in pvalueish.values():
- for p in _dict_tuple_leaves(a):
- yield p
- else:
- yield pvalueish
-
- return pvalueish, tuple(_dict_tuple_leaves(pvalueish))
+ return pvalueish, {
+ str(tag): value
+ for (tag, value) in get_named_nested_pvalues(
+ pvalueish, as_inputs=True)
+ }
def _pvaluish_from_dict(self, input_dict):
if len(input_dict) == 1:
@@ -648,16 +648,15 @@
else:
return input_dict
- def _named_inputs(self, inputs, side_inputs):
- # type: (Sequence[pvalue.PValue], Sequence[Any]) -> Dict[str, pvalue.PValue]
+ def _named_inputs(self, main_inputs, side_inputs):
+ # type: (Mapping[str, pvalue.PValue], Sequence[Any]) -> Dict[str, pvalue.PValue]
"""Returns the dictionary of named inputs (including side inputs) as they
should be named in the beam proto.
"""
- # TODO(BEAM-1833): Push names up into the sdk construction.
main_inputs = {
- str(ix): input
- for (ix, input) in enumerate(inputs)
+ tag: input
+ for (tag, input) in main_inputs.items()
if isinstance(input, pvalue.PCollection)
}
named_side_inputs = {(SIDE_INPUT_PREFIX + '%s') % ix: si.pvalue
diff --git a/sdks/python/container/Dockerfile b/sdks/python/container/Dockerfile
index 34a35b4..359ca2c 100644
--- a/sdks/python/container/Dockerfile
+++ b/sdks/python/container/Dockerfile
@@ -101,6 +101,7 @@
COPY target/LICENSE /opt/apache/beam/
+COPY target/LICENSE.python /opt/apache/beam/
COPY target/NOTICE /opt/apache/beam/
ADD target/launcher/linux_amd64/boot /opt/apache/beam/
diff --git a/sdks/python/container/base_image_requirements.txt b/sdks/python/container/base_image_requirements.txt
index 2926ff1..c627b4e 100644
--- a/sdks/python/container/base_image_requirements.txt
+++ b/sdks/python/container/base_image_requirements.txt
@@ -52,6 +52,8 @@
google-cloud-datastore==1.15.3
google-cloud-dlp==0.13.0
google-cloud-language==1.3.0
+google-cloud-profiler==3.0.4
+google-cloud-recommendations-ai==0.2.0
google-cloud-spanner==1.13.0
google-cloud-videointelligence==1.13.0
google-cloud-vision==0.42.0
@@ -66,6 +68,7 @@
dataclasses == 0.8 ; python_version=="3.6"
guppy3==3.0.10
mmh3==2.5.1
+orjson==3.5.3
python-dateutil == 2.8.1
requests == 2.24.0
freezegun == 0.3.15
diff --git a/sdks/python/pytest.ini b/sdks/python/pytest.ini
index 2906300..a162b69 100644
--- a/sdks/python/pytest.ini
+++ b/sdks/python/pytest.ini
@@ -29,6 +29,7 @@
markers =
xlang_transforms: collect Cross Language transforms test runs
xlang_sql_expansion_service: collect for Cross Language with SQL expansion service test runs
+ it_postcommit: collect for post-commit integration test runs
it_validatesrunner: collect for ValidatesRunner integration test runs
no_sickbay_streaming: run without sickbay-streaming
no_sickbay_batch: run without sickbay-batch
diff --git a/sdks/python/test-suites/dataflow/common.gradle b/sdks/python/test-suites/dataflow/common.gradle
index 3daf379..7d1cf2b 100644
--- a/sdks/python/test-suites/dataflow/common.gradle
+++ b/sdks/python/test-suites/dataflow/common.gradle
@@ -28,14 +28,7 @@
def runScriptsDir = "${rootDir}/sdks/python/scripts"
-// TODO(BEAM-3713) Remove once nose is removed
// Basic test options for ITs running on Jenkins.
-def basicTestOpts = [
- "--nocapture", // print stdout instantly
- "--processes=8", // run tests in parallel
- "--process-timeout=4500", // timeout of whole command execution
-]
-
def basicPytestOpts = [
"--capture=no", // print stdout instantly
"--timeout=4500", // timeout of whole command execution
@@ -56,22 +49,23 @@
doLast {
// Basic integration tests to run in PreCommit
def precommitTests = streaming ? [
- "apache_beam.examples.streaming_wordcount_it_test:StreamingWordCountIT.test_streaming_wordcount_it",
+ "apache_beam/examples/streaming_wordcount_it_test.py::StreamingWordCountIT::test_streaming_wordcount_it",
] : [
- "apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
+ "apache_beam/examples/wordcount_it_test.py::WordCountIT::test_wordcount_it",
]
def testOpts = [
- "--tests=${precommitTests.join(',')}",
- "--nocapture", // Print stdout instantly
- "--processes=2", // Number of tests running in parallel
- "--process-timeout=1800", // Timeout of whole command execution
+ "${precommitTests.join(' ')}",
+ "--capture=no", // Print stdout instantly
+ "--numprocesses=2", // Number of tests running in parallel
+ "--timeout=1800", // Timeout of whole command execution
]
def argMap = [
"test_opts" : testOpts,
"sdk_location": files(configurations.distTarBall.files).singleFile,
"worker_jar" : dataflowWorkerJar,
- "suite" : "preCommitIT-df${pythonSuffix}"
+ "suite" : "preCommitIT-df${pythonSuffix}",
+ "pytest" : true, // TODO(BEAM-3713): Remove this once nose is removed.
]
if (runnerV2) {
@@ -115,14 +109,16 @@
def dataflowWorkerJar = project(":runners:google-cloud-dataflow-java:worker").shadowJar.archivePath
doLast {
- def testOpts = basicTestOpts + ["--attr=IT"]
-
- def cmdArgs = mapToArgString([
+ def testOpts = basicPytestOpts + ["--numprocesses=8", "--dist=loadfile"]
+ def argMap = [
"test_opts": testOpts,
"sdk_location": files(configurations.distTarBall.files).singleFile,
"worker_jar": dataflowWorkerJar,
- "suite": "postCommitIT-df${pythonVersionSuffix}"
- ])
+ "suite": "postCommitIT-df${pythonVersionSuffix}",
+ "pytest": true, // TODO(BEAM-3713): Remove this once nose is removed.
+ "collect": "it_postcommit"
+ ]
+ def cmdArgs = mapToArgString(argMap)
exec {
executable 'sh'
args '-c', ". ${envdir}/bin/activate && ${runScriptsDir}/run_integration_test.sh $cmdArgs"
diff --git a/sdks/python/test-suites/direct/common.gradle b/sdks/python/test-suites/direct/common.gradle
index 42bba4c..50fe59a 100644
--- a/sdks/python/test-suites/direct/common.gradle
+++ b/sdks/python/test-suites/direct/common.gradle
@@ -21,9 +21,11 @@
def runScriptsDir = "${rootDir}/sdks/python/scripts"
// Basic test options for ITs running on Jenkins.
def basicTestOpts = [
- "--nocapture", // print stdout instantly
- "--processes=8", // run tests in parallel
- "--process-timeout=4500", // timeout of whole command execution
+ "--capture=no", // print stdout instantly
+ "--numprocesses=8", // run tests in parallel
+ "--timeout=4500", // timeout of whole command execution
+ "--color=yes", // console color
+ "--log-cli-level=INFO" //log level info
]
task postCommitIT {
@@ -32,25 +34,22 @@
// Run IT tests with TestDirectRunner in batch in Python 3.
doLast {
def batchTests = [
- "apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
- "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
- "apache_beam.io.gcp.big_query_query_to_table_it_test:BigQueryQueryToTableIT",
- "apache_beam.io.gcp.bigquery_io_read_it_test",
- "apache_beam.io.gcp.bigquery_read_it_test",
- "apache_beam.io.gcp.bigquery_write_it_test",
- "apache_beam.io.gcp.datastore.v1new.datastore_write_it_test",
- "apache_beam.io.gcp.experimental.spannerio_read_it_test",
- "apache_beam.io.gcp.experimental.spannerio_write_it_test",
+ "apache_beam/examples/wordcount_it_test.py::WordCountIT::test_wordcount_it",
+ "apache_beam/io/gcp/pubsub_integration_test.py::PubSubIntegrationTest",
+ "apache_beam/io/gcp/big_query_query_to_table_it_test.py::BigQueryQueryToTableIT",
+ "apache_beam/io/gcp/bigquery_io_read_it_test.py",
+ "apache_beam/io/gcp/bigquery_read_it_test.py",
+ "apache_beam/io/gcp/bigquery_write_it_test.py",
+ "apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py",
+ "apache_beam/io/gcp/experimental/spannerio_read_it_test.py",
+ "apache_beam/io/gcp/experimental/spannerio_write_it_test.py",
]
- def testOpts = [
- "--tests=${batchTests.join(',')}",
- "--nocapture", // Print stdout instantly
- "--processes=8", // run tests in parallel
- "--process-timeout=4500", // timeout of whole command execution
- ]
+ def testOpts = basicTestOpts + ["${batchTests.join(' ')}"]
def argMap = ["runner": "TestDirectRunner",
"test_opts": testOpts,
- "suite": "postCommitIT-direct-py${pythonVersionSuffix}"]
+ "suite": "postCommitIT-direct-py${pythonVersionSuffix}",
+ "pytest": true, // TODO(BEAM-3713): Remove this once nose is removed.
+ ]
def batchCmdArgs = mapToArgString(argMap)
exec {
executable 'sh'
@@ -97,18 +96,20 @@
// Run IT tests with TestDirectRunner in batch.
doLast {
def tests = [
- "apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
- "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
- "apache_beam.io.gcp.big_query_query_to_table_it_test:BigQueryQueryToTableIT",
- "apache_beam.io.gcp.bigquery_io_read_it_test",
- "apache_beam.io.gcp.bigquery_read_it_test",
- "apache_beam.io.gcp.bigquery_write_it_test",
- "apache_beam.io.gcp.datastore.v1new.datastore_write_it_test",
+ "apache_beam/examples/wordcount_it_test.py::WordCountIT::test_wordcount_it",
+ "apache_beam/io/gcp/pubsub_integration_test.py::PubSubIntegrationTest",
+ "apache_beam/io/gcp/big_query_query_to_table_it_test.py::BigQueryQueryToTableIT",
+ "apache_beam/io/gcp/bigquery_io_read_it_test.py",
+ "apache_beam/io/gcp/bigquery_read_it_test.py",
+ "apache_beam/io/gcp/bigquery_write_it_test.py",
+ "apache_beam/io/gcp/datastore/v1new/datastore_write_it_test.py",
]
- def batchTestOpts = basicTestOpts + ["--tests=${tests.join(',')}"]
+ def batchTestOpts = basicTestOpts + ["${tests.join(' ')}"]
def argMap = ["runner": "TestDirectRunner",
"test_opts": batchTestOpts,
- "suite": "directRunnerIT-batch"]
+ "suite": "directRunnerIT-batch",
+ "pytest": true, // TODO(BEAM-3713): Remove this once nose is removed.
+ ]
def batchCmdArgs = mapToArgString(argMap)
exec {
executable 'sh'
@@ -119,18 +120,19 @@
// Run IT tests with TestDirectRunner in streaming.
doLast {
def tests = [
- "apache_beam.examples.wordcount_it_test:WordCountIT.test_wordcount_it",
- "apache_beam.io.gcp.pubsub_integration_test:PubSubIntegrationTest",
- "apache_beam.io.gcp.bigquery_test:BigQueryStreamingInsertTransformIntegrationTests\
-.test_multiple_destinations_transform",
- "apache_beam.io.gcp.bigquery_test:PubSubBigQueryIT",
- "apache_beam.io.gcp.bigquery_file_loads_test:BigQueryFileLoadsIT.test_bqfl_streaming",
+ "apache_beam/examples/wordcount_it_test.py::WordCountIT::test_wordcount_it",
+ "apache_beam/io/gcp/pubsub_integration_test.py::PubSubIntegrationTest",
+ "apache_beam/io/gcp/bigquery_test.py::BigQueryStreamingInsertTransformIntegrationTests::test_multiple_destinations_transform",
+ "apache_beam/io/gcp/bigquery_test.py::PubSubBigQueryIT",
+ "apache_beam/io/gcp/bigquery_file_loads_test.py::BigQueryFileLoadsIT::test_bqfl_streaming",
]
- def streamingTestOpts = basicTestOpts + ["--tests=${tests.join(',')}"]
+ def streamingTestOpts = basicTestOpts + ["${tests.join(' ')}"]
def argMap = ["runner": "TestDirectRunner",
"streaming": "true",
"test_opts": streamingTestOpts,
- "suite": "directRunnerIT-streaming"]
+ "suite": "directRunnerIT-streaming",
+ "pytest": true, // TODO(BEAM-3713): Remove this once nose is removed.
+ ]
def streamingCmdArgs = mapToArgString(argMap)
exec {
executable 'sh'
diff --git a/sdks/python/test-suites/portable/common.gradle b/sdks/python/test-suites/portable/common.gradle
index ecf0822..20665f0 100644
--- a/sdks/python/test-suites/portable/common.gradle
+++ b/sdks/python/test-suites/portable/common.gradle
@@ -201,14 +201,14 @@
doLast {
def tests = [
- "apache_beam.io.gcp.bigquery_read_it_test",
- "apache_beam.io.external.xlang_jdbcio_it_test",
- "apache_beam.io.external.xlang_kafkaio_it_test",
- "apache_beam.io.external.xlang_kinesisio_it_test",
- "apache_beam.io.gcp.tests.xlang_spannerio_it_test",
- "apache_beam.io.external.xlang_debeziumio_it_test",
+ "apache_beam/io/gcp/bigquery_read_it_test.py",
+ "apache_beam/io/external/xlang_jdbcio_it_test.py",
+ "apache_beam/io/external/xlang_kafkaio_it_test.py",
+ "apache_beam/io/external/xlang_kinesisio_it_test.py",
+ "apache_beam/io/gcp/tests/xlang_spannerio_it_test.py",
+ "apache_beam/io/external/xlang_debeziumio_it_test.py",
]
- def testOpts = ["--tests=${tests.join(',')}"]
+ def testOpts = ["${tests.join(' ')}"] + ["--log-cli-level=INFO"]
def pipelineOpts = [
"--runner=FlinkRunner",
"--project=apache-beam-testing",
@@ -220,6 +220,7 @@
"test_opts": testOpts,
"suite": "postCommitIT-flink-py${pythonVersionSuffix}",
"pipeline_opts": pipelineOpts.join(" "),
+ "pytest": true, // TODO(BEAM-3713): Remove this once nose is removed.
])
def kafkaJar = project(":sdks:java:testing:kafka-service:").buildTestKafkaServiceJar.archivePath
exec {
diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index 7edcd7a..c4a3c95 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -2434,6 +2434,10 @@
lines = pipeline | beam.io.ReadFromText('gs://some/inputData.txt')
{{< /highlight >}}
+{{< highlight go >}}
+lines := textio.Read(scope, "gs://some/inputData.txt")
+{{< /highlight >}}
+
### 5.2. Writing output data {#pipeline-io-writing-data}
Write transforms write the data in a `PCollection` to an external data source.
@@ -2449,6 +2453,10 @@
output | beam.io.WriteToText('gs://some/outputData')
{{< /highlight >}}
+{{< highlight go >}}
+textio.Write(scope, "gs://some/outputData.txt", output)
+{{< /highlight >}}
+
### 5.3. File-based input and output data {#file-based-data}
#### 5.3.1. Reading from multiple locations {#file-based-reading-multiple-locations}
@@ -2468,6 +2476,10 @@
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_pipelineio_read >}}
{{< /highlight >}}
+{{< highlight go >}}
+lines := textio.Read(scope, "path/to/input-*.csv")
+{{< /highlight >}}
+
To read data from disparate sources into a single `PCollection`, read each one
independently and then use the [Flatten](#flatten) transform to create a single
`PCollection`.
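As a plain-Python analogy of the pattern just described (read each source independently, then merge), ignoring Beam's deferred execution model:

```python
from itertools import chain

# Stand-ins for two independent reads; in a real pipeline each would be
# its own read transform producing a separate PCollection.
source_a = ['line1', 'line2']
source_b = ['line3']

# Flatten's role: merge the independently-read collections into one.
merged = list(chain(source_a, source_b))
assert merged == ['line1', 'line2', 'line3']
```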
@@ -2493,6 +2505,12 @@
{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_pipelineio_write >}}
{{< /highlight >}}
+{{< highlight go >}}
+// The Go SDK textio doesn't support sharding on writes yet.
+// See https://issues.apache.org/jira/browse/BEAM-12664 for ways
+// to contribute a solution.
+{{< /highlight >}}
+
### 5.4. Beam-provided I/O transforms {#provided-io-transforms}
See the [Beam-provided I/O Transforms](/documentation/io/built-in/)
diff --git a/website/www/site/layouts/partials/head.html b/website/www/site/layouts/partials/head.html
index 8030193..1b1aa7c 100644
--- a/website/www/site/layouts/partials/head.html
+++ b/website/www/site/layouts/partials/head.html
@@ -36,7 +36,8 @@
</style>
<script src="/js/bootstrap.min.js"></script>
-<script src="/js/language-switch.js"></script>
+<!-- TODO [BEAM-12644]: Create an asset pipeline so we don't have to version .js files. -->
+<script src="/js/language-switch-v2.js"></script>
<script src="/js/fix-menu.js"></script>
<script src="/js/section-nav.js"></script>
<script src="/js/page-nav.js"></script>
diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html
index 94aa1d1..4491ae1 100644
--- a/website/www/site/layouts/partials/section-menu/en/documentation.html
+++ b/website/www/site/layouts/partials/section-menu/en/documentation.html
@@ -12,338 +12,202 @@
<li><span class="section-nav-list-main-title">Documentation</span></li>
<li><a href="/documentation">Using the Documentation</a></li>
-<li>
- <span class="section-nav-list-title">How-to guides</span>
-
- <ul class="section-nav-list">
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Transform catalog</span>
-
- <ul class="section-nav-list">
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Python</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/transforms/python/overview/">Overview</a></li>
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Element-wise</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/transforms/python/elementwise/filter/">Filter</a></li>
- <li><a href="/documentation/transforms/python/elementwise/flatmap/">FlatMap</a></li>
- <li><a href="/documentation/transforms/python/elementwise/keys/">Keys</a></li>
- <li><a href="/documentation/transforms/python/elementwise/kvswap/">KvSwap</a></li>
- <li><a href="/documentation/transforms/python/elementwise/map/">Map</a></li>
- <li><a href="/documentation/transforms/python/elementwise/pardo/">ParDo</a></li>
- <li><a href="/documentation/transforms/python/elementwise/partition/">Partition</a></li>
- <li><a href="/documentation/transforms/python/elementwise/regex/">Regex</a></li>
- <li><a href="/documentation/transforms/python/elementwise/reify/">Reify</a></li>
- <li><a href="/documentation/transforms/python/elementwise/tostring/">ToString</a></li>
- <li><a href="/documentation/transforms/python/elementwise/values/">Values</a></li>
- <li><a href="/documentation/transforms/python/elementwise/withtimestamps/">WithTimestamps</a></li>
- </ul>
- </li>
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Aggregation</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/transforms/python/aggregation/cogroupbykey/">CoGroupByKey</a></li>
- <li><a href="/documentation/transforms/python/aggregation/combineglobally/">CombineGlobally</a></li>
- <li><a href="/documentation/transforms/python/aggregation/combineperkey/">CombinePerKey</a></li>
- <li><a href="/documentation/transforms/python/aggregation/combinevalues/">CombineValues</a></li>
- <li><a href="/documentation/transforms/python/aggregation/count/">Count</a></li>
- <li><a href="/documentation/transforms/python/aggregation/distinct/">Distinct</a></li>
- <li><a href="/documentation/transforms/python/aggregation/groupbykey/">GroupByKey</a></li>
- <li><a href="/documentation/transforms/python/aggregation/groupby/">GroupBy</a></li>
- <li><a href="/documentation/transforms/python/aggregation/groupintobatches/">GroupIntoBatches</a></li>
- <li><a href="/documentation/transforms/python/aggregation/latest/">Latest</a></li>
- <li><a href="/documentation/transforms/python/aggregation/max/">Max</a></li>
- <li><a href="/documentation/transforms/python/aggregation/min/">Min</a></li>
- <li><a href="/documentation/transforms/python/aggregation/mean/">Mean</a></li>
- <li><a href="/documentation/transforms/python/aggregation/sample/">Sample</a></li>
- <li><a href="/documentation/transforms/python/aggregation/sum/">Sum</a></li>
- <li><a href="/documentation/transforms/python/aggregation/top/">Top</a></li>
- </ul>
- </li>
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Other</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/transforms/python/other/create/">Create</a></li>
- <li><a href="/documentation/transforms/python/other/flatten/">Flatten</a></li>
- <li><a href="/documentation/transforms/python/other/reshuffle/">Reshuffle</a></li>
- <li><a href="/documentation/transforms/python/other/windowinto/">WindowInto</a></li>
- </ul>
- </li>
- </ul>
- </li>
-
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Java</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/transforms/java/overview/">Overview</a></li>
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Element-wise</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/transforms/java/elementwise/filter/">Filter</a></li>
- <li><a href="/documentation/transforms/java/elementwise/flatmapelements/">FlatMapElements</a></li>
- <li><a href="/documentation/transforms/java/elementwise/keys/">Keys</a></li>
- <li><a href="/documentation/transforms/java/elementwise/kvswap/">KvSwap</a></li>
- <li><a href="/documentation/transforms/java/elementwise/mapelements/">MapElements</a></li>
- <li><a href="/documentation/transforms/java/elementwise/pardo/">ParDo</a></li>
- <li><a href="/documentation/transforms/java/elementwise/partition/">Partition</a></li>
- <li><a href="/documentation/transforms/java/elementwise/regex/">Regex</a></li>
- <li><a href="/documentation/transforms/java/elementwise/reify/">Reify</a></li>
- <li><a href="/documentation/transforms/java/elementwise/values/">Values</a></li>
- <li><a href="/documentation/transforms/java/elementwise/tostring/">ToString</a></li>
- <li><a href="/documentation/transforms/java/elementwise/withkeys/">WithKeys</a></li>
- <li><a href="/documentation/transforms/java/elementwise/withtimestamps/">WithTimestamps</a></li>
- </ul>
- </li>
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Aggregation</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/transforms/java/aggregation/approximatequantiles/">ApproximateQuantiles</a></li>
- <li><a href="/documentation/transforms/java/aggregation/approximateunique/">ApproximateUnique</a></li>
- <li><a href="/documentation/transforms/java/aggregation/cogroupbykey/">CoGroupByKey</a></li>
- <li><a href="/documentation/transforms/java/aggregation/combine/">Combine</a></li>
- <li><a href="/documentation/transforms/java/aggregation/combinewithcontext/">CombineWithContext</a></li>
- <li><a href="/documentation/transforms/java/aggregation/count/">Count</a></li>
- <li><a href="/documentation/transforms/java/aggregation/distinct/">Distinct</a></li>
- <li><a href="/documentation/transforms/java/aggregation/groupbykey/">GroupByKey</a></li>
- <li><a href="/documentation/transforms/java/aggregation/groupintobatches/">GroupIntoBatches</a></li>
- <li><a href="/documentation/transforms/java/aggregation/hllcount/">HllCount</a></li>
- <li><a href="/documentation/transforms/java/aggregation/latest/">Latest</a></li>
- <li><a href="/documentation/transforms/java/aggregation/max/">Max</a></li>
- <li><a href="/documentation/transforms/java/aggregation/mean/">Mean</a></li>
- <li><a href="/documentation/transforms/java/aggregation/min/">Min</a></li>
- <li><a href="/documentation/transforms/java/aggregation/sample/">Sample</a></li>
- <li><a href="/documentation/transforms/java/aggregation/sum/">Sum</a></li>
- <li><a href="/documentation/transforms/java/aggregation/top/">Top</a></li>
- </ul>
- </li>
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Other</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/transforms/java/other/create/">Create</a></li>
- <li><a href="/documentation/transforms/java/other/flatten/">Flatten</a></li>
- <li><a href="/documentation/transforms/java/other/passert/">PAssert</a></li>
- <li><a href="/documentation/transforms/java/other/view/">View</a></li>
- <li><a href="/documentation/transforms/java/other/window/">Window</a></li>
- </ul>
- </li>
- </ul>
- </li>
- </ul>
- </li>
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Common pipeline patterns</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/patterns/overview/">Overview</a></li>
- <li><a href="/documentation/patterns/file-processing/">File processing</a></li>
- <li><a href="/documentation/patterns/side-inputs/">Side inputs</a></li>
- <li><a href="/documentation/patterns/pipeline-options/">Pipeline options</a></li>
- <li><a href="/documentation/patterns/custom-io/">Custom I/O</a></li>
- <li><a href="/documentation/patterns/custom-windows/">Custom windows</a></li>
- <li><a href="/documentation/patterns/bigqueryio/">BigQueryIO</a></li>
- <li><a href="/documentation/patterns/ai-platform/">AI Platform</a></li>
- <li><a href="/documentation/patterns/schema/">Schema</a></li>
- <li><a href="/documentation/patterns/bqml/">BigQuery ML</a></li>
- <li><a href="/documentation/patterns/cross-language/">Cross-language transforms</a></li>
- </ul>
- </li>
- </ul>
-</li>
-
-<li>
+<li class="section-nav-item--collapsible">
<span class="section-nav-list-title">Concepts</span>
<ul class="section-nav-list">
<li><a href="/documentation/basics/">Basics of the Beam model</a></li>
<li><a href="/documentation/runtime/model/">How Beam executes a pipeline</a></li>
- <li>
- <span class="section-nav-list-title">Pipeline development lifecycle</span>
+ </ul>
+</li>
+<li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Beam programming guide</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/programming-guide/">Overview</a></li>
+ <li><a href="/documentation/programming-guide/#creating-a-pipeline">Pipelines</a></li>
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">PCollections</span>
<ul class="section-nav-list">
- <li><a href="/documentation/pipelines/design-your-pipeline/">Design Your Pipeline</a></li>
- <li><a href="/documentation/pipelines/create-your-pipeline/">Create Your Pipeline</a></li>
- <li><a href="/documentation/pipelines/test-your-pipeline/">Test Your Pipeline</a></li>
+ <li><a href="/documentation/programming-guide/#pcollections">Creating a PCollection</a></li>
+ <li><a href="/documentation/programming-guide/#pcollection-characteristics">PCollection characteristics</a></li>
+ </ul>
+ </li>
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Transforms</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/programming-guide/#applying-transforms">Applying transforms</a></li>
+ <li>
+ <span class="section-nav-list-title">Core Beam transforms</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/programming-guide/#pardo">ParDo</a></li>
+ <li><a href="/documentation/programming-guide/#groupbykey">GroupByKey</a></li>
+ <li><a href="/documentation/programming-guide/#cogroupbykey">CoGroupByKey</a></li>
+ <li><a href="/documentation/programming-guide/#combine">Combine</a></li>
+ <li><a href="/documentation/programming-guide/#flatten">Flatten</a></li>
+ <li><a href="/documentation/programming-guide/#partition">Partition</a></li>
+ </ul>
+ </li>
+
+ <li><a href="/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms">Requirements for user code</a></li>
+ <li><a href="/documentation/programming-guide/#side-inputs">Side inputs</a></li>
+ <li><a href="/documentation/programming-guide/#additional-outputs">Additional outputs</a></li>
+ <li><a href="/documentation/programming-guide/#composite-transforms">Composite transforms</a></li>
+ </ul>
+ </li>
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Pipeline I/O</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/programming-guide/#pipeline-io">Using I/O transforms</a></li>
+ <li><a href="/documentation/io/built-in/">Built-in I/O connectors</a></li>
+
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Built-in I/O connector guides</span>
+ <ul class="section-nav-list">
+ <li><a href="/documentation/io/built-in/parquet/">Apache Parquet I/O connector
+ </a></li>
+ <li><a href="/documentation/io/built-in/hadoop/">Hadoop Input/Output Format IO</a></li>
+ <li><a href="/documentation/io/built-in/hcatalog/">HCatalog IO</a></li>
+ <li><a href="/documentation/io/built-in/google-bigquery/">Google BigQuery I/O connector</a></li>
+ <li><a href="/documentation/io/built-in/snowflake/">Snowflake I/O connector</a></li>
+ </ul>
+ </li>
+
+
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Developing new I/O connectors</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/io/developing-io-overview/">Overview: Developing connectors</a></li>
+ <li><a href="/documentation/io/developing-io-java/">Developing connectors (Java)</a></li>
+ <li><a href="/documentation/io/developing-io-python/">Developing connectors (Python)</a></li>
+ </ul>
+ </li>
+
+ <li><a href="/documentation/io/testing/">Testing I/O transforms</a></li>
+ </ul>
+ </li>
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Schemas</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/programming-guide/#what-is-a-schema">What is a schema</a></li>
+ <li><a href="/documentation/programming-guide/#schemas-for-pl-types">Schemas for programming language types</a></li>
+ <li><a href="/documentation/programming-guide/#schema-definition">Schema definition</a></li>
+ <li><a href="/documentation/programming-guide/#logical-types">Logical types</a></li>
+ <li><a href="/documentation/programming-guide/#creating-schemas">Creating schemas</a></li>
+ <li><a href="/documentation/programming-guide/#using-schemas">Using schemas</a></li>
+ </ul>
+ </li>
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Data encoding and type safety</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/programming-guide/#data-encoding-and-type-safety">Data encoding basics</a></li>
+ <li><a href="/documentation/programming-guide/#specifying-coders">Specifying coders</a></li>
+ <li><a href="/documentation/programming-guide/#default-coders-and-the-coderregistry">Default coders and the CoderRegistry</a></li>
+ </ul>
+ </li>
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Windowing</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/programming-guide/#windowing">Windowing basics</a></li>
+ <li><a href="/documentation/programming-guide/#provided-windowing-functions">Provided windowing functions</a></li>
+ <li><a href="/documentation/programming-guide/#setting-your-pcollections-windowing-function">Setting your PCollection’s windowing function</a></li>
+ <li><a href="/documentation/programming-guide/#watermarks-and-late-data">Watermarks and late data</a></li>
+ <li><a href="/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements">Adding timestamps to a PCollection’s elements</a></li>
+ </ul>
+ </li>
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Triggers</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/programming-guide/#triggers">Trigger basics</a></li>
+ <li><a href="/documentation/programming-guide/#event-time-triggers">Event time triggers and the default trigger</a></li>
+ <li><a href="/documentation/programming-guide/#processing-time-triggers">Processing time triggers</a></li>
+ <li><a href="/documentation/programming-guide/#data-driven-triggers">Data-driven triggers</a></li>
+ <li><a href="/documentation/programming-guide/#setting-a-trigger">Setting a trigger</a></li>
+ <li><a href="/documentation/programming-guide/#composite-triggers">Composite triggers</a></li>
+ </ul>
+ </li>
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Metrics</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/programming-guide/#metrics">Metrics basics</a></li>
+ <li><a href="/documentation/programming-guide/#types-of-metrics">Types of metrics</a></li>
+ <li><a href="/documentation/programming-guide/#querying-metrics">Querying metrics</a></li>
+ <li><a href="/documentation/programming-guide/#using-metrics">Using metrics in pipeline</a></li>
+ <li><a href="/documentation/programming-guide/#export-metrics">Export metrics</a></li>
+ </ul>
+ </li>
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">State and Timers</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/programming-guide/#types-of-state">Types of state</a></li>
+ <li><a href="/documentation/programming-guide/#deferred-state-reads">Deferred state reads</a></li>
+ <li><a href="/documentation/programming-guide/#timers">Timers</a></li>
+ <li><a href="/documentation/programming-guide/#garbage-collecting-state">Garbage collecting state</a></li>
+ <li><a href="/documentation/programming-guide/#state-timers-examples">State and timers examples</a></li>
</ul>
</li>
- <li>
- <span class="section-nav-list-title">Beam programming guide</span>
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Splittable DoFns</span>
<ul class="section-nav-list">
- <li><a href="/documentation/programming-guide/">Overview</a></li>
- <li><a href="/documentation/programming-guide/#creating-a-pipeline">Pipelines</a></li>
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">PCollections</span>
+ <li><a href="/documentation/programming-guide/#sdf-basics">Basics</a></li>
+ <li><a href="/documentation/programming-guide/#sizing-and-progress">Sizing and progress</a></li>
+ <li><a href="/documentation/programming-guide/#user-initiated-checkpoint">User-initiated checkpoint</a></li>
+ <li><a href="/documentation/programming-guide/#runner-initiated-split">Runner initiated split</a></li>
+ <li><a href="/documentation/programming-guide/#watermark-estimation">Watermark estimation</a></li>
+ <li><a href="/documentation/programming-guide/#truncating-during-drain">Truncating during drain</a></li>
+ <li><a href="/documentation/programming-guide/#bundle-finalization">Bundle finalization</a></li>
+ </ul>
+ </li>
- <ul class="section-nav-list">
- <li><a href="/documentation/programming-guide/#pcollections">Creating a PCollection</a></li>
- <li><a href="/documentation/programming-guide/#pcollection-characteristics">PCollection characteristics</a></li>
- </ul>
- </li>
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Transforms</span>
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Multi-language Pipelines</span>
- <ul class="section-nav-list">
- <li><a href="/documentation/programming-guide/#applying-transforms">Applying transforms</a></li>
- <li>
- <span class="section-nav-list-title">Core Beam transforms</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/programming-guide/#pardo">ParDo</a></li>
- <li><a href="/documentation/programming-guide/#groupbykey">GroupByKey</a></li>
- <li><a href="/documentation/programming-guide/#cogroupbykey">CoGroupByKey</a></li>
- <li><a href="/documentation/programming-guide/#combine">Combine</a></li>
- <li><a href="/documentation/programming-guide/#flatten">Flatten</a></li>
- <li><a href="/documentation/programming-guide/#partition">Partition</a></li>
- </ul>
- </li>
-
- <li><a href="/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms">Requirements for user code</a></li>
- <li><a href="/documentation/programming-guide/#side-inputs">Side inputs</a></li>
- <li><a href="/documentation/programming-guide/#additional-outputs">Additional outputs</a></li>
- <li><a href="/documentation/programming-guide/#composite-transforms">Composite transforms</a></li>
- </ul>
- </li>
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Pipeline I/O</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/programming-guide/#pipeline-io">Using I/O transforms</a></li>
- <li><a href="/documentation/io/built-in/">Built-in I/O connectors</a></li>
-
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Built-in I/O connector guides</span>
- <ul class="section-nav-list">
- <li><a href="/documentation/io/built-in/parquet/">Apache Parquet I/O connector
- </a></li>
- <li><a href="/documentation/io/built-in/hadoop/">Hadoop Input/Output Format IO</a></li>
- <li><a href="/documentation/io/built-in/hcatalog/">HCatalog IO</a></li>
- <li><a href="/documentation/io/built-in/google-bigquery/">Google BigQuery I/O connector</a></li>
- <li><a href="/documentation/io/built-in/snowflake/">Snowflake I/O connector</a></li>
- </ul>
- </li>
-
-
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Developing new I/O connectors</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/io/developing-io-overview/">Overview: Developing connectors</a></li>
- <li><a href="/documentation/io/developing-io-java/">Developing connectors (Java)</a></li>
- <li><a href="/documentation/io/developing-io-python/">Developing connectors (Python)</a></li>
- </ul>
- </li>
-
- <li><a href="/documentation/io/testing/">Testing I/O transforms</a></li>
- </ul>
- </li>
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Schemas</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/programming-guide/#what-is-a-schema">What is a schema</a></li>
- <li><a href="/documentation/programming-guide/#schemas-for-pl-types">Schemas for programming language types</a></li>
- <li><a href="/documentation/programming-guide/#schema-definition">Schema definition</a></li>
- <li><a href="/documentation/programming-guide/#logical-types">Logical types</a></li>
- <li><a href="/documentation/programming-guide/#creating-schemas">Creating schemas</a></li>
- <li><a href="/documentation/programming-guide/#using-schemas">Using schemas</a></li>
- </ul>
- </li>
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Data encoding and type safety</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/programming-guide/#data-encoding-and-type-safety">Data encoding basics</a></li>
- <li><a href="/documentation/programming-guide/#specifying-coders">Specifying coders</a></li>
- <li><a href="/documentation/programming-guide/#default-coders-and-the-coderregistry">Default coders and the CoderRegistry</a></li>
- </ul>
- </li>
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Windowing</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/programming-guide/#windowing">Windowing basics</a></li>
- <li><a href="/documentation/programming-guide/#provided-windowing-functions">Provided windowing functions</a></li>
- <li><a href="/documentation/programming-guide/#setting-your-pcollections-windowing-function">Setting your PCollection’s windowing function</a></li>
- <li><a href="/documentation/programming-guide/#watermarks-and-late-data">Watermarks and late data</a></li>
- <li><a href="/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements">Adding timestamps to a PCollection’s elements</a></li>
- </ul>
- </li>
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Triggers</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/programming-guide/#triggers">Trigger basics</a></li>
- <li><a href="/documentation/programming-guide/#event-time-triggers">Event time triggers and the default trigger</a></li>
- <li><a href="/documentation/programming-guide/#processing-time-triggers">Processing time triggers</a></li>
- <li><a href="/documentation/programming-guide/#data-driven-triggers">Data-driven triggers</a></li>
- <li><a href="/documentation/programming-guide/#setting-a-trigger">Setting a trigger</a></li>
- <li><a href="/documentation/programming-guide/#composite-triggers">Composite triggers</a></li>
- </ul>
- </li>
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Metrics</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/programming-guide/#metrics">Metrics basics</a></li>
- <li><a href="/documentation/programming-guide/#types-of-metrics">Types of metrics</a></li>
- <li><a href="/documentation/programming-guide/#querying-metrics">Querying metrics</a></li>
- <li><a href="/documentation/programming-guide/#using-metrics">Using metrics in pipeline</a></li>
- <li><a href="/documentation/programming-guide/#export-metrics">Export metrics</a></li>
- </ul>
- </li>
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">State and Timers</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/programming-guide/#types-of-state">Types of state</a></li>
- <li><a href="/documentation/programming-guide/#deferred-state-reads">Deferred state reads</a></li>
- <li><a href="/documentation/programming-guide/#timers">Timers</a></li>
- <li><a href="/documentation/programming-guide/#garbage-collecting-state">Garbage collecting state</a></li>
- <li><a href="/documentation/programming-guide/#state-timers-examples">State and timers examples</a></li>
- </ul>
- </li>
-
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Splittable DoFns</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/programming-guide/#sdf-basics">Basics</a></li>
- <li><a href="/documentation/programming-guide/#sizing-and-progress">Sizing and progress</a></li>
- <li><a href="/documentation/programming-guide/#user-initiated-checkpoint">User-initiated checkpoint</a></li>
- <li><a href="/documentation/programming-guide/#runner-initiated-split">Runner initiated split</a></li>
- <li><a href="/documentation/programming-guide/#watermark-estimation">Watermark estimation</a></li>
- <li><a href="/documentation/programming-guide/#truncating-during-drain">Truncating during drain</a></li>
- <li><a href="/documentation/programming-guide/#bundle-finalization">Bundle finalization</a></li>
- </ul>
- </li>
-
- <li class="section-nav-item--collapsible">
- <span class="section-nav-list-title">Multi-language Pipelines</span>
-
- <ul class="section-nav-list">
- <li><a href="/documentation/programming-guide/#create-x-lang-transforms">Creating cross-language transforms</a></li>
- <li><a href="/documentation/programming-guide/#use-x-lang-transforms">Using cross-language transforms</a></li>
- <li><a href="/documentation/programming-guide/#x-lang-transform-runner-support">Runner Support</a></li>
- </ul>
- </li>
+ <ul class="section-nav-list">
+ <li><a href="/documentation/programming-guide/#create-x-lang-transforms">Creating cross-language transforms</a></li>
+ <li><a href="/documentation/programming-guide/#use-x-lang-transforms">Using cross-language transforms</a></li>
+ <li><a href="/documentation/programming-guide/#x-lang-transform-runner-support">Runner Support</a></li>
</ul>
</li>
</ul>
</li>
+<li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Pipeline development lifecycle</span>
+ <ul class="section-nav-list">
+ <li><a href="/documentation/pipelines/design-your-pipeline/">Design Your Pipeline</a></li>
+ <li><a href="/documentation/pipelines/create-your-pipeline/">Create Your Pipeline</a></li>
+ <li><a href="/documentation/pipelines/test-your-pipeline/">Test Your Pipeline</a></li>
+ </ul>
+</li>
+<li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Common pipeline patterns</span>
-<li><a href="/documentation/glossary/">Glossary</a></li>
-
+ <ul class="section-nav-list">
+ <li><a href="/documentation/patterns/overview/">Overview</a></li>
+ <li><a href="/documentation/patterns/file-processing/">File processing</a></li>
+ <li><a href="/documentation/patterns/side-inputs/">Side inputs</a></li>
+ <li><a href="/documentation/patterns/pipeline-options/">Pipeline options</a></li>
+ <li><a href="/documentation/patterns/custom-io/">Custom I/O</a></li>
+ <li><a href="/documentation/patterns/custom-windows/">Custom windows</a></li>
+ <li><a href="/documentation/patterns/bigqueryio/">BigQueryIO</a></li>
+ <li><a href="/documentation/patterns/ai-platform/">AI Platform</a></li>
+ <li><a href="/documentation/patterns/schema/">Schema</a></li>
+ <li><a href="/documentation/patterns/bqml/">BigQuery ML</a></li>
+ <li><a href="/documentation/patterns/cross-language/">Cross-language transforms</a></li>
+ </ul>
+</li>
<li class="section-nav-item--collapsible">
<span class="section-nav-list-title">Runtime systems</span>
@@ -352,5 +216,129 @@
<li><a href="/documentation/runtime/sdk-harness-config/">SDK Harness Configuration</a></li>
</ul>
</li>
+<li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Transform catalog</span>
-<li><a href="https://cwiki.apache.org/confluence/display/BEAM/Apache+Beam">Beam Wiki</a></li>
+ <ul class="section-nav-list">
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Python</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/transforms/python/overview/">Overview</a></li>
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Element-wise</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/transforms/python/elementwise/filter/">Filter</a></li>
+ <li><a href="/documentation/transforms/python/elementwise/flatmap/">FlatMap</a></li>
+ <li><a href="/documentation/transforms/python/elementwise/keys/">Keys</a></li>
+ <li><a href="/documentation/transforms/python/elementwise/kvswap/">KvSwap</a></li>
+ <li><a href="/documentation/transforms/python/elementwise/map/">Map</a></li>
+ <li><a href="/documentation/transforms/python/elementwise/pardo/">ParDo</a></li>
+ <li><a href="/documentation/transforms/python/elementwise/partition/">Partition</a></li>
+ <li><a href="/documentation/transforms/python/elementwise/regex/">Regex</a></li>
+ <li><a href="/documentation/transforms/python/elementwise/reify/">Reify</a></li>
+ <li><a href="/documentation/transforms/python/elementwise/tostring/">ToString</a></li>
+ <li><a href="/documentation/transforms/python/elementwise/values/">Values</a></li>
+ <li><a href="/documentation/transforms/python/elementwise/withtimestamps/">WithTimestamps</a></li>
+ </ul>
+ </li>
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Aggregation</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/transforms/python/aggregation/cogroupbykey/">CoGroupByKey</a></li>
+ <li><a href="/documentation/transforms/python/aggregation/combineglobally/">CombineGlobally</a></li>
+ <li><a href="/documentation/transforms/python/aggregation/combineperkey/">CombinePerKey</a></li>
+ <li><a href="/documentation/transforms/python/aggregation/combinevalues/">CombineValues</a></li>
+ <li><a href="/documentation/transforms/python/aggregation/count/">Count</a></li>
+ <li><a href="/documentation/transforms/python/aggregation/distinct/">Distinct</a></li>
+ <li><a href="/documentation/transforms/python/aggregation/groupbykey/">GroupByKey</a></li>
+ <li><a href="/documentation/transforms/python/aggregation/groupby/">GroupBy</a></li>
+ <li><a href="/documentation/transforms/python/aggregation/groupintobatches/">GroupIntoBatches</a></li>
+ <li><a href="/documentation/transforms/python/aggregation/latest/">Latest</a></li>
+ <li><a href="/documentation/transforms/python/aggregation/max/">Max</a></li>
+ <li><a href="/documentation/transforms/python/aggregation/min/">Min</a></li>
+ <li><a href="/documentation/transforms/python/aggregation/mean/">Mean</a></li>
+ <li><a href="/documentation/transforms/python/aggregation/sample/">Sample</a></li>
+ <li><a href="/documentation/transforms/python/aggregation/sum/">Sum</a></li>
+ <li><a href="/documentation/transforms/python/aggregation/top/">Top</a></li>
+ </ul>
+ </li>
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Other</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/transforms/python/other/create/">Create</a></li>
+ <li><a href="/documentation/transforms/python/other/flatten/">Flatten</a></li>
+ <li><a href="/documentation/transforms/python/other/reshuffle/">Reshuffle</a></li>
+ <li><a href="/documentation/transforms/python/other/windowinto/">WindowInto</a></li>
+ </ul>
+ </li>
+ </ul>
+ </li>
+
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Java</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/transforms/java/overview/">Overview</a></li>
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Element-wise</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/transforms/java/elementwise/filter/">Filter</a></li>
+ <li><a href="/documentation/transforms/java/elementwise/flatmapelements/">FlatMapElements</a></li>
+ <li><a href="/documentation/transforms/java/elementwise/keys/">Keys</a></li>
+ <li><a href="/documentation/transforms/java/elementwise/kvswap/">KvSwap</a></li>
+ <li><a href="/documentation/transforms/java/elementwise/mapelements/">MapElements</a></li>
+ <li><a href="/documentation/transforms/java/elementwise/pardo/">ParDo</a></li>
+ <li><a href="/documentation/transforms/java/elementwise/partition/">Partition</a></li>
+ <li><a href="/documentation/transforms/java/elementwise/regex/">Regex</a></li>
+ <li><a href="/documentation/transforms/java/elementwise/reify/">Reify</a></li>
+ <li><a href="/documentation/transforms/java/elementwise/values/">Values</a></li>
+ <li><a href="/documentation/transforms/java/elementwise/tostring/">ToString</a></li>
+ <li><a href="/documentation/transforms/java/elementwise/withkeys/">WithKeys</a></li>
+ <li><a href="/documentation/transforms/java/elementwise/withtimestamps/">WithTimestamps</a></li>
+ </ul>
+ </li>
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Aggregation</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/transforms/java/aggregation/approximatequantiles/">ApproximateQuantiles</a></li>
+ <li><a href="/documentation/transforms/java/aggregation/approximateunique/">ApproximateUnique</a></li>
+ <li><a href="/documentation/transforms/java/aggregation/cogroupbykey/">CoGroupByKey</a></li>
+ <li><a href="/documentation/transforms/java/aggregation/combine/">Combine</a></li>
+ <li><a href="/documentation/transforms/java/aggregation/combinewithcontext/">CombineWithContext</a></li>
+ <li><a href="/documentation/transforms/java/aggregation/count/">Count</a></li>
+ <li><a href="/documentation/transforms/java/aggregation/distinct/">Distinct</a></li>
+ <li><a href="/documentation/transforms/java/aggregation/groupbykey/">GroupByKey</a></li>
+ <li><a href="/documentation/transforms/java/aggregation/groupintobatches/">GroupIntoBatches</a></li>
+ <li><a href="/documentation/transforms/java/aggregation/hllcount/">HllCount</a></li>
+ <li><a href="/documentation/transforms/java/aggregation/latest/">Latest</a></li>
+ <li><a href="/documentation/transforms/java/aggregation/max/">Max</a></li>
+ <li><a href="/documentation/transforms/java/aggregation/mean/">Mean</a></li>
+ <li><a href="/documentation/transforms/java/aggregation/min/">Min</a></li>
+ <li><a href="/documentation/transforms/java/aggregation/sample/">Sample</a></li>
+ <li><a href="/documentation/transforms/java/aggregation/sum/">Sum</a></li>
+ <li><a href="/documentation/transforms/java/aggregation/top/">Top</a></li>
+ </ul>
+ </li>
+ <li class="section-nav-item--collapsible">
+ <span class="section-nav-list-title">Other</span>
+
+ <ul class="section-nav-list">
+ <li><a href="/documentation/transforms/java/other/create/">Create</a></li>
+ <li><a href="/documentation/transforms/java/other/flatten/">Flatten</a></li>
+ <li><a href="/documentation/transforms/java/other/passert/">PAssert</a></li>
+ <li><a href="/documentation/transforms/java/other/view/">View</a></li>
+ <li><a href="/documentation/transforms/java/other/window/">Window</a></li>
+ </ul>
+ </li>
+ </ul>
+ </li>
+ </ul>
+</li>
+<li><a href="/documentation/glossary/">Glossary</a></li>
+<li><a href="https://cwiki.apache.org/confluence/display/BEAM/Apache+Beam">Beam Wiki <img src="/images/external-link-icon.png" width="14" height="14" alt="External link."></a></li>
diff --git a/website/www/site/static/js/language-switch.js b/website/www/site/static/js/language-switch-v2.js
similarity index 100%
rename from website/www/site/static/js/language-switch.js
rename to website/www/site/static/js/language-switch-v2.js