---
type: languages
title: "Apache Beam YAML Testing"
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Beam YAML Tests
A robust testing story is an important part of any production setup.
Though the various built-in (and externally provided) transforms in a Beam YAML
pipeline can be expected to be well tested, it is still important to have tests
that ensure the pipeline as a whole behaves as expected. This is particularly
true for transforms that contain non-trivial UDF logic.
## Whole pipeline tests
For example, consider the following word count pipeline.
```
pipeline:
  transforms:
    - type: ReadFromText
      name: Read from GCS
      config:
        path: gs://dataflow-samples/shakespeare/kinglear.txt
    - type: MapToFields
      name: Split words
      config:
        language: python
        fields:
          word:
            callable: |
              import re
              def all_words(row):
                  return re.findall(r'[a-z]+', row.line.lower())
          value: 1
      input: Read from GCS
    - type: Explode
      name: Explode word arrays
      config:
        fields: [word]
      input: Split words
    - type: Combine
      name: Count words
      config:
        group_by: [word]
        combine:
          value: sum
      input: Explode word arrays
    - type: MapToFields
      name: Format output
      config:
        language: python
        fields:
          output: "word + ': ' + str(value)"
      input: Count words
    - type: WriteToText
      name: Write to GCS
      config:
        path: gs://bucket/counts.txt
      input: Format output
tests: []
```
To write tests for this pipeline, one creates a `tests` section that enumerates
a number of tests, each of which provides example input and asserts that the
expected output is produced. An example test might look as follows:
```
tests:
  - name: MyRegressionTest
    mock_outputs:
      - name: Read from GCS
        elements:
          - line: "Nothing can come of nothing"
    expected_inputs:
      - name: Write to GCS
        elements:
          - output: 'nothing: 2'
          - output: 'can: 1'
          - output: 'come: 1'
          - output: 'of: 1'
```
The `mock_outputs` section designates that the transform named `Read from GCS`
should produce the single row `{line: "Nothing can come of nothing"}` for the
purposes of this test, and the `expected_inputs` section indicates that the
transform `Write to GCS` should expect to receive exactly the given elements.
Neither the actual Read transform nor the Write transform from the original
pipeline is executed when running the test, but all intermediate transforms
are.
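
The expected elements can be derived by tracing the intermediate transforms by hand. The following is a minimal sketch in plain Python, not the Beam API: the function name `trace_word_count` is hypothetical, and the loop only mirrors what `Split words`, `Explode word arrays`, `Count words`, and `Format output` do to the mocked row.

```python
import re
from collections import Counter


def trace_word_count(lines):
    """Plain-Python stand-in for the transforms between the mocked read and write."""
    counts = Counter()
    for line in lines:
        # Split words + Explode word arrays: one (word, value=1) row per match.
        for word in re.findall(r'[a-z]+', line.lower()):
            # Count words: group by word and sum the value field.
            counts[word] += 1
    # Format output: the same expression as in the pipeline.
    return [{'output': word + ': ' + str(value)} for word, value in counts.items()]


rows = trace_word_count(["Nothing can come of nothing"])
print(rows)
```
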
This test can then be executed by running:
```
python -m apache_beam.yaml.main \
--yaml_pipeline_file=wordcount.yaml \
--tests
```
Alternatively, the `tests:` block may be placed in a separate file and be
validated by running:
```
python -m apache_beam.yaml.main \
--yaml_pipeline_file=wordcount.yaml \
--tests \
--test_suite=test_file.yaml
```
For hermeticity, we require that all inputs (with the exception of
`Create`) that are needed to compute the expected outputs are explicitly mocked.
To explicitly allow a source to be executed as part of a test, its name or
type can be enumerated in the `allowed_sources` attribute of the test
specification.
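
One way to read this rule is as a walk upstream from each expectation that stops at mocked transforms and flags any other source it reaches. The sketch below is a conceptual model in plain Python under that assumption; it is not how Beam implements the check, and the function `unmocked_sources` and the dictionary layout are hypothetical.

```python
def unmocked_sources(transforms, mocked, expected, allowed_sources=()):
    """Return names of sources a test would execute without permission.

    transforms: dict of name -> {'type': str, 'inputs': [names]}
    mocked: names whose outputs are replaced by mock_outputs
    expected: names that the test's expectations depend on
    """
    allowed = set(allowed_sources) | {'Create'}  # Create is always permitted.
    offenders, seen, stack = set(), set(), list(expected)
    while stack:
        name = stack.pop()
        if name in seen or name in mocked:
            continue  # Mocked transforms are not executed, so stop here.
        seen.add(name)
        spec = transforms[name]
        # A transform without inputs is treated as a source.
        if not spec['inputs'] and not ({name, spec['type']} & allowed):
            offenders.add(name)
        stack.extend(spec['inputs'])
    return offenders


pipeline = {
    'Read from GCS': {'type': 'ReadFromText', 'inputs': []},
    'Split words': {'type': 'MapToFields', 'inputs': ['Read from GCS']},
}
print(unmocked_sources(pipeline, mocked=set(), expected=['Split words']))
```

In this model, mocking `Read from GCS` or listing either its name or its type (`ReadFromText`) in `allowed_sources` makes the result empty.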
## Pipeline fragment tests
One can also test a portion of a pipeline using the `mock_inputs` and
`expected_outputs` sections of a test, for example:
```
tests:
  - name: TestSplittingWithPunctuation
    mock_inputs:
      - name: Split words
        elements:
          - line: "lots-of-words"
          - line: "...and more"
    expected_outputs:
      - name: Explode
        elements:
          - word: lots
            value: 1
          - word: of
            value: 1
          - word: words
            value: 1
          - word: and
            value: 1
          - word: more
            value: 1
  - name: TestCombineAndFormat
    mock_inputs:
      - name: Count words
        elements:
          - word: more
            value: 1
          - word: and
            value: 1
          - word: more
            value: 1
    expected_outputs:
      - name: Format output
        elements:
          - output: "more: 2"
          - output: "and: 1"
```
As before, each test only executes the portion of the pipeline between the
mock inputs and expected outputs. Note that the named transform in a
`mock_inputs` specification *is* executed, while the named transform of a
`mock_outputs` specification is not.
Similarly, the named transform of an `expected_inputs` specification is *not*
executed, while the named transform of an `expected_outputs` specification
necessarily is.
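
To make the boundaries concrete, the two fragment tests can be traced in plain Python. This is a sketch, not the Beam API: `split_and_explode` and `count_and_format` are hypothetical stand-ins for the transforms that each test actually executes, starting with the transform named in `mock_inputs`.

```python
import re
from collections import defaultdict


def split_and_explode(rows):
    """Stand-in for `Split words` (run on the mock input) plus the Explode step."""
    return [{'word': word, 'value': 1}
            for row in rows
            for word in re.findall(r'[a-z]+', row['line'].lower())]


def count_and_format(rows):
    """Stand-in for `Count words` (run on the mock input) plus `Format output`."""
    totals = defaultdict(int)
    for row in rows:
        totals[row['word']] += row['value']
    return [{'output': word + ': ' + str(value)} for word, value in totals.items()]


# TestSplittingWithPunctuation: punctuation is dropped by the regex.
exploded = split_and_explode([{'line': 'lots-of-words'}, {'line': '...and more'}])

# TestCombineAndFormat: the two "more" rows are summed into one.
formatted = count_and_format([
    {'word': 'more', 'value': 1},
    {'word': 'and', 'value': 1},
    {'word': 'more', 'value': 1},
])
print(exploded)
print(formatted)
```
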
## Automatically generating tests
In an effort to make tests as easy to write and maintain as possible,
Beam YAML provides utilities to compute the expected outputs for your tests.
Running
```
python -m apache_beam.yaml.main \
--yaml_pipeline_file=wordcount.yaml \
--tests \
[--test_suite=...] \
--create_test
```
will create an entirely new test by sampling all the sources and
constructing a test accordingly.
One can also keep tests up to date by running
```
python -m apache_beam.yaml.main \
--yaml_pipeline_file=wordcount.yaml \
--tests \
[--test_suite=...] \
--fix_tests
```
which will update any existing `expected_inputs` and `expected_outputs` blocks
of your pipeline to contain the actual values computed during the test.
This can be useful in authoring tests as well: one can simply specify a
nonsensical or empty elements block in the expectation and the `--fix_tests`
flag will populate it for you.
(Of course, it is on any user of these flags to verify that the produced values
are meaningful and as expected.)
## Branching pipelines
For complex, branching pipelines, any number of `mock_inputs` and `mock_outputs`
may be enumerated to provide the input data, and any number of `expected_inputs`
and `expected_outputs` validations may be specified as well.
In both the `mock_outputs` and `expected_outputs` blocks, multiple outputs can
be disambiguated with the `TransformName.output_name` notation, just as when
authoring a YAML pipeline.
```
pipeline:
  transforms:
    - type: Create
      name: Abc
      config:
        elements: [a, b, ccc]
    - type: Create
      name: Xyz
      config:
        elements: [x, y, zzz]
    - type: MapToFields
      name: Upper
      input: [Abc, Xyz]
      config:
        language: python
        fields:
          element: element.upper()
    - type: Partition
      input: Upper
      config:
        language: python
        by: '"big" if len(element) > 1 else "small"'
        outputs: ["big", "small"]
    - type: MapToFields
      name: MaybeHasErrors
      input: Abc
      config:
        language: python
        fields:
          inverse_size: 1 / len(element)
        error_handling:
          output: errors
    - type: StripErrorMetadata
      input: MaybeHasErrors.errors
tests:
  - name: MockMultipleInputs
    mock_outputs:
      - name: Abc
        elements: [element: a]
      - name: Xyz
        elements: [element: z]
    expected_outputs:
      - name: Upper
        elements: [element: A, element: Z]
  - name: TestMultipleOutputs
    mock_inputs:
      - name: Upper
        elements: [element: m, element: nnn]
    expected_outputs:
      - name: Partition.big
        elements: [element: NNN]
      - name: Partition.small
        elements: [element: M]
  - name: TestErrorHandling
    mock_outputs:
      - name: Abc
        elements: [element: 'Aaaa', element: '']
    expected_outputs:
      - name: MaybeHasErrors
        elements: [inverse_size: 0.25]
      - name: StripErrorMetadata
        elements: [element: '']
```
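
The expectations in `TestMultipleOutputs` and `TestErrorHandling` can be checked by hand with a plain-Python trace. This is only a sketch of the logic in the example above, not the Beam API; the functions `upper`, `partition`, and `maybe_has_errors` are hypothetical stand-ins, and the error branch keeps just the original row, as the `StripErrorMetadata` expectation suggests.

```python
def upper(rows):
    """Stand-in for the `Upper` MapToFields transform."""
    return [{'element': row['element'].upper()} for row in rows]


def partition(rows):
    """Stand-in for `Partition`: route each row to the "big" or "small" output."""
    outputs = {'big': [], 'small': []}
    for row in rows:
        outputs['big' if len(row['element']) > 1 else 'small'].append(row)
    return outputs


def maybe_has_errors(rows):
    """Stand-in for `MaybeHasErrors`: failing rows go to a separate errors output."""
    good, errors = [], []
    for row in rows:
        try:
            good.append({'inverse_size': 1 / len(row['element'])})
        except ZeroDivisionError:
            # Assumed to match the example: only the original row is kept.
            errors.append(row)
    return good, errors


# TestMultipleOutputs: `Upper` runs on the mock input, then `Partition` splits it.
parts = partition(upper([{'element': 'm'}, {'element': 'nnn'}]))

# TestErrorHandling: the empty string causes a division by zero.
good, errors = maybe_has_errors([{'element': 'Aaaa'}, {'element': ''}])
print(parts, good, errors)
```
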