Examples and design patterns for testing Apache Beam I/O transforms
Note: This guide is still in progress. There is an open issue to finish the guide: BEAM-1025.
This document explains the set of tests that the Beam community recommends based on our past experience writing I/O transforms. If you wish to contribute your I/O transform to the Beam community, we'll ask you to implement these tests.
While it is standard to write unit tests and integration tests, there are many possible definitions. Our definitions are:

- Unit tests: exercise the transform in isolation, using an in-memory or fake version of the data store, so they run quickly and make failures easy to localize.
- Integration tests: run the transform against a real data store instance (and optionally a real runner cluster) to exercise real-world conditions.
We do not advocate writing a separate test specifically for performance benchmarking. Instead, we recommend setting up integration tests that can accept the necessary parameters to cover many different testing scenarios.
For example, if integration tests are written according to the guidelines below, they can be run on different runners (either local or in a cluster configuration) and against a data store that is either a small instance with a small data set or a large production-ready cluster with a larger data set. This provides coverage for a variety of scenarios, one of which is performance benchmarking.
It's easy to cover a large amount of code with an integration test, but it is then hard to pinpoint the cause of test failures, and the test tends to be flakier.
However, there is a valuable set of bugs found only by tests that exercise multiple workers reading from and writing to data store instances that have multiple nodes (e.g., read replicas). Those scenarios are hard to reproduce with unit tests, and we find they commonly cause bugs in I/O transforms.
Our test strategy is a balance between these two competing needs. We recommend doing as much testing as possible in unit tests, and writing a single, small integration test that can be run in various configurations.
A general guide to writing unit tests for all transforms can be found in the [PTransform Style Guide]({{ site.baseurl }}/contribute/ptransform-style-guide/#testing). We have expanded on a few important points below.
If you are using the `Source` API, make sure to exhaustively unit-test your code. A minor implementation error can lead to data corruption or data loss (such as skipping or duplicating records) that can be hard for your users to detect. Also look into using `SourceTestUtils` (Java) or `source_test_utils` (Python) - it is a key piece of testing `Source` implementations.
If you are not using the `Source` API, you can use `TestPipeline` with `PAssert` (Java) or `assert_that` and `equal_to` (Python) to help with your testing.
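For example, a minimal Java unit test using `TestPipeline` and `PAssert` might look like the following sketch. `MyIO` and `FakeServiceClient` are hypothetical stand-ins for your transform and an in-memory fake of the service it talks to:

```java
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class MyIOReadTest {
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void readReturnsAllRecords() {
    // MyIO and FakeServiceClient are hypothetical: the read transform is
    // configured against an in-memory fake instead of a real endpoint.
    PCollection<String> output =
        pipeline.apply(MyIO.read().withClient(new FakeServiceClient("a", "b", "c")));

    // PAssert checks the PCollection contents when the pipeline runs.
    PAssert.that(output).containsInAnyOrder("a", "b", "c");
    pipeline.run().waitUntilFinish();
  }
}
```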
If you are implementing a write transform, you can use `TestPipeline` to write test data and then read and verify it using a non-Beam client.
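A sketch of that pattern follows, again with a hypothetical `MyIO` transform; here an in-memory fake stands in for the non-Beam client (fakes are discussed next):

```java
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

public class MyIOWriteTest {
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void writeStoresAllRecords() {
    List<String> rows = Arrays.asList("a", "b", "c");
    FakeServiceClient client = new FakeServiceClient(); // hypothetical in-memory fake

    // Write the test data with the transform under test.
    pipeline.apply(Create.of(rows)).apply(MyIO.write().withClient(client));
    pipeline.run().waitUntilFinish();

    // Verify outside of Beam, reading back through the (fake) service client.
    Assert.assertEquals(rows, client.readAllRows());
  }
}
```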
Instead of using mocks in your unit tests (pre-programming exact responses to each call for each test), use fakes. The preferred way to use fakes for I/O transform testing is to use a pre-existing in-memory/embeddable version of the service you're testing, but if one does not exist, consider implementing your own. Fakes have proven to be the right mix of "you can get the conditions for testing you need" and "you don't have to write a million exacting mock function calls".
To help with testing and separation of concerns, code that interacts across a network should be handled in a separate class from your I/O transform. The suggested design pattern is that your I/O transform throws exceptions once it determines that a read or write is no longer possible.
This allows the I/O transform's unit tests to act as if they have a perfect network connection, and they do not need to retry/otherwise handle network connection problems.
If your I/O transform allows batching of reads/writes, you must force the batching to occur in your tests. Having configurable batch-size options on your I/O transform makes that easy, but these options must be marked as test-only. The sketch below illustrates both this and the separate-network-class pattern above.
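A minimal sketch of these two patterns (a separate class for all network interaction, plus a test-only batch size), assuming hypothetical `ServiceClient` and `MyIOWrite` classes that are not part of Beam:

```java
import java.io.IOException;
import java.io.Serializable;
import java.util.List;

// All network interaction lives in a class separate from the I/O transform.
// Production implementations throw once a read/write is no longer possible;
// unit tests substitute an in-memory fake and never touch the network.
interface ServiceClient extends Serializable {
  void writeBatch(List<String> batch) throws IOException;
}

class MyIOWrite {
  private final ServiceClient client;
  private final int batchSize;

  MyIOWrite(ServiceClient client, int batchSize) {
    this.client = client;
    this.batchSize = batchSize;
  }

  // Test-only knob: a tiny batch size forces batching to occur even with a
  // small test data set. Mark it clearly as test-only (e.g. @VisibleForTesting).
  MyIOWrite withBatchSizeForTesting(int testBatchSize) {
    return new MyIOWrite(client, testBatchSize);
  }
}
```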
We do not currently have examples of Python I/O integration tests or integration tests for unbounded or eventually consistent data stores. We would welcome contributions in these areas - please contact the Beam dev@ mailing list for more information.
In order to test I/O transforms in real world conditions, you must connect to a data store instance.
The Beam community hosts the data stores used for integration tests in Kubernetes. In order for an integration test to be run in Beam's continuous integration environment, it must have Kubernetes scripts that set up an instance of the data store.
However, when working locally, there is no requirement to use Kubernetes. All of the test infrastructure allows you to pass in connection info, so developers can use their preferred hosting infrastructure for local development.
The high-level steps for running an integration test are:

1. Set up the data store instance the test will run against.
2. Run the test, passing it connection information for that data store.
3. Clean up the data store instance.
Since setting up data stores and running the tests involves a number of steps, and we wish to time these tests when running performance benchmarks, we use PerfKit Benchmarker to manage the process end to end. With a single command, you can go from an empty Kubernetes cluster to a running integration test.
However, PerfKit Benchmarker is not required for running integration tests. Therefore, we list below the steps both for using PerfKit Benchmarker and for running the tests manually.
Prerequisites:

- A local copy of PerfKit Benchmarker (the location of its pkb.py script is passed to the task below).
- A Kubernetes cluster you can connect to with kubectl.

You won't need to invoke PerfKit Benchmarker directly. Run the `./gradlew performanceTest` task in the project's root directory, passing the Kubernetes scripts of your choice (located in the .test-infra/kubernetes directory). It will set up PerfKit Benchmarker for you.
Example run with the [Direct]({{ site.baseurl }}/documentation/runners/direct/) runner:
```
./gradlew performanceTest -DpkbLocation="/Users/me/PerfKitBenchmarker/pkb.py" -DintegrationTestPipelineOptions='["--numberOfRecords=1000"]' -DitModule=sdks/java/io/jdbc/ -DintegrationTest=org.apache.beam.sdk.io.jdbc.JdbcIOIT -DkubernetesScripts="/Users/me/beam/.test-infra/kubernetes/postgres/postgres-service-for-local-dev.yml" -DbeamITOptions="/Users/me/beam/.test-infra/kubernetes/postgres/pkb-config-local.yml" -DintegrationTestRunner=direct
```
Example run with the [Google Cloud Dataflow]({{ site.baseurl }}/documentation/runners/dataflow/) runner:
```
./gradlew performanceTest -DpkbLocation="/Users/me/PerfKitBenchmarker/pkb.py" -DintegrationTestPipelineOptions='["--numberOfRecords=1000", "--project=GOOGLE_CLOUD_PROJECT", "--tempRoot=GOOGLE_STORAGE_BUCKET"]' -DitModule=sdks/java/io/jdbc/ -DintegrationTest=org.apache.beam.sdk.io.jdbc.JdbcIOIT -DkubernetesScripts="/Users/me/beam/.test-infra/kubernetes/postgres/postgres-service-for-local-dev.yml" -DbeamITOptions="/Users/me/beam/.test-infra/kubernetes/postgres/pkb-config-local.yml" -DintegrationTestRunner=dataflow
```
Example run with the HDFS filesystem and Cloud Dataflow runner:
```
./gradlew performanceTest -DpkbLocation="/Users/me/PerfKitBenchmarker/pkb.py" -DintegrationTestPipelineOptions='["--numberOfRecords=100000", "--project=GOOGLE_CLOUD_PROJECT", "--tempRoot=GOOGLE_STORAGE_BUCKET"]' -DitModule=sdks/java/io/file-based-io-tests/ -DintegrationTest=org.apache.beam.sdk.io.text.TextIOIT -DkubernetesScripts=".test-infra/kubernetes/hadoop/LargeITCluster/hdfs-multi-datanode-cluster.yml,.test-infra/kubernetes/hadoop/LargeITCluster/hdfs-multi-datanode-cluster-for-local-dev.yml" -DbeamITOptions=".test-infra/kubernetes/hadoop/LargeITCluster/pkb-config.yml" -DintegrationTestRunner=dataflow -DbeamExtraProperties='[filesystem=hdfs]'
```
NOTE: When using the Direct runner along with an HDFS cluster, set `export HADOOP_USER_NAME=root` before running the `performanceTest` task.
Parameter descriptions:

- `pkbLocation`: path to the pkb.py script of your local PerfKit Benchmarker copy.
- `integrationTestPipelineOptions`: pipeline options passed to the test being run.
- `itModule`: the Gradle module containing the test.
- `integrationTest`: the fully qualified name of the test class to run.
- `kubernetesScripts`: comma-separated Kubernetes scripts that set up the data store.
- `beamITOptions`: the PerfKit Benchmarker benchmark configuration file for the data store (see below).
- `integrationTestRunner`: the runner to use (e.g. direct or dataflow).
- `beamExtraProperties`: additional properties passed to the test build (e.g. `[filesystem=hdfs]`).
If you're using Kubernetes scripts to host data stores, make sure you can connect to your cluster locally using kubectl. If you have your own data stores already set up, you only need to execute step 3 from the list below.
1. Set up the data store corresponding to the test you wish to run. In some cases there is a setup script (*.sh); in other cases you can just run `kubectl create -f [scriptname]` to create the data store. Convention dictates that there will be a yml script for the data store itself plus a `NodePort` service. The `NodePort` service opens a port to the data store for anyone who connects to the Kubernetes cluster's machines from within the same subnetwork; such scripts are typically useful when running on Minikube Kubernetes Engine. Examples:
   - Postgres: `kubectl create -f .test-infra/kubernetes/postgres/postgres.yml`
   - Elasticsearch: `bash .test-infra/kubernetes/elasticsearch/setup.sh`
2. Determine the IP address of the data store:
   - For a `NodePort` service: `kubectl get pods -l 'component=elasticsearch' -o jsonpath={.items[0].status.podIP}`
   - For a `LoadBalancer` service: `kubectl get svc elasticsearch-external -o jsonpath='{.status.loadBalancer.ingress[0].ip}'`
3. Run the test using the `integrationTest` gradle task and the instructions in the test class (e.g. see the instructions in JdbcIOIT.java).
4. Tear down the data store:
   - Postgres: `kubectl delete -f .test-infra/kubernetes/postgres/postgres.yml`
   - Elasticsearch: `bash .test-infra/kubernetes/elasticsearch/teardown.sh`
Since the `performanceTest` task involves running PerfKit Benchmarker, it can't be used to run the tests manually. For that purpose, a more "low-level" task called `integrationTest` was introduced.
Example usage on Cloud Dataflow runner:
```
./gradlew integrationTest -p sdks/java/io/hadoop-format -DintegrationTestPipelineOptions='["--project=GOOGLE_CLOUD_PROJECT", "--tempRoot=GOOGLE_STORAGE_BUCKET", "--numberOfRecords=1000", "--postgresPort=5432", "--postgresServerName=SERVER_NAME", "--postgresUsername=postgres", "--postgresPassword=PASSWORD", "--postgresDatabaseName=postgres", "--postgresSsl=false", "--runner=TestDataflowRunner"]' -DintegrationTestRunner=dataflow --tests=org.apache.beam.sdk.io.hadoop.format.HadoopFormatIOIT
```
Example usage on the HDFS filesystem and Direct runner:

NOTE: The setup below will only work if the /etc/hosts file contains entries with the external IPs of the Hadoop namenode and datanodes. See the explanations in the Small Cluster config file and the Large Cluster config file.

```
export HADOOP_USER_NAME=root

./gradlew integrationTest -p sdks/java/io/file-based-io-tests -DintegrationTestPipelineOptions='["--numberOfRecords=1000", "--filenamePrefix=hdfs://HDFS_NAMENODE:9000/XMLIOIT", "--hdfsConfiguration=[{\"fs.defaultFS\":\"hdfs://HDFS_NAMENODE:9000\",\"dfs.replication\":1,\"dfs.client.use.datanode.hostname\":\"true\" }]" ]' -DintegrationTestRunner=direct -Dfilesystem=hdfs --tests org.apache.beam.sdk.io.xml.XmlIOIT
```
Parameter descriptions:

- `-p`: the Gradle project (module) containing the I/O transform under test.
- `integrationTestPipelineOptions`: pipeline options passed directly to the test.
- `integrationTestRunner`: the runner to use (e.g. direct or dataflow).
- `--tests`: the fully qualified name of the test to run.
Thanks to the ghprb plugin, it is possible to run Jenkins jobs by typing a specific phrase in a GitHub pull request comment. Integration tests that have a Jenkins job defined can be triggered this way; the trigger phrase for each test can be found in its job definition.
Every job definition can be found in .test-infra/jenkins. If you modified or added Jenkins job definitions in your pull request, run the seed job before running the integration test (comment: "Run seed job").
We measure the performance of IOITs by gathering test execution times from Jenkins jobs that run periodically. The results are stored in a database (BigQuery), so we can display them in the form of plots.
The dashboard gathering all the results is available here: Performance Testing Dashboard
There are three components necessary to implement an integration test:

- Test code: the code that interacts with your I/O transform, writes and reads data, and verifies the results.
- Kubernetes scripts: scripts that set up the data store instance the test runs against.
- Integration with PerfKit Benchmarker: a benchmark configuration file that allows the test to be invoked end to end.
These three pieces are discussed in detail below.
These are the conventions used by integration testing code:

- Your test should use pipeline options to receive connection information. For Java, there is a shared pipeline options object in the io/common directory, so if two tests use the same data store (e.g. the `Elasticsearch` and the `HadoopFormatIO` tests), those tests share the same pipeline options.
- Generate test data programmatically and parameterize the amount of data used for testing. For Java, `CountingInput` + `TestRow` can be combined to generate deterministic test data at any scale.
- Use a write-then-read style for your tests: in a single test, run a pipeline to do a write using your I/O transform, then run another pipeline to do a read using your I/O transform (see the sketch below).
- Validate the contents of the rows efficiently, for example by hashing the rows and comparing actual to expected hashes (`HashingFn` can help make this simple, and `TestRow` has pre-computed hashes), or by using `PAssert`'s `containsInAnyOrder` to validate the contents of a subset of all rows.
- Clean up the test data when the test is done, and use `@AfterClass` to ensure it runs.

An end-to-end example of these principles can be found in JdbcIOIT.
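For illustration, here is a minimal sketch of the write-then-read shape, loosely modeled on JdbcIOIT. `MyIO` and its configuration are hypothetical, a real test would take the record count and connection details from pipeline options, and the sketch uses `GenerateSequence` (the successor of `CountingInput`) to produce deterministic data:

```java
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;

public class MyIOIT {
  private static final long NUMBER_OF_ROWS = 1000; // normally a pipeline option

  @Rule public final transient TestPipeline writePipeline = TestPipeline.create();
  @Rule public final transient TestPipeline readPipeline = TestPipeline.create();

  @Test
  public void writeThenRead() {
    // Write pipeline: deterministically generate rows and write them with
    // the transform under test (MyIO.write() is hypothetical).
    writePipeline
        .apply(GenerateSequence.from(0).to(NUMBER_OF_ROWS))
        .apply(MapElements.into(TypeDescriptors.strings()).via((Long i) -> "row-" + i))
        .apply(MyIO.write());
    writePipeline.run().waitUntilFinish();

    // Read pipeline: verify the data via the read result. A real test would
    // also hash row contents (see HashingFn/TestRow above) rather than
    // checking only the count.
    PCollection<String> rows = readPipeline.apply(MyIO.read());
    PAssert.thatSingleton(rows.apply(Count.globally())).isEqualTo(NUMBER_OF_ROWS);
    readPipeline.run().waitUntilFinish();
  }
}
```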
As discussed in Integration tests, data stores, and Kubernetes, to have your tests run on Beam's continuous integration server, you'll need to implement a Kubernetes script that creates an instance of your data store.
If you would like help with this or have other questions, contact the Beam dev@ mailing list and the community may be able to assist you.
Guidelines for creating a Beam data store Kubernetes script:

- You should define two Kubernetes scripts per data store:
  - The first script contains the data store's basic setup (a `StatefulSet`) plus a `NodePort` service exposing the data store. This will be the script run by the Beam Jenkins continuous integration server.
  - The second script defines an additional `LoadBalancer` service, used to expose an external IP address for the data store if the Kubernetes cluster is on another network. This file's name is usually suffixed with '-for-local-dev'.
- You must ensure that the pod is recreated after crashes:
  - If you use a `pod` directly, it will not be recreated if the pod crashes or something causes the cluster to move the container for your pod.
  - In most cases, you'll want to use a `StatefulSet`, as it supports persistent disks that last between restarts and provides a stable network identifier associated with the pod using a particular persistent disk. `Deployment` and `ReplicaSet` are also possibly useful, but likely in fewer scenarios since they do not have those features.

To allow developers to easily invoke your I/O integration test, you should create a PerfKit Benchmarker benchmark configuration file for the data store. Each pipeline option needed by the integration test should have a configuration entry. This file is passed to PerfKit Benchmarker via the "beamITOptions" option of the "performanceTest" task (described above). The goal is that a checked-in config has defaults such that other developers can run the test without changing the configuration.
The benchmark configuration file is a yaml file that defines the set of pipeline options for a specific data store. Some of these pipeline options are static - they are known ahead of time, before the data store is created (e.g. username/password). Other options are dynamic - they are only known once the data store is created (or after we query the Kubernetes cluster for its current status).
All known cases of dynamic pipeline options are for extracting the IP address that the test needs to connect to. For I/O integration tests, we must allow users to specify:

- the name of the pipeline option to fill in with the value (`paramName`),
- the type of dynamic option to use (e.g. `NodePortIp`),
- how to find the Kubernetes resource that holds the value (e.g. a pod label or a service name).
The style of dynamic pipeline options used here should support a variety of other types of values derived from Kubernetes, but we do not have specific examples.
The dynamic pipeline options are:

- `NodePortIp`: retrieves the IP address of a pod selected by `podLabel`, for data stores exposed through a `NodePort` service.
- `LoadBalancerIp`: retrieves the external IP address of the `LoadBalancer` service identified by `serviceName`.
A configuration file will look like this:

```
static_pipeline_options:
  - postgresUser: postgres
  - postgresPassword: postgres
dynamic_pipeline_options:
  - paramName: PostgresIp
    type: NodePortIp
    podLabel: app=postgres
```

and may contain the following elements:

- `static_pipeline_options`: pipeline options whose values are known before the data store is created.
- `dynamic_pipeline_options`: pipeline options whose values are resolved from the Kubernetes cluster once the data store is running, using the fields described above.
In most cases, to run the performanceTest task it is sufficient to pass the properties described above, which makes it easy to use. However, users can customize PerfKit Benchmarker's behavior even further by passing extra Gradle properties, such as the `beamExtraProperties` property shown in the HDFS example above.
Apache Beam expects that it can run integration tests in multiple configurations:

- Small scale: a single-node data store instance and a small data set.
- Large scale: a multi-node data store instance and a larger data set, read and written by multiple workers.

You can do this by:

- creating two Kubernetes scripts: one for a small instance of the data store, and one for a large instance;
- having your test take a pipeline option that determines whether to generate a small or large amount of test data (where small and large are sizes appropriate to your data store).

An example of this is HadoopFormatIO's tests.