Java multi-language pipelines quickstart
This page provides a high-level overview of creating multi-language pipelines with the Apache Beam SDK for Java. For a more complete discussion of the topic, see Multi-language pipelines.
A multi-language pipeline is a pipeline that’s built in one Beam SDK language and uses one or more transforms from another Beam SDK language. These transforms from another SDK are called cross-language transforms. Multi-language support makes pipeline components easier to share across the Beam SDKs and grows the pool of available transforms for all the SDKs.
In the examples below, the multi-language pipeline is built with the Beam Java SDK, and the cross-language transform is built with the Beam Python SDK.
This quickstart is based on a Java example pipeline, PythonDataframeWordCount, that counts words in a Shakespeare text. If you’d like to run the pipeline, you can clone or download the Beam repository and build the example from the source code.
To build and run the example, you need a Java environment with the Beam Java SDK version 2.40.0 or later installed, and a Python environment. If you don’t already have these environments set up, first complete the Apache Beam Java SDK Quickstart and the Apache Beam Python SDK Quickstart.
The Java example pipeline uses the Python DataframeTransform as a cross-language transform. The transform is part of the Beam DataFrame API for working with pandas-like DataFrame objects.
To apply a cross-language transform, your pipeline must specify it. Python transforms are identified by their fully qualified name. For example, DataframeTransform can be found in the apache_beam.dataframe.transforms package, so its fully qualified name is apache_beam.dataframe.transforms.DataframeTransform. The example pipeline, PythonDataframeWordCount, passes this fully qualified name to PythonExternalTransform.
Note: The example pipeline is intended to demonstrate the development of Java multi-language pipelines that use arbitrary Python cross-language transforms. For production use cases of the DataFrame API in Java, you should use the higher-level DataframeTransform instead.
Here's the complete pipeline definition from the example:
    static void runWordCount(WordCountOptions options) {
      Pipeline p = Pipeline.create(options);

      p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
          .apply(ParDo.of(new ExtractWordsFn()))
          .setRowSchema(ExtractWordsFn.SCHEMA)
          .apply(
              PythonExternalTransform.<PCollection<Row>, PCollection<Row>>from(
                      "apache_beam.dataframe.transforms.DataframeTransform",
                      options.getExpansionService())
                  .withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
                  .withKwarg("include_indexes", true))
          .apply(MapElements.via(new FormatAsTextFn()))
          .apply("WriteCounts", TextIO.write().to(options.getOutput()));

      p.run().waitUntilFinish();
    }
PythonExternalTransform is a wrapper for invoking external Python transforms. The from method accepts two strings: 1) the fully qualified transform name; 2) an optional address and port number for the expansion service. The method returns a stub for the Python cross-language transform that can be used directly in a Java pipeline. withKwarg specifies a keyword argument for instantiating the Python cross-language transform. In this case, withKwarg is invoked twice, to specify a func argument and an include_indexes argument, and these arguments are passed to DataframeTransform. PythonExternalTransform also provides other ways to specify args and kwargs for Python cross-language transforms.
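To make the correspondence between the two withKwarg calls and the Python side concrete, the following plain-Java sketch renders a set of keyword arguments as the Python constructor call they stand for. It is illustrative only: KwargsSketch, PythonSource, toPythonLiteral, and renderCall are hypothetical names invented for this sketch, they are not part of the Beam API, and this is not how Beam transmits the arguments to the expansion service.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Illustrative only: shows which Python call a set of Java keyword arguments stands for. */
final class KwargsSketch {

  /** Hypothetical holder for raw Python source, such as a lambda expression. */
  static final class PythonSource {
    final String source;

    PythonSource(String source) {
      this.source = source;
    }
  }

  /** Writes a Java value the way it would appear as a Python literal. */
  static String toPythonLiteral(Object value) {
    if (value == null) {
      return "None";
    }
    if (value instanceof PythonSource) {
      return ((PythonSource) value).source; // raw Python source stays unquoted
    }
    if (value instanceof Boolean) {
      return ((Boolean) value) ? "True" : "False";
    }
    if (value instanceof String) {
      String s = (String) value;
      return "'" + s.replace("\\", "\\\\").replace("'", "\\'") + "'";
    }
    // Other values (for example, integers) fall back to their Java string form.
    return String.valueOf(value);
  }

  /** Renders "SimpleName(key=value, ...)" from a fully qualified name and ordered kwargs. */
  static String renderCall(String fullyQualifiedName, Map<String, Object> kwargs) {
    String simpleName = fullyQualifiedName.substring(fullyQualifiedName.lastIndexOf('.') + 1);
    StringJoiner call = new StringJoiner(", ", simpleName + "(", ")");
    for (Map.Entry<String, Object> entry : kwargs.entrySet()) {
      call.add(entry.getKey() + "=" + toPythonLiteral(entry.getValue()));
    }
    return call.toString();
  }

  public static void main(String[] args) {
    Map<String, Object> kwargs = new LinkedHashMap<>();
    kwargs.put("func", new PythonSource("lambda df: df.groupby('word').sum()"));
    kwargs.put("include_indexes", true);
    System.out.println(
        renderCall("apache_beam.dataframe.transforms.DataframeTransform", kwargs));
    // prints DataframeTransform(func=lambda df: df.groupby('word').sum(), include_indexes=True)
  }
}
```

The point of the sketch is the mapping: the Java boolean true becomes the Python keyword value True, and the callable source is passed through as Python code rather than as a quoted string.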
To understand how this pipeline works, it’s helpful to look more closely at the first withKwarg invocation:

    .withKwarg("func", PythonCallableSource.of("lambda df: df.groupby('word').sum()"))
The argument to PythonCallableSource.of is a string representation of a Python lambda function. DataframeTransform takes as an argument a Python callable to apply to a PCollection as if it were a DataFrame. The withKwarg method lets you specify a Python callable in your Java pipeline. To learn more about passing a function to DataframeTransform, see Embedding DataFrames in a pipeline.
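For Java readers less familiar with pandas, the following sketch shows in plain Java what the lambda df.groupby('word').sum() computes: rows are grouped by their word value, and the numeric values in each group are added up. It runs on an in-memory list, not on a PCollection. The WordRow class and its count field are assumptions made for this sketch; the source shows only that the rows have a word column.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Plain-Java sketch of the grouping and summing that the Python lambda performs. */
final class GroupBySumSketch {

  /** Hypothetical in-memory row; the count field is an assumption for illustration. */
  static final class WordRow {
    final String word;
    final long count;

    WordRow(String word, long count) {
      this.word = word;
      this.count = count;
    }
  }

  /** Groups rows by word and sums the counts in each group, with keys in sorted order. */
  static Map<String, Long> groupByWordAndSum(List<WordRow> rows) {
    return rows.stream()
        .collect(
            Collectors.groupingBy(
                row -> row.word, TreeMap::new, Collectors.summingLong(row -> row.count)));
  }

  public static void main(String[] args) {
    List<WordRow> rows =
        Arrays.asList(new WordRow("king", 1), new WordRow("lear", 1), new WordRow("king", 1));
    System.out.println(groupByWordAndSum(rows)); // prints {king=2, lear=1}
  }
}
```

A TreeMap is used so that the result is ordered by key, which mirrors the default pandas behavior of sorting group keys.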
If you want to customize the environment or use transforms not available in the default Beam SDK, you might need to run your own expansion service. In such cases, start the expansion service before running your pipeline.
For this example, you can simply run your multi-language pipeline using Gradle, as shown below.
The following script runs the example multi-language pipeline on Dataflow, using example text from a Cloud Storage bucket. You’ll need to adapt the script to your environment.
    export OUTPUT_BUCKET=<bucket>
    export GCP_REGION=<region>
    export TEMP_LOCATION=gs://$OUTPUT_BUCKET/tmp
    export PYTHON_VERSION=<version>

    ./gradlew :examples:multi-language:pythonDataframeWordCount --args=" \
    --runner=DataflowRunner \
    --output=gs://${OUTPUT_BUCKET}/count \
    --region=${GCP_REGION} \
    --experiments=use_runner_v2"
The pipeline outputs a file with the results to gs://$OUTPUT_BUCKET/count-00000-of-00001.
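The output name follows the prefix-SSSSS-of-NNNNN pattern that Beam's text sink uses by default for sharded files, so a run that produces more than one shard writes several files with the same prefix. The sketch below only reproduces that naming pattern in plain Java to help you predict which files to look for. ShardNameSketch and shardName are hypothetical names, and my-bucket is a placeholder.

```java
import java.util.Locale;

/** Sketch of the prefix-SSSSS-of-NNNNN naming pattern for sharded output files. */
final class ShardNameSketch {

  /** Builds the name of one shard: zero-padded shard index, then zero-padded shard count. */
  static String shardName(String prefix, int shardIndex, int shardCount) {
    return String.format(Locale.ROOT, "%s-%05d-of-%05d", prefix, shardIndex, shardCount);
  }

  public static void main(String[] args) {
    System.out.println(shardName("gs://my-bucket/count", 0, 1));
    // prints gs://my-bucket/count-00000-of-00001
  }
}
```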
When building a job for a multi-language pipeline, Beam uses an expansion service to expand composite transforms. You must have at least one expansion service per remote SDK.
In the general case, if you have a supported version of Python installed on your system, you can let PythonExternalTransform handle the details of creating and starting up the expansion service. But if you want to customize the environment or use transforms not available in the default Beam SDK, you might need to run your own expansion service.
For example, to start the standard expansion service for a Python transform, ExpansionServiceServicer, follow these steps:
1. Activate a Python virtual environment and install Apache Beam, as described in the Python quick start.
2. In the beam/sdks/python directory of the Beam source code, run the following command:

    python apache_beam/runners/portability/expansion_service_main.py -p 18089 --fully_qualified_name_glob "*"
The command runs expansion_service_main.py, which starts the standard expansion service. When you use Gradle to run your Java pipeline, you can specify the expansion service with the expansionService option. For example: --expansionService=localhost:18089.
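Because a manually started expansion service must be running before the pipeline is launched, it can help to confirm that something is listening on the address first. The following stdlib-only sketch checks whether a TCP connection to a host:port address succeeds. ExpansionServiceCheck and isReachable are hypothetical names, not Beam APIs, and the check confirms only that the port accepts connections, not that the listener is actually an expansion service.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Sketch: checks that a host:port address accepts TCP connections. */
final class ExpansionServiceCheck {

  /** Parses "host:port" and returns true if a connection succeeds within the timeout. */
  static boolean isReachable(String address, int timeoutMillis) {
    int colon = address.lastIndexOf(':');
    if (colon <= 0 || colon == address.length() - 1) {
      throw new IllegalArgumentException("Expected host:port but got: " + address);
    }
    String host = address.substring(0, colon);
    int port = Integer.parseInt(address.substring(colon + 1));
    try (Socket socket = new Socket()) {
      socket.connect(new InetSocketAddress(host, port), timeoutMillis);
      return true;
    } catch (IOException e) {
      return false; // refused, timed out, or host not resolvable
    }
  }

  public static void main(String[] args) {
    // Defaults to the address used in the example above.
    String address = args.length > 0 ? args[0] : "localhost:18089";
    String status = isReachable(address, 2000) ? "is accepting connections" : "is not reachable";
    System.out.println(address + " " + status);
  }
}
```

If the check reports that the address is not reachable, start the expansion service as described in the steps above before running the Gradle task.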
To learn more about Beam support for cross-language pipelines, see Multi-language pipelines. To learn more about the Beam DataFrame API, see Beam DataFrames overview.