{{< toc >}}
Testing your pipeline is a particularly important step in developing an effective data processing solution. The indirect nature of the Beam model, in which your user code constructs a pipeline graph to be executed remotely, can make debugging failed runs a non-trivial task. Often it is faster and simpler to perform local unit testing on your pipeline code than to debug a pipeline's remote execution.
Before running your pipeline on the runner of your choice, unit test your pipeline code locally. This is often the best way to identify and fix bugs, and it lets you use your preferred local debugging tools.
You can use the DirectRunner, a local runner that is helpful for testing and local development.

After you test your pipeline using the DirectRunner, you can use the runner of your choice to test on a small scale. For example, use the Flink runner with a local or remote Flink cluster.
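The Beam SDKs provide a number of ways to unit test your pipeline code. From the lowest to the highest level, you can test the individual functions used in your pipeline, test an entire transform as a unit, or perform an end-to-end test of an entire pipeline.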
To support unit testing, the Beam SDK for Java provides a number of test classes in the testing package. You can use these tests as references and guides.
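At the lowest level, the individual functions your pipeline calls can be unit tested with ordinary tools, without constructing a pipeline at all. The following is a minimal sketch in plain Python; `extract_words` is a hypothetical helper invented for illustration, not part of the Beam SDK or the WordCount example.

```python
import re
import unittest


def extract_words(line):
    """Hypothetical helper: split a line of text into words."""
    return re.findall(r"[A-Za-z']+", line)


class ExtractWordsTest(unittest.TestCase):
    """Tests the helper directly; no pipeline or runner is involved."""

    def test_splits_on_whitespace_and_punctuation(self):
        self.assertEqual(extract_words("hi sue, bob"), ["hi", "sue", "bob"])

    def test_empty_line_yields_no_words(self):
        self.assertEqual(extract_words(""), [])
```

A function tested this way could then be passed to a transform such as `beam.FlatMap(extract_words)`, so that pipeline-level tests only need to cover how the pieces fit together.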
To test a transform you've created, you can use the following pattern:

* Create a `TestPipeline`.
* Create some static, known test input data.
* Use the `Create` transform to create a `PCollection` of your input data.
* `Apply` your transform to the input `PCollection` and save the resulting output `PCollection`.
* Use `PAssert` and its subclasses to verify that the output `PCollection` contains the elements that you expect.

{{< paragraph class="language-java" >}}
`TestPipeline` is a class included in the Beam Java SDK specifically for testing transforms.
{{< /paragraph >}}

{{< paragraph class="language-py" >}}
`TestPipeline` is a class included in the Beam Python SDK specifically for testing transforms.
{{< /paragraph >}}

For tests, use `TestPipeline` in place of `Pipeline` when you create the pipeline object. Unlike `Pipeline.create`, `TestPipeline.create` handles setting `PipelineOptions` internally.
You create a `TestPipeline` as follows:

{{< highlight java >}}
Pipeline p = TestPipeline.create();
{{< /highlight >}}

{{< highlight py >}}
with TestPipeline() as p:
    ...
{{< /highlight >}}
Note: Read about testing unbounded pipelines in Beam in this blog post.
You can use the `Create` transform to create a `PCollection` out of a standard in-memory collection class, such as a Java `List` or a Python `list`. See Creating a PCollection for more information.
[PAssert](https://beam.apache.org/releases/javadoc/{{< param release_latest >}}/index.html?org/apache/beam/sdk/testing/PAssert.html) is a class included in the Beam Java SDK that is an assertion on the contents of a `PCollection`. You can use `PAssert` to verify that a `PCollection` contains a specific set of expected elements.

For a given `PCollection`, you can use `PAssert` to verify the contents as follows:
{{< highlight java >}}
PCollection<String> output = ...;

// Check whether a PCollection contains some elements in any order.
PAssert.that(output)
    .containsInAnyOrder(
        "elem1",
        "elem3",
        "elem2");
{{< /highlight >}}
{{< highlight py >}}
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

output = ...

# Check whether a PCollection contains some elements in any order.
assert_that(
    output,
    equal_to(["elem1", "elem3", "elem2"]))
{{< /highlight >}}
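Both assertions above compare contents without regard to order, because a `PCollection` has no guaranteed element order. Conceptually this is a multiset comparison. The sketch below illustrates that idea in plain Python; `same_elements` is a hypothetical name, and this is not how Beam implements `containsInAnyOrder` or `equal_to`. It also assumes the elements are hashable.

```python
from collections import Counter


def same_elements(actual, expected):
    """Return True if both iterables hold the same elements with the same
    multiplicities, ignoring order. Assumes hashable elements."""
    return Counter(actual) == Counter(expected)


# Order does not matter, but duplicates do.
print(same_elements(["elem1", "elem3", "elem2"], ["elem2", "elem1", "elem3"]))
print(same_elements(["elem1", "elem1"], ["elem1"]))
```

The first call reports a match and the second does not, since a repeated element must appear the same number of times on both sides.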
{{< paragraph class="language-java" >}}
Any Java code that uses `PAssert` must link in `JUnit` and `Hamcrest`. If you're using Maven, you can link in `Hamcrest` by adding the following dependency to your project's `pom.xml` file:
{{< /paragraph >}}
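{{< highlight java >}}
<dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest-all</artifactId>
    <version>1.3</version>
    <scope>test</scope>
</dependency>
{{< /highlight >}}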
For more information on how these classes work, see the [org.apache.beam.sdk.testing](https://beam.apache.org/releases/javadoc/{{< param release_latest >}}/index.html?org/apache/beam/sdk/testing/package-summary.html) package documentation.
The following code shows a complete test for a composite transform. The test applies the `Count` transform to an input `PCollection` of `String` elements. The test uses the `Create` transform to create the input `PCollection` from a `List<String>`.
{{< highlight java >}}
public class CountTest {

  // Our static input data, which will make up the initial PCollection.
  static final String[] WORDS_ARRAY = new String[] {
      "hi", "there", "hi", "hi", "sue", "bob",
      "hi", "sue", "", "", "ZOW", "bob", ""};

  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

  @Test
  public void testCount() {
    // Create a test pipeline.
    Pipeline p = TestPipeline.create();

    // Create an input PCollection.
    PCollection<String> input = p.apply(Create.of(WORDS));

    // Apply the Count transform under test.
    PCollection<KV<String, Long>> output =
        input.apply(Count.<String>perElement());

    // Assert on the results.
    PAssert.that(output)
        .containsInAnyOrder(
            KV.of("hi", 4L),
            KV.of("there", 1L),
            KV.of("sue", 2L),
            KV.of("bob", 2L),
            KV.of("", 3L),
            KV.of("ZOW", 1L));

    // Run the pipeline.
    p.run();
  }
}
{{< /highlight >}}
{{< highlight py >}}
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

class CountTest(unittest.TestCase):

    def test_count(self):
        # Our static input data, which will make up the initial PCollection.
        WORDS = [
            "hi", "there", "hi", "hi", "sue", "bob",
            "hi", "sue", "", "", "ZOW", "bob", ""
        ]
        # Create a test pipeline.
        with TestPipeline() as p:

            # Create an input PCollection.
            input = p | beam.Create(WORDS)

            # Apply the Count transform under test.
            output = input | beam.combiners.Count.PerElement()

            # Assert on the results.
            assert_that(
                output,
                equal_to([
                    ("hi", 4),
                    ("there", 1),
                    ("sue", 2),
                    ("bob", 2),
                    ("", 3),
                    ("ZOW", 1)]))

        # The pipeline will run and verify the results.
{{< /highlight >}}
You can use the test classes in the Beam SDKs (such as `TestPipeline` and `PAssert` in the Beam SDK for Java) to test an entire pipeline end-to-end. Typically, to test an entire pipeline, you do the following:
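
* For every source of input data to your pipeline, create some known static test input data.
* Create some static test output data that matches what you expect in your pipeline's final output `PCollection`(s).
* Create a `TestPipeline` in place of the standard `Pipeline.create`.
* In place of your pipeline's `Read` transform(s), use the `Create` transform to create one or more `PCollection`s from your static input data.
* Apply your pipeline's transforms.
* In place of your pipeline's `Write` transform(s), use `PAssert` to verify that the contents of the final `PCollection`s your pipeline produces match the expected values in your static output data.

The following example code shows how one might test the WordCount example pipeline. `WordCount` usually reads lines from a text file for input data; instead, the test creates a `List<String>` containing some text lines and uses a `Create` transform to create an initial `PCollection`.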

`WordCount`'s final transform (from the composite transform `CountWords`) produces a `PCollection<String>` of formatted word counts suitable for printing. Rather than write that `PCollection` to an output text file, our test pipeline uses `PAssert` to verify that the elements of the `PCollection` match those of a static `String` array containing our expected output data.
{{< highlight java >}}
public class WordCountTest {

  // Our static input data, which will comprise the initial PCollection.
  static final String[] WORDS_ARRAY = new String[] {
      "hi there", "hi", "hi sue bob",
      "hi sue", "", "bob hi"};

  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);

  // Our static output data, which is the expected data that the final PCollection must match.
  static final String[] COUNTS_ARRAY = new String[] {
      "hi: 5", "there: 1", "sue: 2", "bob: 2"};

  // Example test that tests the pipeline's transforms.
  @Test
  public void testCountWords() throws Exception {
    Pipeline p = TestPipeline.create();

    // Create a PCollection from the WORDS static input data.
    PCollection<String> input = p.apply(Create.of(WORDS));

    // Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
    PCollection<String> output = input.apply(new CountWords());

    // Assert that the output PCollection matches the COUNTS_ARRAY known static output data.
    PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY);

    // Run the pipeline.
    p.run();
  }
}
{{< /highlight >}}
{{< highlight py >}}
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

class CountWords(beam.PTransform):
    # CountWords transform omitted for conciseness.
    # Full transform can be found here - https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_debugging.py
    ...

class WordCountTest(unittest.TestCase):

    # Our static input data: lines of text, matching the Java test's input.
    WORDS = [
        "hi there", "hi", "hi sue bob",
        "hi sue", "", "bob hi"
    ]

    # Our static output data, which the final PCollection must match.
    EXPECTED_COUNTS = ["hi: 5", "there: 1", "sue: 2", "bob: 2"]

    def test_count_words(self):
        with TestPipeline() as p:

            # Create a PCollection from the WORDS static input data.
            input = p | beam.Create(self.WORDS)

            # Run ALL the pipeline's transforms (in this case, the CountWords composite transform).
            output = input | CountWords()

            # Assert that the output PCollection matches the EXPECTED_COUNTS data.
            assert_that(output, equal_to(self.EXPECTED_COUNTS), label='CheckOutput')

        # The pipeline will run and verify the results.
{{< /highlight >}}
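
When preparing static expected output for an end-to-end test, it can help to derive it independently of the pipeline. The sketch below recomputes the expected counts for the input lines used in the Java WordCount test above with plain Python. `expected_word_counts` is a hypothetical helper, and splitting on whitespace is an assumption that happens to suit this input; the real `WordCount` tokenization may differ.

```python
from collections import Counter

# The same static input lines as the Java WordCount test above.
LINES = ["hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"]


def expected_word_counts(lines):
    """Count whitespace-separated words and format each as 'word: count'."""
    counts = Counter(word for line in lines for word in line.split())
    return [f"{word}: {count}" for word, count in counts.items()]


# Sort for stable display; the pipeline assertion itself ignores order.
print(sorted(expected_word_counts(LINES)))
```

Comparing this list with the hand-written expected data is a cheap way to catch a mismatch between test input and expected output before blaming the pipeline.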