Last Updated: Apr 18, 2024
This guide is for Beam users and developers changing and testing Beam code. Specifically, this guide provides information about:
Testing code changes locally
Building Beam artifacts with modified Beam code and using the modified code for pipelines
The Apache Beam GitHub repository (Beam repo) is, for the most part, a “mono repo”: it contains everything in the Beam project, including the SDK, test infrastructure, dashboards, the Beam website, the Beam Playground, and so on.
The Beam repo is a single Gradle project that contains all components, including Python, Go, the website, etc. It is useful to familiarize yourself with the Gradle project structure: https://docs.gradle.org/current/userguide/multi_project_builds.html
Gradle uses the following key concepts:
Each project and subproject is defined by a build.gradle file. Plugins applied in the build.gradle file contribute predefined tasks and hierarchies. For example, common tasks for a Java project or subproject include:

compileJava
compileTestJava
test
integrationTest

To run a Gradle task, the command is ./gradlew -p <project path> <task> or ./gradlew :project:path:task_name. For example:
./gradlew -p sdks/java/core compileJava
./gradlew :sdks:java:harness:test
The plugin buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin manages everything. In each Java project or subproject, the build.gradle file starts with:
apply plugin: 'org.apache.beam.module'
applyJavaNature( ... )
Relevant usage of BeamModulePlugin includes:
Java projects apply applyJavaNature; Python -> applyPythonNature, and so on.
test: run Java unit tests
spotlessApply: format Java code

The following are example code paths relevant for SDK development:
Java code paths are mainly found in two directories:
sdks/java Java SDK
sdks/java/core Java core
sdks/java/harness SDK harness (entrypoint of the SDK container)
runners Java runner support. For example:
runners/direct-java Java direct runner
runners/flink-java Java Flink runner
runners/google-cloud-dataflow-java Dataflow runner (job submission, translation, etc.)
runners/google-cloud-dataflow-java/worker Worker on the Dataflow legacy runner

For SDKs in other languages, all relevant files are in sdks/LANG. For example:
sdks/python contains the setup file and scripts to trigger test-suites
sdks/python/apache_beam actual Beam package
sdks/python/apache_beam/runners/worker SDK worker harness entrypoint, state sampler
sdks/python/apache_beam/io I/O connectors
sdks/python/apache_beam/transforms most “core” components
sdks/python/apache_beam/ml Beam ML
sdks/python/apache_beam/runners runner implementations and wrappers
sdks/go Go SDK
.github/workflows GitHub Action workflows (for example, tests run under PR). Most workflows run a single Gradle command. Check which command is running for a test so that you can run the same command locally during development.
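For example, you can search a workflow file for the Gradle invocation it runs (the workflow file name is a placeholder; pick the workflow that runs the test you care about):

grep -n "gradle" .github/workflows/<workflow_name>.yml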
To set up local development environments, first see the Contributing guide. If you plan to use Dataflow, see the Google Cloud documentation to set up gcloud credentials.
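For example, a typical gcloud credential setup looks like the following sketch (the project ID is a placeholder; see the Google Cloud documentation for details):

gcloud auth login
gcloud auth application-default login
gcloud config set project <your_gcp_project>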
To check if your environment is set up, follow these steps:
Depending on the languages involved, your PATH needs to have the following elements configured.
pyenv and a virtual environment to manage Python versions. For example:
To work on sdks/java/io/google-cloud-platform, you need a Java environment.
To work on sdks/java/harness, you need a Java environment, a Go environment, and a Docker environment. You need the Docker environment to compile and build the Java SDK harness container image.
To work on sdks/python/apache_beam, you need a Python environment.

This section provides guidance for setting up your Java environment.
From IntelliJ, open /beam (Important: Open the repository root directory, instead of sdks/java).
Wait for indexing. Indexing might take a few minutes.
If the prerequisites are met, the environment setup is complete, because Gradle is a self-contained build tool.
To verify whether the load is successful, find the file examples/java/build.gradle. Next to the wordCount task, a Run button is present. Click Run. The wordCount example compiles and runs.
Note: The IDE is not required for changing the code and testing. You can run tests by using a Gradle command line, as described in the Console (shell) setup section.
To run tests by using the Gradle command line, in the command-line environment, run the following command:
$ cd beam
$ ./gradlew :examples:java:wordCount
When the command completes successfully, the following text appears in the Gradle build log:
...
BUILD SUCCESSFUL in 2m 32s
96 actionable tasks: 9 executed, 87 up-to-date
3:41:06 PM: Execution finished 'wordCount'.
In addition, the following text appears in the output file:
$ head /tmp/output.txt*
==> /tmp/output.txt-00000-of-00003 <==
should: 38
bites: 1
depraved: 1
gauntlet: 1
battle: 6
sith: 2
cools: 1
natures: 1
hedge: 1
words: 9

==> /tmp/output.txt-00001-of-00003 <==
elements: 1
Advise: 2
fearful: 2
towards: 4
ready: 8
pared: 1
left: 8
safe: 4
canst: 7
warrant: 2

==> /tmp/output.txt-00002-of-00003 <==
chanced: 1
...
What does this command do?
This command compiles the Beam SDK and the WordCount pipeline, a hello-world program for data processing, and then runs the pipeline on the Direct Runner.
This section explains how to run unit tests locally after you make a code change in the Java SDK (for example, in sdks/java/io/jdbc).
Tests are under the src/test/java folder of each project. Unit tests have the filename .../**Test.java. Integration tests have the filename .../**IT.java.
To run all unit tests under a project, use the following command:
./gradlew :sdks:java:harness:test
Find the JUnit report in an HTML file in the file path <invoked_project>/build/reports/tests/test/index.html.
To run a specific test, use the following commands:
./gradlew :sdks:java:harness:test --tests org.apache.beam.fn.harness.CachesTest
./gradlew :sdks:java:harness:test --tests *CachesTest
./gradlew :sdks:java:harness:test --tests *CachesTest.testClearableCache
To run tests using IntelliJ, click the run icon in the gutter to run a whole test class or a specific test. You can set breakpoints to debug the test.
These steps don't apply to sdks:java:core tests. Instead, invoke the unit tests by using :runners:direct-java:needsRunnerTest. Java core doesn't depend on a runner. Therefore, unit tests that run a pipeline require the Direct Runner.
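For example, a sketch of such an invocation (the test filter is illustrative and assumes the task accepts the same --tests flag shown above):

./gradlew :runners:direct-java:needsRunnerTest --tests '*ParDoTest'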
To run integration tests, use the Direct Runner.
Integration tests use TestPipeline. Set options by using TestPipelineOptions.
Integration tests differ from standard pipelines in the following ways:
Pipeline options are read from the system property beamTestPipelineOptions. Note this difference, because you need to configure the test by setting -DbeamTestPipelineOptions=[...]. This property is where you set the runner to use.
The following example demonstrates how to run an integration test by using the command line. This example includes the options required to run the pipeline on the Dataflow runner:
-DbeamTestPipelineOptions='["--runner=TestDataflowRunner","--project=mygcpproject","--region=us-central1","--stagingLocation=gs://mygcsbucket/path"]'
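For example, a complete invocation that combines a test task with these options might look like the following sketch (the task comes from the list below; the test filter is a placeholder):

./gradlew :sdks:java:io:google-cloud-platform:integrationTest \
  --tests '*SomeIT' \
  -DbeamTestPipelineOptions='["--runner=TestDataflowRunner","--project=mygcpproject","--region=us-central1","--stagingLocation=gs://mygcsbucket/path"]'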
To set up a TestPipeline object in an integration test, use the following code:
@Rule public TestPipeline pipeline = TestPipeline.create();

@Test
public void testSomething() {
  pipeline.apply(...);
  pipeline.run().waitUntilFinish();
}
The task that runs the test needs to specify the runner. The following examples demonstrate how to specify the runner:
:sdks:java:io:google-cloud-platform:integrationTest
:runners:google-cloud-dataflow-java:googleCloudPlatformLegacyWorkerIntegrationTest
:runners:google-cloud-dataflow-java:googleCloudPlatformRunnerV2IntegrationTest

To see how to run your workflow locally, refer to the Gradle command that the GitHub Action workflow runs.
Example invocation:
./gradlew :runners:google-cloud-dataflow-java:examplesJavaRunnerV2IntegrationTest \
  -PdisableSpotlessCheck=true -PdisableCheckStyle=true -PskipCheckerFramework \
  -PgcpProject=<your_gcp_project> -PgcpRegion=us-central1 \
  -PgcsTempRoot=gs://<your_gcs_bucket>/tmp
To apply code changes to your pipeline, we recommend that you start with a separate branch.
If you're making a pull request or want to test a change with the dev branch, start from Beam HEAD (master).
If you're making a patch on released Beam (2.xx.0), start from a tag (for example, v2.55.0). Then, in the Beam repo, compile the project that contains the code change by using the following command. This example modifies sdks/java/io/kafka.
./gradlew -Ppublishing -p sdks/java/io/kafka publishToMavenLocal
By default, this command publishes the artifact with modified code to the Maven Local repository (~/.m2/repository). The change is picked up when the user pipeline runs.
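For example, you can verify that the modified artifact landed in Maven Local before running your pipeline (the module path and version depend on what you built):

ls ~/.m2/repository/org/apache/beam/beam-sdks-java-io-kafka/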
If your code change is made in a development branch, such as on Beam master or a PR, instead of on a release tag, the artifact is produced under version 2.xx.0-SNAPSHOT. You need to make additional configurations in your pipeline project in order to pick up this dependency. The following examples provide guidance for making configurations in Maven and Gradle.
Follow these steps for Maven projects.
1. Use maven-archetype as a template to set up your project (https://beam.apache.org/get-started/quickstart-java/).

2. In the `pom.xml` file, add the Maven snapshot repository:

<repository>
  <id>Maven-Snapshot</id>
  <name>maven snapshot repository</name>
  <url>https://repository.apache.org/content/groups/snapshots/</url>
</repository>

3. In the `pom.xml` file, modify the value of `beam.version`:

<properties>
  <beam.version>2.XX.0-SNAPSHOT</beam.version>
</properties>
Follow these steps for Gradle projects.
1. In the `build.gradle` file, add the following code:

repositories {
  maven { url "https://repository.apache.org/content/groups/snapshots" }
}

2. Set the Beam dependency version to 2.XX.0-SNAPSHOT.

This configuration directs the build system to download Beam nightly builds from the Maven Snapshot Repository. The local build that you edited isn't downloaded. You don't need to build all Beam artifacts locally. If you do need to build all Beam artifacts locally, use the following command for all projects: ./gradlew -Ppublishing publishToMavenLocal.
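For example, step 2 corresponds to a dependency entry similar to the following in build.gradle (beam-sdks-java-core is only an example module; use the Beam modules your pipeline already depends on):

dependencies {
  implementation "org.apache.beam:beam-sdks-java-core:2.XX.0-SNAPSHOT"
}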
The following situations require additional consideration.
If you're using the standard Dataflow runner (not Runner v2), and the worker harness has changed, do the following:
1. Build the dataflowWorkerJar:

./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar

The jar is located in the build output.

2. Use the following command to pass the pipeline option:
--dataflowWorkerJar=/.../beam-runners-google-cloud-dataflow-java-legacy-worker-2.XX.0-SNAPSHOT.jar
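For example, with a Maven-based pipeline project, the option can be passed on the command line (the main class and the jar path are placeholders):

mvn compile exec:java -Dexec.mainClass=org.example.MyPipeline \
  -Dexec.args="--runner=DataflowRunner \
  --dataflowWorkerJar=/path/to/beam-runners-google-cloud-dataflow-java-legacy-worker-2.XX.0-SNAPSHOT.jar ..."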
If you're using Dataflow Runner v2 and sdks/java/harness or its dependencies (like sdks/java/core) have changed, do the following:
./gradlew :sdks:java:container:java8:docker # java8, java11, java17, etc.
docker tag apache/beam_java8_sdk:2.49.0.dev \
  "us.gcr.io/apache-beam-testing/beam_java8_sdk:2.49.0-custom" # change to your container registry
docker push "us.gcr.io/apache-beam-testing/beam_java8_sdk:2.49.0-custom"
--experiments=use_runner_v2 \
--sdkContainerImage="us.gcr.io/apache-beam-testing/beam_java8_sdk:2.49.0-custom"
The Beam Python SDK is distributed as a single wheel, which is more straightforward than the Java SDK. Python development is consequently less complicated.
These instructions explain how to configure your console. In this example, the working directory is set to sdks/python.
1. Install pyenv. Use the following commands:
   a. Install the prerequisites.
   b. curl https://pyenv.run | bash
   c. pyenv install 3.X (a supported Python version; refer to python_version in the project properties)
2. Create and activate a virtual environment:
   a. pyenv virtualenv 3.X ENV_NAME
   b. pyenv activate ENV_NAME
3. Install the apache_beam package in editable mode: pip install -e .[gcp,test]
4. (Optional) Manage the pre-commit hooks:

# enable pre-commit
(env) $ pip install pre-commit
(env) $ pre-commit install

# disable pre-commit
(env) $ pre-commit uninstall
Note: Although the tests can be triggered with a Gradle command, that method sets up a fresh virtualenv and installs dependencies before each run, which takes minutes. Therefore, it's useful to have a persistent virtualenv.
Unit tests have the filename **_test.py. Integration tests have the filename **_it_test.py.
pytest -v apache_beam/io/textio_test.py
pytest -v apache_beam/io/textio_test.py::TextSourceTest
pytest -v apache_beam/io/textio_test.py::TextSourceTest::test_progress
To run an integration test on the Direct Runner, use the following command:
python -m pytest -o log_cli=True -o log_level=Info \
  apache_beam/ml/inference/pytorch_inference_it_test.py::PyTorchInference \
  --test-pipeline-options='--runner=TestDirectRunner'
If you are preparing a PR, add the test paths here so that the test suites run in PostCommit Python.
To run an integration test on the Dataflow Runner, follow these steps:
cd sdks/python
pip install build && python -m build --sdist
The tarball file is generated in the sdks/python/dist/ directory.
Use the --test-pipeline-options parameter to specify the tarball file with --sdk_location=dist/apache-beam-2.53.0.dev0.tar.gz. The following example shows the complete command:

python -m pytest -o log_cli=True -o log_level=Info \
  apache_beam/ml/inference/pytorch_inference_it_test.py::PyTorchInference \
  --test-pipeline-options='--runner=TestDataflowRunner --project=<project> --temp_location=gs://<bucket>/tmp --sdk_location=dist/apache-beam-2.53.0.dev0.tar.gz --region=us-central1'
Mark the test with @pytest.mark.it_postcommit.

To build containers for modified SDK code, follow these steps:
./gradlew :sdks:python:container:py39:docker \
  -Pdocker-repository-root=<gcr.io/location> -Pdocker-tag=<tag>
Run the test with the --sdk_container_image option. The following example shows a complete command:
python -m pytest -o log_cli=True -o log_level=Info \
  apache_beam/ml/inference/pytorch_inference_it_test.py::PyTorchInference \
  --test-pipeline-options='--runner=TestDataflowRunner --project=<project> --temp_location=gs://<bucket>/tmp --sdk_container_image=us.gcr.io/apache-beam-testing/beam-sdk/beam:dev --region=us-central1'
This section explains how to specify additional test dependencies.
Option 1: Use the --requirements_file option. The following example demonstrates how to use this option:
python -m pytest -o log_cli=True -o log_level=Info \
  apache_beam/ml/inference/pytorch_inference_it_test.py::PyTorchInference \
  --test-pipeline-options='--runner=TestDataflowRunner --project=<project> --temp_location=gs://<bucket>/tmp --sdk_container_image=us.gcr.io/apache-beam-testing/beam-sdk/beam:dev --region=us-central1 --requirements_file=requirements.txt'
Option 2: If you're using the Dataflow runner, use custom containers.
It is convenient to use the official Beam SDK container image as a base and then apply your changes.
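For example, a minimal custom container might look like the following sketch (the base image tag, registry path, and requirements.txt file are assumptions; match the base image to your Beam and Python versions):

# Dockerfile (sketch)
FROM apache/beam_python3.9_sdk:2.53.0
COPY requirements.txt /tmp/requirements.txt
RUN pip install -r /tmp/requirements.txt

docker build -t us.gcr.io/<project>/beam-custom-sdk:dev .
docker push us.gcr.io/<project>/beam-custom-sdk:dev

Then pass the pushed image to the pipeline with the --sdk_container_image option, as in the earlier Dataflow example.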
To run your pipeline with modified Beam code, follow these steps:
Build the Beam SDK tarball as described previously (under sdks/python, run python -m build --sdist).
Install the Beam SDK in your Python virtual environment with the necessary extensions, for example pip install /path/to/apache-beam.tar.gz[gcp].
Initiate your Python script. To run your pipeline, use a command similar to the following example:
python my_pipeline.py \
  --runner=DataflowRunner \
  --sdk_location=/path/to/apache-beam.tar.gz \
  --project=my_project \
  --region=us-central1 \
  --temp_location=gs://my-bucket/temp \
  ...
Tips for using the Dataflow runner:
The Python worker installs the Apache Beam SDK before processing work items. Therefore, you don't usually need to provide a custom worker container. If your Google Cloud VM doesn't have internet access and transitive dependencies are changed from the officially released container images, you do need to provide a custom worker container. In this case, see the section “Build containers for modified SDK code.”
Installing the Beam Python SDK from source can be slow (3.5 minutes on an n1-standard-1 machine). As an alternative, if the host machine uses amd64 architecture, you can build a wheel instead of a tarball by using a command similar to ./gradlew :sdks:python:bdistPy311linux (for Python 3.11). Pass the built wheel by using the --sdk_location option. That installation completes in seconds.
save_main_session: If you get a NameError when running a DoFn on a remote runner, use the --save_main_session pipeline option to enable it.
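For example, with the my_pipeline.py script from the earlier example:

python my_pipeline.py --runner=DataflowRunner --save_main_session ...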