To complete this tutorial, you must have the following software installed:

- A Java JDK and Apache Maven, to build the transform JAR
- The Apache Beam Python SDK, to run Beam YAML pipelines locally
- Optionally, the gcloud CLI, to run pipelines on Google Cloud Dataflow
NOTE: If you are new to Beam YAML, start with this guide to learn how to execute YAML pipelines. To learn more about transform providers, see YAML providers.
This tutorial introduces the fundamental concepts of the cross-language framework that Beam YAML leverages. Through transform providers, a user can expose Beam Java transforms so that they can be easily used in a Beam YAML pipeline.
As we walk through these concepts, we will construct a transform called `ToUpperCase` that takes a single parameter, `field`, which names a field in the collection of elements, and modifies that field by converting its string value to uppercase.
There are four main steps to follow:

1. Write the transform and its provider using the `SchemaTransformProvider` framework.
2. Build the transform into a JAR using Maven.
3. Register the JAR as a provider in the Beam YAML pipeline.
4. Run the pipeline.
The project structure for this tutorial is as follows:
```
MyExternalTransforms/
├── pom.xml
└── src/
    └── main/
        └── java/
            └── org/
                └── example/
                    ├── ToUpperCaseTransformProvider.java
                    └── SkeletonSchemaProvider.java
```
Here is a brief description of each file:

- `SkeletonSchemaProvider.java`: Contains the bare minimum code needed to implement a `SchemaTransform` Identity function.
- `ToUpperCaseTransformProvider.java`: Contains the `ToUpperCase` `SchemaTransform` to be used in the Beam YAML pipeline.

This project structure assumes that the Java module is `org.example`, but any module path can be used so long as the project structure matches.

A `pom.xml` file, which stands for Project Object Model, is an essential file used in Maven projects. It's an XML file that contains all the critical information about a project, including the configuration details Maven needs to build it successfully. This file specifies things like the project's name, version, dependencies on other libraries, and how the project should be packaged (e.g., as a JAR file).
Since this tutorial won't cover all the details about Maven and `pom.xml`, see the official documentation for more details: https://maven.apache.org/pom.html

A minimal `pom.xml` file can be found in the project repo here.
Writing a transform that is compatible with the Beam YAML framework requires leveraging Beam's cross-language framework and, more specifically, the `SchemaTransformProvider` interface (and even more specifically, the `TypedSchemaTransformProvider` interface).

This framework relies on creating a `PTransform` that operates solely on Beam `Row`s - a schema-aware data type built into Beam that can be translated across SDKs. Leveraging the `SchemaTransformProvider` interface removes the need to write much of the boilerplate code required to translate data across SDKs, allowing us to focus on the transform functionality itself.
See `SkeletonSchemaProvider.java` for the full code.

This is the bare minimum code (excluding import and package statements) needed to create a `SchemaTransformProvider` that can be used by the cross-language framework, allowing the `SchemaTransform` defined within it to be used in any Beam YAML pipeline. In this case, the transform acts as an Identity function, outputting the input collection of elements without alteration.
Let's start by breaking down the top-level methods required by the `SchemaTransformProvider` interface.
configurationClass()
See `SkeletonSchemaProvider.java#L27-L30`.
The `configurationClass()` method tells the cross-language framework which Java class defines the input parameters for the transform. The `Configuration` class (defined in the skeleton code) will be discussed in more detail later.
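In the skeleton this is a one-line override. A minimal sketch, assuming the provider extends `TypedSchemaTransformProvider<Configuration>`:

```java
@Override
protected Class<Configuration> configurationClass() {
  // Points the cross-language framework at the parameter-defining class.
  return Configuration.class;
}
```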
identifier()
See `SkeletonSchemaProvider.java#L32-L35`.
The `identifier()` method defines a unique identifier (URN) for the transform. Ensure this name doesn't collide with other transform URNs, including built-in Beam transforms and any others defined in custom catalogs.
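A sketch of the override, using the same URN that the YAML examples later in this tutorial map to the Identity transform:

```java
@Override
public String identifier() {
  // Any unique URN works; it must match the URN referenced by the YAML provider.
  return "some:urn:transform_name:v1";
}
```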
inputCollectionNames()
See `SkeletonSchemaProvider.java#L42-L45`.
The `inputCollectionNames()` method returns a list of expected names for the tagged input collections. In Beam YAML, the primary input collection is typically tagged `"input"`. While different names can be used here (as it's not a strict contract between SDKs), Beam YAML sends the collection tagged `"input"`, so it's best practice to return `INPUT_TAG` (defined in the example code).
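A sketch, assuming `INPUT_TAG` is the constant the example code defines for the `"input"` tag:

```java
@Override
public List<String> inputCollectionNames() {
  // Assumes: private static final String INPUT_TAG = "input";
  return Collections.singletonList(INPUT_TAG);
}
```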
outputCollectionNames()
See `SkeletonSchemaProvider.java#L47-L50`.
The `outputCollectionNames()` method returns a list of names for the tagged output collections. Similar to `inputCollectionNames()`, the primary output is usually tagged `"output"`. If error handling is configured in Beam YAML, there may also be an error output collection tagged according to the `error_handling` configuration. It's best practice to return `OUTPUT_TAG` and `ERROR_TAG` (defined in the example code).
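A sketch, assuming `OUTPUT_TAG` and `ERROR_TAG` are the constants from the example code (with values like `"output"` and `"errors"`):

```java
@Override
public List<String> outputCollectionNames() {
  // ERROR_TAG is assumed to match the tag used for the error_handling output.
  return Arrays.asList(OUTPUT_TAG, ERROR_TAG);
}
```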
from()
See `SkeletonSchemaProvider.java#L52-L55`.
The `from()` method returns the `SchemaTransform` instance itself. This transform is a `PTransform` that performs the actual operation on the incoming collection(s). As a `PTransform`, it requires an `expand()` method, which defines the transform's logic, often including a `DoFn`.
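For the skeleton's Identity transform, a minimal sketch might look like this (assuming a Beam version where `SchemaTransform` is an abstract `PTransform<PCollectionRowTuple, PCollectionRowTuple>`):

```java
@Override
protected SchemaTransform from(Configuration configuration) {
  return new SchemaTransform() {
    @Override
    public PCollectionRowTuple expand(PCollectionRowTuple input) {
      // Identity: pass the tagged input collections through unchanged.
      return input;
    }
  };
}
```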
description()
See `SkeletonSchemaProvider.java#L37-L40`.
The optional `description()` method provides a human-readable description of the transform. While largely unused by the Beam YAML framework itself, it is valuable for documentation generation, especially with the `generate_yaml_docs.py` script (e.g., for the Beam YAML transform glossary).
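A sketch of what this override might look like for the `ToUpperCase` transform (the wording is illustrative):

```java
@Override
public String description() {
  return "Converts a specified string field of each element to uppercase.";
}
```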
See `ToUpperCaseTransformProvider.java#L72-L100`.
The `Configuration` class defines the parameters for the transform.

- It is an `AutoValue` class annotated with `@AutoValueSchema` so that the Beam schema is generated automatically.
- Getter methods define the parameters (e.g., `getField()` for a `field` parameter).
- A `Builder` subclass annotated with `@AutoValue.Builder` is needed for instantiation, with setter methods corresponding to the getters.
- Optional parameters are annotated with `@Nullable`; required parameters omit this annotation.
- The `@SchemaFieldDescription` annotation can optionally provide descriptions for parameters, which is useful for documentation generation (as mentioned for `description()`).

To support Beam YAML's built-in error handling framework, the `Configuration` class must include a parameter of type `ErrorHandling`. This allows the transform to receive error handling configuration (such as the output tag for errors) from the YAML pipeline and conditionally catch exceptions or route failing elements to the error output.
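Putting these pieces together, a sketch of the `Configuration` class. The schema annotation is written here as Beam's `@DefaultSchema(AutoValueSchema.class)`; the generated builder name follows AutoValue's convention for nested classes:

```java
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract static class Configuration {
  @SchemaFieldDescription("The field to convert to uppercase.")
  public abstract String getField();

  // Optional: populated only when the YAML pipeline configures error_handling.
  @SchemaFieldDescription("Whether and how to handle conversion errors.")
  @Nullable
  public abstract ErrorHandling getErrorHandling();

  public static Builder builder() {
    // Name generated by AutoValue from the enclosing provider class.
    return new AutoValue_ToUpperCaseTransformProvider_Configuration.Builder();
  }

  @AutoValue.Builder
  public abstract static class Builder {
    public abstract Builder setField(String field);
    public abstract Builder setErrorHandling(@Nullable ErrorHandling errorHandling);
    public abstract Configuration build();
  }
}
```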
An optional `validate()` method can be added to the `Configuration` class for input parameter validation. This is useful for catching bad configuration early - for example, failing fast when a required parameter is empty or when parameter values are inconsistent with each other.
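A sketch of such a check (the exact validations are up to the transform author):

```java
public void validate() {
  // Fail fast at construction time rather than at pipeline runtime.
  if (getField() == null || getField().isEmpty()) {
    throw new IllegalArgumentException("Parameter 'field' must be a non-empty string.");
  }
}
```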
See `ToUpperCaseTransformProvider.java#L102-L179`.
This class implements the `SchemaTransform` interface and contains the core logic within its `expand()` method.
expand() Method (see `ToUpperCaseTransformProvider.java#L141-L177`)

- The input is a `PCollectionRowTuple`, which is a map of tags to `PCollection<Row>`s. Extract the main input `PCollection` using the appropriate tag (usually `"input"`).
- Get the schema of the input `PCollection`. This is needed to define the output schemas.
- Build the error schema with `ErrorHandling.errorSchema()`, which wraps the original schema with error-specific fields.
- Inspect the `ErrorHandling` object (from the configuration) to determine if error handling is enabled and what the output tag for errors should be.
- Apply a `ParDo` transform with a custom `DoFn`. This `DoFn` performs the actual `toUpperCase` operation.
- Use `TupleTag`s to tag successful output and error output separately.
- If error handling is enabled, catch exceptions (such as the `IllegalArgumentException` thrown when the specified `field` doesn't exist) and output an error record using the error `TupleTag`. Otherwise, let exceptions propagate.
- Set the schemas on the output `PCollection`s using `.setRowSchema()`. This is crucial for cross-language compatibility.
- Return a `PCollectionRowTuple` containing the tagged output `PCollection`s:
  - Successful records are tagged `"output"`.
  - Error records are tagged according to the `ErrorHandling` configuration (e.g., `"errors"`).

A sketch of this method follows the list.
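Under the assumptions above (constants like `INPUT_TAG`, a `configuration` field on the enclosing `SchemaTransform`, and a `createDoFn()` helper discussed next), `expand()` might be sketched like this:

```java
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
  // Extract the tagged input collection and its schema.
  PCollection<Row> inputRows = input.get(INPUT_TAG);
  Schema inputSchema = inputRows.getSchema();

  // Wrap the input schema with the error fields Beam YAML expects.
  Schema errorSchema = ErrorHandling.errorSchema(inputSchema);
  boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());

  // Tags distinguishing successful records from error records.
  TupleTag<Row> successTag = new TupleTag<Row>() {};
  TupleTag<Row> errorTag = new TupleTag<Row>() {};

  PCollectionTuple result =
      inputRows.apply(
          ParDo.of(createDoFn(
                  configuration.getField(), errorSchema, handleErrors,
                  successTag, errorTag))
              .withOutputTags(successTag, TupleTagList.of(errorTag)));

  // Explicit row schemas are required for cross-language compatibility.
  result.get(successTag).setRowSchema(inputSchema);
  result.get(errorTag).setRowSchema(errorSchema);

  PCollectionRowTuple output = PCollectionRowTuple.of("output", result.get(successTag));
  if (handleErrors) {
    // Tag error records with the name chosen in the YAML error_handling config.
    output = output.and(
        configuration.getErrorHandling().getOutput(), result.get(errorTag));
  }
  return output;
}
```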
createDoFn() Method (see `ToUpperCaseTransformProvider.java#L116-L139`)
This helper method constructs the `DoFn`. Key aspects (a sketch follows the list):

- It takes the configuration parameters (specifically the `field` name) and the `TupleTag`s for output.
- The `@ProcessElement` method contains the logic:
  - Get the input `Row`.
  - Read and uppercase the value of the specified `field`.
  - Use a `Row.Builder` to create a new output `Row` with the modified field.
  - Output the new `Row` using the main output `TupleTag`.
  - On failure, if error handling is enabled, build an error `Row` (using `ErrorHandling.errorRecord()`) and output it using the error `TupleTag`.
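A sketch of the helper under the same assumptions; `ErrorHandling.errorRecord()` is the helper mentioned above, assumed here to take the error schema, the failed element, and the exception:

```java
private static DoFn<Row, Row> createDoFn(
    String field,
    Schema errorSchema,
    boolean handleErrors,
    TupleTag<Row> successTag,
    TupleTag<Row> errorTag) {
  return new DoFn<Row, Row>() {
    @ProcessElement
    public void processElement(@Element Row row, MultiOutputReceiver out) {
      try {
        // Throws IllegalArgumentException if 'field' is not in the schema.
        String value = row.getString(field);
        Row modified =
            Row.fromRow(row).withFieldValue(field, value.toUpperCase()).build();
        out.get(successTag).output(modified);
      } catch (RuntimeException e) {
        if (!handleErrors) {
          throw e; // No error_handling configured: fail the pipeline.
        }
        // Route the failure to the error output instead of failing the bundle.
        out.get(errorTag).output(ErrorHandling.errorRecord(errorSchema, row, e));
      }
    }
  };
}
```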
At this point, you should have the necessary Java code (`ToUpperCaseTransformProvider.java` and potentially `SkeletonSchemaProvider.java`, if you started from there). Now, build the JAR file that will contain your transform and be provided to the Beam YAML pipeline.
From the root directory of your Maven project, run:
mvn package
This command compiles your code and packages it into a JAR file located in the `target/` directory. By default (based on the starter `pom.xml`), the JAR will be named something like `xlang-transforms-bundled-1.0-SNAPSHOT.jar`. This "bundled" (or "shaded") JAR includes your transform code, its dependencies, and the necessary components for the Beam expansion service.
Note: The final JAR name is configurable in your `pom.xml` within the `maven-shade-plugin` configuration using the `<finalName>` tag (see `pom.xml#L85-L87`).
Now that you have a JAR file containing your transform catalog, you can use it in a Beam YAML pipeline via the `providers` section. Providers tell Beam YAML where to find external transforms. We will use the `javaJar` provider type since our transform is packaged in a Java JAR.
```yaml
providers:
  - type: javaJar
    config:
      # Path to your built JAR file
      jar: "target/xlang-transforms-bundled-1.0-SNAPSHOT.jar"
    transforms:
      # Mapping: YAML transform name -> Java transform URN (from identifier())
      ToUpperCase: "some:urn:to_upper_case:v1"
      Identity: "some:urn:transform_name:v1"  # Assuming SkeletonSchemaProvider is also included
```
- `jar`: Specifies the path to the JAR file containing the transform(s). Adjust the path if necessary (e.g., if running from a different directory or if the JAR is in a central location).
- `transforms`: Maps the desired name for the transform in your YAML file (e.g., `ToUpperCase`) to the unique URN defined in your Java `SchemaTransformProvider`'s `identifier()` method.

Parameter Naming: By default, the `javaJar` provider converts Java `camelCase` parameter names (from your `Configuration` class getters) to `snake_case` for use in the YAML file. For example, a `getField()` getter corresponds to a `field` parameter in YAML, and `getErrorHandling()` corresponds to `error_handling`.
If you need different names in YAML, you can use the `renaming` provider type instead of, or in addition to, `javaJar`. See the standard I/O providers for examples.
Now, `ToUpperCase` can be used like any other transform in your pipeline.

Full Example (`pipeline.yaml`):
```yaml
pipeline:
  type: chain  # Optional: simplifies linear pipelines
  transforms:
    - type: Create
      config:
        elements:
          - name: "john"
            id: 1
          - name: "jane"
            id: 2
    - type: Identity  # Using the skeleton transform
      input: Create
    - type: ToUpperCase
      input: Identity  # Input defaults to previous transform in a chain
      config:
        field: "name"
    - type: LogForTesting  # Built-in transform for logging
      input: ToUpperCase

providers:
  - type: javaJar
    config:
      jar: "target/xlang-transforms-bundled-1.0-SNAPSHOT.jar"
    transforms:
      ToUpperCase: "some:urn:to_upper_case:v1"
      Identity: "some:urn:transform_name:v1"
```
Expected Logs:
```
message: "{\"name\":\"JOHN\",\"id\":1}"
message: "{\"name\":\"JANE\",\"id\":2}"
```
Note: Beam YAML might choose the Java implementation of `LogForTesting` to minimize cross-language calls, potentially making the logs verbose. Look for the specific messages shown above.
Example with Error Handling:
```yaml
pipeline:
  transforms:
    - type: Create
      config:
        elements:
          - name: "john"
            id: 1
          - name: "jane"  # This element has no 'unknown' field
            id: 2
    - type: ToUpperCase
      input: Create
      config:
        field: "unknown"  # This field doesn't exist
        error_handling:
          output: errors  # Send errors to 'ToUpperCase.errors'
    - type: LogForTesting
      name: LogSuccess  # Give transforms unique names if type is repeated
      input: ToUpperCase  # Default output (successful records)
    - type: LogForTesting
      name: LogErrors
      input: ToUpperCase.errors  # Error output

providers:
  - type: javaJar
    config:
      jar: "target/xlang-transforms-bundled-1.0-SNAPSHOT.jar"
    transforms:
      ToUpperCase: "some:urn:to_upper_case:v1"
      Identity: "some:urn:transform_name:v1"
```
Expected Logs (Error Handling Example; the exact error message might vary slightly):
```
# From LogErrors
message: "{\"error\":\"java.lang.IllegalArgumentException: Field not found: unknown\",\"element\":\"{\\\"name\\\":\\\"john\\\",\\\"id\\\":1}\"}"
message: "{\"error\":\"java.lang.IllegalArgumentException: Field not found: unknown\",\"element\":\"{\\\"name\\\":\\\"jane\\\",\\\"id\\\":2}\"}"

# LogSuccess will produce no output as all elements failed.
```
For a complete reference on error handling configuration, visit Beam YAML Error Handling.
If you have Apache Beam for Python installed, you can test this pipeline locally:
python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml
Alternatively, if you have `gcloud` configured, you can run it on Google Cloud Dataflow:
```sh
# Ensure your JAR is accessible, e.g., in a GCS bucket
export BUCKET_NAME=your-gcs-bucket-name
gsutil cp target/xlang-transforms-bundled-1.0-SNAPSHOT.jar gs://$BUCKET_NAME/

# Update pipeline.yaml to point to the GCS path:
# providers:
#   - type: javaJar
#     config:
#       jar: "gs://your-gcs-bucket-name/xlang-transforms-bundled-1.0-SNAPSHOT.jar"
#     transforms: ...

export JOB_NAME=my-yaml-job-$(date +%Y%m%d-%H%M%S)
export REGION=your-gcp-region  # e.g., us-central1

gcloud dataflow yaml run $JOB_NAME \
  --yaml-pipeline-file=pipeline.yaml \
  --region=$REGION \
  --staging-location=gs://$BUCKET_NAME/staging
```
(Note: Running on Dataflow requires the JAR to be in a location accessible by the Dataflow service, like Google Cloud Storage.)