Improved multi-language pipelines section of the programming guide (#16587)
* improved multi-language pipelines section of the programming guide
* made changes to multi-lang pipeline content, in response to feedback
* updating Beam version number for supporting no-code Java xlang transforms
* updating one more Beam version number
diff --git a/website/CONTRIBUTE.md b/website/CONTRIBUTE.md
index b2023f4..3a02ceb 100644
--- a/website/CONTRIBUTE.md
+++ b/website/CONTRIBUTE.md
@@ -246,6 +246,7 @@
```
The purpose of adding classes or programming languages (java, py or go) in code highlighting is to activate the language switching feature.
+If you want highlighting for a code block that should *not* be affected by language switching - that is, if you want to highlight a code block without having it disappear when a different language is selected - use ` ``` `. For example: ` ```java // Java code here...``` `.
### Adding class to markdown text
diff --git a/website/www/site/content/en/documentation/programming-guide.md b/website/www/site/content/en/documentation/programming-guide.md
index a1b3dd1..e65762c 100644
--- a/website/www/site/content/en/documentation/programming-guide.md
+++ b/website/www/site/content/en/documentation/programming-guide.md
@@ -6543,23 +6543,24 @@
## 13. Multi-language pipelines {#multi-language-pipelines}
-This section provides comprehensive documentation of multi-language pipelines. For a short overview of the topic, see:
+This section provides comprehensive documentation of multi-language pipelines. To get started creating a multi-language pipeline, see:
* [Python multi-language pipelines quickstart](/documentation/sdks/python-multi-language-pipelines)
+* [Java multi-language pipelines quickstart](/documentation/sdks/java-multi-language-pipelines)
-Beam allows you to combine transforms written in any supported SDK language (currently, Java and Python) and use them in one multi-language pipeline. This capability makes it easy to provide new functionality simultaneously in different Apache Beam SDKs through a single cross-language transform. For example, the [Apache Kafka connector](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py) and [SQL transform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/sql.py) from the Java SDK can be used in Python streaming pipelines.
+Beam lets you combine transforms written in any supported SDK language (currently, Java and Python) and use them in one multi-language pipeline. This capability makes it easy to provide new functionality simultaneously in different Apache Beam SDKs through a single cross-language transform. For example, the [Apache Kafka connector](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py) and [SQL transform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/sql.py) from the Java SDK can be used in Python pipelines.
Pipelines that use transforms from more than one SDK-language are known as *multi-language pipelines*.
### 13.1. Creating cross-language transforms {#create-x-lang-transforms}
-To make transforms written in one language available to pipelines written in another language, an *expansion service* for transforms written in the same language is used to create and inject the appropriate language-specific pipeline fragments into your pipeline.
+To make transforms written in one language available to pipelines written in another language, Beam uses an *expansion service*, which creates and injects the appropriate language-specific pipeline fragments into the pipeline.
-In the following example, a Python pipeline written the Apache Beam SDK for Python starts up a local Java expansion service on your computer to create and inject the appropriate Java pipeline fragments for executing the Java Kafka cross-language transform into your Python pipeline. The SDK then downloads and stages the necessary Java dependencies needed to execute these transforms.
+In the following example, a Beam Python pipeline starts up a local Java expansion service to create and inject the appropriate Java pipeline fragments for executing the Java Kafka cross-language transform into the Python pipeline. The SDK then downloads and stages the Java dependencies needed to execute these transforms.

-At runtime, the Beam runner will execute both Python and Java transforms to execute your pipeline.
+At runtime, the Beam runner will execute both Python and Java transforms to run the pipeline.
In this section, we will use [KafkaIO.Read](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html) to illustrate how to create a cross-language transform for Java and a test example for Python.
@@ -6568,28 +6569,28 @@
There are two ways to make Java transforms available to other SDKs.
* Option 1: In some cases, you can use existing Java transforms from other SDKs without writing any additional Java code.
-* Option 2: You can use arbitrary Java Transforms from other SDKs by adding a few Java classes.
+* Option 2: You can use arbitrary Java transforms from other SDKs by adding a few Java classes.
-##### 13.1.1.1 Using Existing Java Transforms from Other SDKs Without Writing more Java Code
+##### 13.1.1.1 Using existing Java transforms without writing more Java code
-Starting with Beam 2.34.0, Python SDK users can use some Java transforms without writing additional Java code. This can be useful in many cases. For example,
-* A developer not familiar with Java may need to use an existing Java transform from a Python pipeline
-* A developer may need to make an existing Java transform available to a Python pipeline without writing/releasing more Java code
+Starting with Beam 2.34.0, Python SDK users can use some Java transforms without writing additional Java code. This can be useful in many cases. For example:
+* A developer not familiar with Java may need to use an existing Java transform from a Python pipeline.
+* A developer may need to make an existing Java transform available to a Python pipeline without writing/releasing more Java code.
> **Note:** This feature is currently only available when using Java transforms from a Python pipeline.
-To be eligible for direct usage, the API of the Java transform has to follow the following pattern.
-* Requirement 1: The Java transform can be constructed using an available public constructor or a public static method (a constructor method) in the same Java class.
-* Requirement 2: The Java transform can be configured using one or more builder methods. Each builder method should be public and should return an instance of the Java transform.
+To be eligible for direct usage, the API of the Java transform has to meet the following requirements:
+1. The Java transform can be constructed using an available public constructor or a public static method (a constructor method) in the same Java class.
+2. The Java transform can be configured using one or more builder methods. Each builder method should be public and should return an instance of the Java transform.
-See below for the structure of an example Java class that can be directly used from the Python API.
+Here's an example Java class that can be directly used from the Python API.
-{{< highlight >}}
+```java
public class JavaDataGenerator extends PTransform<PBegin, PCollection<String>> {
. . .
- // Following method satisfies the Requirement 1.
- // Note that you may also use a class constructor instead of a static method.
+ // The following method satisfies requirement 1.
+ // Note that you could use a class constructor instead of a static method.
public static JavaDataGenerator create(Integer size) {
return new JavaDataGenerator(size);
}
@@ -6601,31 +6602,31 @@
. . .
}
- // Following method conforms to the Requirement 2
+ // The following method conforms to requirement 2.
public JavaDataGenerator withJavaDataGeneratorConfig(JavaDataGeneratorConfig dataConfig) {
return new JavaDataGenerator(this.size, dataConfig);
}
. . .
}
-{{< /highlight >}}
+```
-To use a Java class that conforms to the above requirement from a Python SDK pipeline you may do the following.
+For a complete example, see [JavaDataGenerator](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaDataGenerator.java).
-* Step 1: create an allowlist file in the _yaml_ format that describes the Java transform classes and methods that will be directly accessed from Python.
-* Step 2: start an Expansion Service with the `javaClassLookupAllowlistFile` option passing path to the allowlist defined in Step 1 as the value.
-* Step 3: Use the Python [JavaExternalTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py) API to directly
- access Java transforms defined in the allowlist from the Python side.
+To use a Java class that conforms to the above requirements from a Python SDK pipeline, follow these steps:
-Starting with Beam 2.35.0, Step 1 and 2 may be skipped as described in corresponding sections below.
+1. Create a _yaml_ allowlist that describes the Java transform classes and methods that will be directly accessed from Python.
+2. Start an expansion service, using the `javaClassLookupAllowlistFile` option to pass the path to the allowlist.
+3. Use the Python [JavaExternalTransform](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/external.py) API to directly access Java transforms defined in the allowlist from the Python side.
-##### Step 1
+Starting with Beam 2.36.0, steps 1 and 2 can be skipped, as described in the corresponding sections below.
-To use this Java transform from Python, you may define an allowlist file in the _yaml_ format. This allowlist lists the class names,
+**Step 1**
+
+To use an eligible Java transform from Python, define a _yaml_ allowlist. This allowlist lists the class names,
constructor methods, and builder methods that are directly available to be used from the Python side.
-Starting with Beam 2.35.0, you have the option to specify `*` to the `javaClassLookupAllowlistFile` option instead of defining an actual allowlist which
-denotes that all supported transforms in the classpath of the expansion service may be accessed through the API.
+Starting with Beam 2.35.0, you have the option to pass `*` to the `javaClassLookupAllowlistFile` option instead of defining an actual allowlist. The `*` specifies that all supported transforms in the classpath of the expansion service can be accessed through the API. We encourage using an actual allowlist for production, because allowing clients to access arbitrary Java classes can pose a security risk.
{{< highlight >}}
version: v1
@@ -6637,74 +6638,74 @@
- withJavaDataGeneratorConfig
{{< /highlight >}}
-##### Step 2
+**Step 2**
-The allowlist is provided as an argument when starting up the Java expansion service. For example, the expansion service can be started
-as a local Java process using the following command.
+Provide the allowlist as an argument when starting up the Java expansion service. For example, you can start the expansion service
+as a local Java process using the following command:
{{< highlight >}}
java -jar <jar file> <port> --javaClassLookupAllowlistFile=<path to the allowlist file>
{{< /highlight >}}
-Starting with Beam 2.35.0, Beam ``JavaExternalTransform` API will automatically startup an expansion service with a given set of `jar` file dependencies
-if an expansion service address was not provided.
+Starting with Beam 2.36.0, the `JavaExternalTransform` API will automatically start up an expansion service with a given `jar` file dependency if an expansion service address was not provided.
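+
+Here's a minimal sketch of that flow. The `classpath` parameter name and the jar path are illustrative assumptions; check the `JavaExternalTransform` pydoc for the exact API in your Beam version.
+
+```py
+# Sketch only: the parameter for supplying jar dependencies is assumed to be
+# 'classpath' (a list of local paths or URLs). Because no expansion_service is
+# given, the SDK starts one up from the provided jar.
+java_transform = JavaExternalTransform(
+    'my.beam.transforms.JavaDataGenerator',
+    classpath=['/path/to/bundled-transform.jar'])
+```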
-##### Step 3
+**Step 3**
-You can directly use the Java class from your Python pipeline using a stub transform created using the `JavaExternalTransform` API. This API allows you to construct the transform
-using the Java class name and allows you to invoke builder methods to configure the class.
+You can use the Java class directly from your Python pipeline using a stub transform created with the `JavaExternalTransform` API. This API lets you construct the transform using the Java class name and invoke builder methods to configure the class.
-Constructor and method parameter types are mapped between Python and Java using a Beam Schema. The Schema is auto-generated using the object types
-provided on the Python side. If the Java class constructor method or builder method accepts any complex object types, make sure that the Beam Schema
+Constructor and method parameter types are mapped between Python and Java using a Beam schema. The schema is auto-generated using the object types
+provided on the Python side. If the Java class constructor method or builder method accepts any complex object types, make sure that the Beam schema
for these objects is registered and available for the Java expansion service. If a schema has not been registered, the Java expansion service will
-try to register a schema using [JavaFieldSchema](https://beam.apache.org/documentation/programming-guide/#creating-schemas). In Python arbitrary objects
-can be represented using `NamedTuple`s which will be represented as Beam Rows in the Schema. See below for a Python stub transform that represents the above
-mentioned Java transform.
+try to register a schema using [JavaFieldSchema](https://beam.apache.org/documentation/programming-guide/#creating-schemas). In Python, arbitrary objects
+can be represented using `NamedTuple`s, which are mapped to Beam rows in the schema. Here is a Python stub transform that represents the Java transform defined above:
-{{< highlight >}}
+```py
JavaDataGeneratorConfig = typing.NamedTuple(
'JavaDataGeneratorConfig', [('prefix', str), ('length', int), ('suffix', str)])
data_config = JavaDataGeneratorConfig(prefix='start', length=20, suffix='end')
java_transform = JavaExternalTransform(
'my.beam.transforms.JavaDataGenerator', expansion_service='localhost:<port>').create(numpy.int32(100)).withJavaDataGeneratorConfig(data_config)
-{{< /highlight >}}
+```
-This transform can be used in a Python pipeline along with other Python transforms.
+You can use this transform in a Python pipeline along with other Python transforms. For a complete example, see [javadatagenerator.py](https://github.com/apache/beam/blob/master/examples/multi-language/python/javadatagenerator.py).
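+
+Here's a rough sketch (not taken from the Beam examples) of how the stub transform defined above might be applied. Because `JavaDataGenerator` starts from the pipeline root (`PBegin`), the stub is applied directly to the pipeline object:
+
+```py
+import logging
+
+import apache_beam as beam
+
+with beam.Pipeline() as p:
+  generated = p | java_transform       # expanded by the Java expansion service
+  generated | beam.Map(logging.info)   # continue with ordinary Python transforms
+```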
-##### 13.1.1.2 Full API for Making Existing Java Transforms Available to Other SDKs
+##### 13.1.1.2 Using the API to make existing Java transforms available to other SDKs
-To make your Apache Beam Java SDK transform portable across SDK languages, you must implement two interfaces: [ExternalTransformBuilder](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ExternalTransformBuilder.java) and [ExternalTransformRegistrar](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java). The `ExternalTransformBuilder` interface constructs the cross-language transform using configuration values passed in from the pipeline and the `ExternalTransformRegistrar` interface registers the cross-language transform for use with the expansion service.
+To make your Beam Java SDK transform portable across SDK languages, you must implement two interfaces: [ExternalTransformBuilder](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ExternalTransformBuilder.java) and [ExternalTransformRegistrar](https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/expansion/ExternalTransformRegistrar.java). The `ExternalTransformBuilder` interface constructs the cross-language transform using configuration values passed in from the pipeline, and the `ExternalTransformRegistrar` interface registers the cross-language transform for use with the expansion service.
**Implementing the interfaces**
-1. Define a Builder class for your transform that implements the `ExternalTransformBuilder` interface and overrides the `buildExternal` method that will be used to build your transform object. Initial configuration values for your transform should be defined in the `buildExternal` method. In most cases, it is convenient to make the Java transform builder class implement `ExternalTransformBuilder`.
+1. Define a Builder class for your transform that implements the `ExternalTransformBuilder` interface and overrides the `buildExternal` method that will be used to build your transform object. Initial configuration values for your transform should be defined in the `buildExternal` method. In most cases, it's convenient to make the Java transform builder class implement `ExternalTransformBuilder`.
> **Note:** `ExternalTransformBuilder` requires you to define a configuration object (a simple POJO) to capture a set of parameters sent by external SDKs to initiate the Java transform. Usually these parameters directly map to constructor parameters of the Java transform.
- {{< highlight >}}
-@AutoValue.Builder
-abstract static class Builder<K, V>
- implements ExternalTransformBuilder<External.Configuration, PBegin, PCollection<KV<K, V>>> {
- abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);
+ ```java
+ @AutoValue.Builder
+ abstract static class Builder<K, V>
+ implements ExternalTransformBuilder<External.Configuration, PBegin, PCollection<KV<K, V>>> {
+ abstract Builder<K, V> setConsumerConfig(Map<String, Object> config);
- abstract Builder<K, V> setTopics(List<String> topics);
+ abstract Builder<K, V> setTopics(List<String> topics);
- /** Remaining property declarations omitted for clarity. */
+ /** Remaining property declarations omitted for clarity. */
- abstract Read<K, V> build();
+ abstract Read<K, V> build();
- @Override
- public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(
- External.Configuration config) {
- setTopics(ImmutableList.copyOf(config.topics));
+ @Override
+ public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(
+ External.Configuration config) {
+ setTopics(ImmutableList.copyOf(config.topics));
- /** Remaining property defaults omitted for clarity. */
- }
-}
- {{< /highlight >}}
+ /** Remaining property defaults omitted for clarity. */
+ }
+ }
+ ```
- Note that `buildExternal` method may choose to perform additional operations before setting properties received from external SDKs in the transform. For example, `buildExternal` method may validates properties available in the configuration object before setting them in the transform.
+ For complete examples, see [JavaCountBuilder](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaCountBuilder.java) and [JavaPrefixBuilder](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaPrefixBuilder.java).
+
+ Note that the `buildExternal` method can perform additional operations before setting properties received from external SDKs in the transform. For example, `buildExternal` can validate properties available in the configuration object before setting them in the transform.
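+
+   For instance, a hypothetical validation check inside `buildExternal` might look like this:
+
+   ```java
+   @Override
+   public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(
+       External.Configuration config) {
+     // Hypothetical check: fail fast if the external SDK did not supply any topics.
+     if (config.topics == null || config.topics.isEmpty()) {
+       throw new IllegalArgumentException("At least one topic must be provided.");
+     }
+     setTopics(ImmutableList.copyOf(config.topics));
+
+     /** Remaining property defaults omitted for clarity. */
+   }
+   ```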
2. Register the transform as an external cross-language transform by defining a class that implements `ExternalTransformRegistrar`. You must annotate your class with the `AutoService` annotation to ensure that your transform is registered and instantiated properly by the expansion service.
3. In your registrar class, define a Uniform Resource Name (URN) for your transform. The URN must be a unique string that identifies your transform with the expansion service.
@@ -6712,43 +6713,45 @@
The following example from the KafkaIO transform shows how to implement steps two through four:
- {{< highlight >}}
-@AutoService(ExternalTransformRegistrar.class)
-public static class External implements ExternalTransformRegistrar {
+ ```java
+ @AutoService(ExternalTransformRegistrar.class)
+ public static class External implements ExternalTransformRegistrar {
- public static final String URN = "beam:external:java:kafka:read:v1";
+ public static final String URN = "beam:external:java:kafka:read:v1";
- @Override
- public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
- return ImmutableMap.of(
- URN,
- (Class<? extends ExternalTransformBuilder<?, ?, ?>>)
- (Class<?>) AutoValue_KafkaIO_Read.Builder.class);
- }
+ @Override
+ public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
+ return ImmutableMap.of(
+ URN,
+ (Class<? extends ExternalTransformBuilder<?, ?, ?>>)
+ (Class<?>) AutoValue_KafkaIO_Read.Builder.class);
+ }
- /** Parameters class to expose the Read transform to an external SDK. */
- public static class Configuration {
- private Map<String, String> consumerConfig;
- private List<String> topics;
+ /** Parameters class to expose the Read transform to an external SDK. */
+ public static class Configuration {
+ private Map<String, String> consumerConfig;
+ private List<String> topics;
- public void setConsumerConfig(Map<String, String> consumerConfig) {
- this.consumerConfig = consumerConfig;
+ public void setConsumerConfig(Map<String, String> consumerConfig) {
+ this.consumerConfig = consumerConfig;
+ }
+
+ public void setTopics(List<String> topics) {
+ this.topics = topics;
+ }
+
+ /** Remaining properties omitted for clarity. */
+ }
}
+ ```
- public void setTopics(List<String> topics) {
- this.topics = topics;
- }
-
- /** Remaining properties omitted for clarity. */
- }
-}
- {{< /highlight >}}
+ For additional examples, see [JavaCountRegistrar](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaCountRegistrar.java) and [JavaPrefixRegistrar](https://github.com/apache/beam/blob/master/examples/multi-language/src/main/java/org/apache/beam/examples/multilanguage/JavaPrefixRegistrar.java).
After you have implemented the `ExternalTransformBuilder` and `ExternalTransformRegistrar` interfaces, your transform can be registered and created successfully by the default Java expansion service.
**Starting the expansion service**
-An expansion service can be used with multiple transforms in the same pipeline. Java has a default expansion service included and available in the Apache Beam Java SDK for you to use with your Java transforms. You can write your own expansion service, but that is generally not needed, so it is not covered in this section.
+You can use an expansion service with multiple transforms in the same pipeline. The Beam Java SDK provides a default expansion service for Java transforms. You can also write your own expansion service, but that's generally not needed, so it's not covered in this section.
Perform the following to start up a Java expansion service directly:
@@ -6761,7 +6764,7 @@
The expansion service is now ready to serve transforms on the specified port.
-When creating SDK-specific wrappers for your transform, SDKs may provide utilities that are readily available for easily starting up an expansion service. For example, the Python SDK provides the utilities [`JavaJarExpansionService`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.JavaJarExpansionService) and [`BeamJarExpansionService`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.BeamJarExpansionService) for starting up a Java expansion service using a JAR file.
+When creating SDK-specific wrappers for your transform, you may be able to use SDK-provided utilities to start up an expansion service. For example, the Python SDK provides the utilities [`JavaJarExpansionService`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.JavaJarExpansionService) and [`BeamJarExpansionService`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.BeamJarExpansionService) for starting up a Java expansion service using a JAR file.
**Including dependencies**
@@ -6769,36 +6772,36 @@
**Writing SDK-specific wrappers**
-Your cross-language Java transform can be called through the lower-level [`ExternalTransform`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.ExternalTransform) class in a multi-language pipeline (as described in the next section); however, if possible, you should create a SDK-specific wrapper written in the programming language of the pipeline (such as Python) to access the transform instead. This higher-level abstraction will make it easier for pipeline authors to use your transform.
+Your cross-language Java transform can be called through the lower-level [`ExternalTransform`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.ExternalTransform) class in a multi-language pipeline (as described in the next section); however, if possible, you should write an SDK-specific wrapper in the language of the pipeline (such as Python) to access the transform instead. This higher-level abstraction will make it easier for pipeline authors to use your transform.
To create an SDK wrapper for use in a Python pipeline, do the following:
1. Create a Python module for your cross-language transform(s).
-2. In the module, build the payload that should be used to initiate the cross-language transform expansion request using one of the available [`PayloadBuilder`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.PayloadBuilder) classes.
+2. In the module, use one of the [`PayloadBuilder`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.PayloadBuilder) classes to build the payload for the initial cross-language transform expansion request.
The parameter names and types of the payload should map to parameter names and types of the configuration POJO provided to the Java `ExternalTransformBuilder`. Parameter types are mapped across SDKs using a [Beam schema](https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/schema.proto). Parameter names are mapped by simply converting Python underscore-separated variable names to camel-case (Java standard).
- In the following example, [kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py) uses `NamedTupleBasedPayloadBuilder` to build the payload. The parameters map to the Java [KafkaIO.External.Configuration](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java) config object defined previously in the **Implementing the interfaces** section.
+ In the following example, [kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py) uses `NamedTupleBasedPayloadBuilder` to build the payload. The parameters map to the Java [KafkaIO.External.Configuration](https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java) config object defined in the previous section.
- {{< highlight >}}
-class ReadFromKafkaSchema(typing.NamedTuple):
- consumer_config: typing.Mapping[str, str]
- topics: typing.List[str]
- # Other properties omitted for clarity.
+ ```py
+ class ReadFromKafkaSchema(typing.NamedTuple):
+ consumer_config: typing.Mapping[str, str]
+ topics: typing.List[str]
+ # Other properties omitted for clarity.
-payload = NamedTupleBasedPayloadBuilder(ReadFromKafkaSchema(...))
- {{< /highlight >}}
+ payload = NamedTupleBasedPayloadBuilder(ReadFromKafkaSchema(...))
+ ```
-3. Start an expansion service unless one is specified by the pipeline creator. The Apache Beam Python SDK provides utilities [`JavaJarExpansionService`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.JavaJarExpansionService) and [`BeamJarExpansionService`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.BeamJarExpansionService) for easily starting up an expansion service using a JAR file.. [`JavaJarExpansionService`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.JavaJarExpansionService) can be used to startup an expansion service using path (a local path or a URL) to a given JAR file. [`BeamJarExpansionService`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.BeamJarExpansionService) can be used for easily starting an expansion service based on a JAR released with Beam.
+3. Start an expansion service, unless one is specified by the pipeline creator. The Beam Python SDK provides the utilities [`JavaJarExpansionService`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.JavaJarExpansionService) and [`BeamJarExpansionService`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.BeamJarExpansionService) for starting up an expansion service using a JAR file. [`JavaJarExpansionService`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.JavaJarExpansionService) can be used to start up an expansion service using the path (a local path or a URL) to a given JAR file, as shown in the sketch at the end of this step. [`BeamJarExpansionService`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.BeamJarExpansionService) can be used to start an expansion service from a JAR released with Beam.
- For transforms released with Beam do the following:
+ For transforms released with Beam, do the following:
- 1. Add a Gradle target to Beam that can be used to build a shaded expansion service JAR for the target Java transform. This target should produce a Beam JAR that contains all dependencies needed for expanding the Java transform and the JAR should be released with Beam. Note that you might be able to use one of the existing Gradle target that offer an aggregated version of an expansion service jar (for example, for all GCP IO).
+ 1. Add a Gradle target to Beam that can be used to build a shaded expansion service JAR for the target Java transform. This target should produce a Beam JAR that contains all dependencies needed for expanding the Java transform, and the JAR should be released with Beam. You might be able to use an existing Gradle target that offers an aggregated version of an expansion service JAR (for example, for all GCP IO).
2. In your Python module, instantiate `BeamJarExpansionService` with the Gradle target.
- {{< highlight >}}
- expansion_service = BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')
- {{< /highlight >}}
+ ```py
+ expansion_service = BeamJarExpansionService('sdks:java:io:expansion-service:shadowJar')
+ ```
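+
+   For transforms that are not released with Beam, you can instead point `JavaJarExpansionService` at an expansion service jar you have built yourself. A minimal sketch (the path is a placeholder):
+
+   ```py
+   # Placeholder path: point this at your own expansion service jar (a local path or a URL).
+   expansion_service = JavaJarExpansionService('/path/to/expansion-service.jar')
+   ```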
4. Add a Python wrapper transform class that extends [`ExternalTransform`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.ExternalTransform). Pass the payload and expansion service defined above as parameters to the constructor of the [`ExternalTransform`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.ExternalTransform) parent class.
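+
+   The following is a simplified sketch of such a wrapper; the actual [kafka.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py) wrapper handles more parameters and sensible defaults:
+
+   ```py
+   class ReadFromKafka(beam.ExternalTransform):
+     URN = 'beam:external:java:kafka:read:v1'
+
+     def __init__(self, consumer_config, topics, expansion_service=None):
+       # Reuses the ReadFromKafkaSchema payload and the expansion service
+       # defined in the previous steps.
+       super().__init__(
+           self.URN,
+           NamedTupleBasedPayloadBuilder(
+               ReadFromKafkaSchema(consumer_config=consumer_config, topics=topics)),
+           expansion_service)
+   ```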
#### 13.1.2. Creating cross-language Python transforms
@@ -6809,41 +6812,41 @@
1. Define a Uniform Resource Name (URN) for your transform. The URN must be a unique string that identifies your transform with the expansion service.
- {{< highlight >}}
-TEST_COMPK_URN = "beam:transforms:xlang:test:compk"
- {{< /highlight >}}
+ ```py
+ TEST_COMPK_URN = "beam:transforms:xlang:test:compk"
+ ```
2. For an existing Python transform, create a new class to register the URN with the Python expansion service.
- {{< highlight >}}
-@ptransform.PTransform.register_urn(TEST_COMPK_URN, None)
-class CombinePerKeyTransform(ptransform.PTransform):
- {{< /highlight >}}
+ ```py
+ @ptransform.PTransform.register_urn(TEST_COMPK_URN, None)
+ class CombinePerKeyTransform(ptransform.PTransform):
+ ```
3. From within the class, define an expand method that takes an input PCollection, runs the Python transform, and then returns the output PCollection.
- {{< highlight >}}
-def expand(self, pcoll):
- return pcoll \
- | beam.CombinePerKey(sum).with_output_types(
- typing.Tuple[unicode, int])
- {{< /highlight >}}
+ ```py
+ def expand(self, pcoll):
+ return pcoll \
+ | beam.CombinePerKey(sum).with_output_types(
+ typing.Tuple[unicode, int])
+ ```
4. As with other Python transforms, define a `to_runner_api_parameter` method that returns the URN.
- {{< highlight >}}
-def to_runner_api_parameter(self, unused_context):
- return TEST_COMPK_URN, None
- {{< /highlight >}}
+ ```py
+ def to_runner_api_parameter(self, unused_context):
+ return TEST_COMPK_URN, None
+ ```
5. Define a static `from_runner_api_parameter` method that returns an instantiation of the cross-language Python transform.
- {{< highlight >}}
-@staticmethod
-def from_runner_api_parameter(
- unused_ptransform, unused_parameter, unused_context):
- return CombinePerKeyTransform()
- {{< /highlight >}}
+ ```py
+ @staticmethod
+ def from_runner_api_parameter(
+ unused_ptransform, unused_parameter, unused_context):
+ return CombinePerKeyTransform()
+ ```
**Starting the expansion service**
-An expansion service can be used with multiple transforms in the same pipeline. Python has a default expansion service included and available in the Apache Beam Python SDK for you to use with your Python transforms. You are free to write your own expansion service, but that is generally not needed, so it is not covered in this section.
+An expansion service can be used with multiple transforms in the same pipeline. The Beam Python SDK provides a default expansion service for you to use with your Python transforms. You are free to write your own expansion service, but that is generally not needed, so it is not covered in this section.
Perform the following steps to start up the default Python expansion service directly:
@@ -6860,30 +6863,30 @@
$ python -m apache_beam.runners.portability.expansion_service_test -p $PORT_FOR_EXPANSION_SERVICE
{{< /highlight >}}
-4. This expansion service is not ready to serve up transforms on the address `localhost:$PORT_FOR_EXPANSION_SERVICE`.
+4. This expansion service is now ready to serve up transforms on the address `localhost:$PORT_FOR_EXPANSION_SERVICE`.
**Including dependencies**
-Currently Python external transforms are limited to dependencies available in core Beam SDK Harness.
+Currently, Python external transforms are limited to dependencies available in the core Beam SDK harness.
#### 13.1.3. Creating cross-language Go transforms
Go currently does not support creating cross-language transforms, only using cross-language
transforms from other languages; see more at [BEAM-9923](https://issues.apache.org/jira/browse/BEAM-9923).
-#### 13.1.4. Selecting a URN for Cross-language Transforms
+#### 13.1.4. Defining a URN
Developing a cross-language transform involves defining a URN for registering the transform with an expansion service. In this section
we provide a convention for defining such URNs. Following this convention is optional but it will ensure that your transform
will not run into conflicts when registering in an expansion service along with transforms developed by other developers.
-##### Schema
+##### 13.1.4.1. Schema
A URN should consist of the following components:
-* ns-id: A namespace identifier. Default recommendation is `beam:transform`.
-* org-identifier: Identifies the organization where the transform was defined. Transforms defined in Apache Beam use `org.apache.beam` for this.
-* functionality-identifier - Identifies the functionality of the cross-language transform.
-* version - a version number for the transform
+* **ns-id**: A namespace identifier. Default recommendation is `beam:transform`.
+* **org-identifier**: Identifies the organization where the transform was defined. Transforms defined in Apache Beam use `org.apache.beam` for this.
+* **functionality-identifier**: Identifies the functionality of the cross-language transform.
+* **version**: A version number for the transform.
We provide the schema from the URN convention in [augmented Backus–Naur](https://en.wikipedia.org/wiki/Augmented_Backus%E2%80%93Naur_form) form.
Keywords in upper case are from the [URN spec](https://datatracker.ietf.org/doc/html/rfc8141).
@@ -6897,7 +6900,7 @@
version = “v” 1*(DIGIT / “.”) ; For example, ‘v1.2’
{{< /highlight >}}
-##### Examples
+##### 13.1.4.2. Examples
Below we’ve given some example transform classes and corresponding URNs to be used.
@@ -6918,7 +6921,7 @@
**Using the External class**
-1. Make sure you have any runtime environment dependencies (like JRE) installed on your local machine (either directly on the local machine or available through a container). See the expansion service section for more details.
+1. Make sure you have any runtime environment dependencies (like the JRE) installed on your local machine (either directly on the local machine or available through a container). See the expansion service section for more details.
> **Note:** When including Python transforms from within a Java pipeline, all Python dependencies have to be included in the SDK harness container.
2. Start up the expansion service for the SDK that is in the language of the transform you're trying to consume, if not available.
@@ -6929,13 +6932,13 @@
#### 13.2.2. Using cross-language transforms in a Python pipeline
-If a Python-specific wrapper for a cross-language transform is available, use that; otherwise, you have to use the lower-level [`ExternalTransform`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.ExternalTransform) class to access the transform.
+If a Python-specific wrapper for a cross-language transform is available, use that. Otherwise, you have to use the lower-level [`ExternalTransform`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.ExternalTransform) class to access the transform.
**Using an SDK wrapper**
-To use a cross-language transform through an SDK wrapper, import the module for the SDK wrapper and call it from your pipeline as shown in the example:
+To use a cross-language transform through an SDK wrapper, import the module for the SDK wrapper and call it from your pipeline, as shown in the example:
- {{< highlight >}}
+```py
from apache_beam.io.kafka import ReadFromKafka
kafka_records = (
@@ -6948,35 +6951,37 @@
topics=[self.topic],
max_num_records=max_num_records,
expansion_service=<Address of expansion service>))
- {{< /highlight >}}
+```
**Using the ExternalTransform class**
When an SDK-specific wrapper isn't available, you will have to access the cross-language transform through the `ExternalTransform` class.
-1. Make sure you have any runtime environment dependencies (like JRE) installed on your local machine. See the expansion service section for more details.
+1. Make sure you have any runtime environment dependencies (like the JRE) installed on your local machine. See the expansion service section for more details.
2. Start up the expansion service for the SDK that is in the language of the transform you're trying to consume, if not available.
Make sure the transform you're trying to use is available and can be used by the expansion service. For Java, make sure the builder and registrar for the transform are available in the classpath of the expansion service.
-3. Include `ExternalTransform` when instantiating your pipeline. Reference the URN, Payload, and expansion service. You can use one of the available [`PayloadBuilder`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.PayloadBuilder) classes to build the payload for `ExternalTransform`.
+3. Include `ExternalTransform` when instantiating your pipeline. Reference the URN, payload, and expansion service. You can use one of the available [`PayloadBuilder`](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.external.html#apache_beam.transforms.external.PayloadBuilder) classes to build the payload for `ExternalTransform`.
- {{< highlight >}}
-with pipeline as p:
- res = (
- p
- | beam.Create(['a', 'b']).with_output_types(unicode)
- | beam.ExternalTransform(
- TEST_PREFIX_URN,
- ImplicitSchemaPayloadBuilder({'data': u'0'}),
- <Address of expansion service>))
- assert_that(res, equal_to(['0a', '0b']))
- {{< /highlight >}}
+ ```py
+ with pipeline as p:
+ res = (
+ p
+ | beam.Create(['a', 'b']).with_output_types(unicode)
+ | beam.ExternalTransform(
+ TEST_PREFIX_URN,
+ ImplicitSchemaPayloadBuilder({'data': u'0'}),
+ <Address of expansion service>))
+ assert_that(res, equal_to(['0a', '0b']))
+ ```
-4. After the job has been submitted to the Beam runner, shutdown the expansion service by terminating the expansion service process.
+ For additional examples, see [addprefix.py](https://github.com/apache/beam/blob/master/examples/multi-language/python/addprefix.py) and [javacount.py](https://github.com/apache/beam/blob/master/examples/multi-language/python/javacount.py).
+
+4. After the job has been submitted to the Beam runner, shut down the expansion service by terminating the expansion service process.
#### 13.2.3. Using cross-language transforms in a Go pipeline
-If a Go-specific wrapper for a cross-language is available, use that; otherwise, you have to use the
+If a Go-specific wrapper for a cross-language transform is available, use that. Otherwise, you have to use the
lower-level [CrossLanguage](https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam#CrossLanguage)
function to access the transform.
@@ -6992,7 +6997,7 @@
To use a cross-language transform through an SDK wrapper, import the package for the SDK wrapper
and call it from your pipeline as shown in the example:
-{{< highlight >}}
+```go
import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/kafkaio"
)
@@ -7005,7 +7010,7 @@
[]string{topicName},
kafkaio.MaxNumRecords(numRecords),
kafkaio.ConsumerConfigs(map[string]string{"auto.offset.reset": "earliest"}))
-{{< /highlight >}}
+```
**Using the CrossLanguage function**
@@ -7014,7 +7019,7 @@
1. Make sure you have the appropriate expansion service running. See the expansion service section for details.
2. Make sure the transform you're trying to use is available and can be used by the expansion service.
Refer to [Creating cross-language transforms](#create-x-lang-transforms) for details.
-3. Use the `beam.CrossLanguage` function in your pipeline as appropriate. Reference the URN, Payload,
+3. Use the `beam.CrossLanguage` function in your pipeline as appropriate. Reference the URN, payload,
expansion service address, and define inputs and outputs. You can use the
[beam.CrossLanguagePayload](https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam#CrossLanguagePayload)
function as a helper for encoding a payload. You can use the
@@ -7022,22 +7027,22 @@
[beam.UnnamedOutput](https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam#UnnamedOutput)
functions as shortcuts for single, unnamed inputs/outputs or define a map for named ones.
- {{< highlight >}}
-type prefixPayload struct {
- Data string `beam:"data"`
-}
-urn := "beam:transforms:xlang:test:prefix"
-payload := beam.CrossLanguagePayload(prefixPayload{Data: prefix})
-expansionAddr := "localhost:8097"
-outT := beam.UnnamedOutput(typex.New(reflectx.String))
-res := beam.CrossLanguage(s, urn, payload, expansionAddr, beam.UnnamedInput(inputPCol), outT)
- {{< /highlight >}}
+ ```go
+ type prefixPayload struct {
+ Data string `beam:"data"`
+ }
+ urn := "beam:transforms:xlang:test:prefix"
+ payload := beam.CrossLanguagePayload(prefixPayload{Data: prefix})
+ expansionAddr := "localhost:8097"
+ outT := beam.UnnamedOutput(typex.New(reflectx.String))
+ res := beam.CrossLanguage(s, urn, payload, expansionAddr, beam.UnnamedInput(inputPCol), outT)
+ ```
4. After the job has been submitted to the Beam runner, shutdown the expansion service by
terminating the expansion service process.
### 13.3. Runner Support {#x-lang-transform-runner-support}
-Currently, portable runners such as Flink, Spark, and the Direct runner can be used with multi-language pipelines.
+Currently, portable runners such as Flink, Spark, and the direct runner can be used with multi-language pipelines.
-Google Cloud Dataflow supports multi-language pipelines through the Dataflow Runner v2 backend architecture.
+Dataflow supports multi-language pipelines through the Dataflow Runner v2 backend architecture.