Add code snippets

Signed-off-by: Jeffrey Kinard <jeff@thekinards.com>
3 files changed
tree: 016caf95c6d7a9b5b547037de5ce7af16be5b9b2
  1. src/
  2. .gitignore
  3. LICENSE
  4. pom.xml
  5. README.md
README.md

Creating a Beam Java Transform Catalog and Using in Beam YAML

Prerequisites

To complete this tutorial, you must have the following software installed:

  • Java 11 or later
  • Apache Maven 3.6 or later

Overview

The purpose of this tutorial is to introduce the fundamental concepts of the Cross-Language framework that is leveraged by Beam YAML to allow a user to specify Beam Java transforms through the use of transform providers such that the transforms can be easily defined in a Beam YAML pipeline.

As we walk through these concepts, we will be constructing a Transform called ToUpperCase that will take in a single parameter field, which represents a field in the collection of elements, and modify that field by converting the string to uppercase.

Project Structure

The project structure for this tutorial is as follows:

MyExternalTransforms
├── pom.xml
└── src
    └── main
        └── java
            └── org
                └── example
                    ├── ToUpperCaseTransformProvider.java
                    └── SkeletonSchemaProvider.java

Here is a brief description of each file:

  • pom.xml: The Maven project configuration file.
  • SkeletonSchemaProvider.java: The Java class that contains the bare-minimum skeleton code for implementing a SchemaTransform Identity function.
  • ToUpperCaseTransformProvider.java: The Java class that contains the SchemaTransform to be used in the Beam YAML pipeline. This project structure assumes that the java module is org.example, but any module path can be used so long as the project structure matches.

Creating the pom.xml file

A pom.xml file, which stands for Project Object Model, is an essential file used in Maven projects. It‘s an XML file that contains all the critical information about a project, including its configuration details for Maven to build it successfully. This file specifies things like the project’s name, version, dependencies on other libraries, and how the project should be packaged (e.g., JAR file).

Since this tutorial won’t cover all the details about Maven and pom.xml, here's a link to the official documentation for more details: https://maven.apache.org/pom.html

Minimal pom.xml

A minimal pom.xml file can be found in the project repo here.

Writing the External Transform

Writing a transform that is compatible with the Beam YAML framework requires leveraging Beam’s cross-language framework, and more specifically, the SchemaTransformProvider interface (and even more specifically, the TypedSchemaTransformProvider interface).

This framework relies on creating a PTransform that operates solely on Beam Row’s - a schema-aware data type built into Beam that is capable of being translated across SDK’s. Leveraging the SchemaTransformProvider interface removes the need to write a lot of the boilerplate code required to translate data across the SDK’s, allowing us to focus on the transform functionality itself.

SchemaTransformProvider Skeleton Code

https://github.com/Polber/beam-yaml-xlang/blob/b0806b7ed4f12b30834430d53ee8f57ef3dd3962/src/main/java/org/example/SkeletonSchemaProvider.java#L1-L96

This is the bare minimum code (excluding import and package) to create a SchemaTransformProvider that can be used by the cross-language framework, and therefore allowing the SchemaTransform defined within it to be defined in any Beam YAML pipeline. In this case, the transform will act as an Identity function and will output the input collection of elements with no alteration.

Let’s start by breaking down the top-level methods that are required to be compatible with the SchemaTransformProvider interface.

configurationClass()

https://github.com/Polber/beam-yaml-xlang/blob/b0806b7ed4f12b30834430d53ee8f57ef3dd3962/src/main/java/org/example/SkeletonSchemaProvider.java#L27-L30

The configurationClass() method is responsible for telling the cross-language framework which Java class defines the input parameters to the Transform. The Configuration class we defined in the skeleton code will be revisited in depth later.

identifier()

https://github.com/Polber/beam-yaml-xlang/blob/b0806b7ed4f12b30834430d53ee8f57ef3dd3962/src/main/java/org/example/SkeletonSchemaProvider.java#L32-L35

The identifier() method defines a unique identifier for the Transform. It is important to ensure that this name does not collide with any other Transform URN that will be given to the External Transform service, both those built-in to Beam, and any that are defined in a custom catalog such as this.

inputCollectionNames()

https://github.com/Polber/beam-yaml-xlang/blob/b0806b7ed4f12b30834430d53ee8f57ef3dd3962/src/main/java/org/example/SkeletonSchemaProvider.java#L42-L45

The inputCollectionNames() method returns a list of expected input names for the tagged input collections. In most cases, especially in Beam YAML, there will be a collection of input elements that are tagged “input”. It is acceptable to use different names in this method as it is not actually used as a contract between SDK's, but what is important to note is that Beam YAML will be sending a collection that is tagged “input” to the transform. It is best to use this method definition with the macros defined at the top of the file.

outputCollectionNames()

https://github.com/Polber/beam-yaml-xlang/blob/b0806b7ed4f12b30834430d53ee8f57ef3dd3962/src/main/java/org/example/SkeletonSchemaProvider.java#L47-L50

The outputCollectionNames() method returns a list of output names for the tagged output collections. Similar to inputCollectionNames(), there will be a collection of output elements that are tagged “output”, but in most cases, there will also be a subset of elements tagged with whatever was defined in the error_handling section of the transform config in Beam YAML. Since this method is also not used as a contract, it is best to use this method definition with the macros defined at the top of the file.

from()

https://github.com/Polber/beam-yaml-xlang/blob/b0806b7ed4f12b30834430d53ee8f57ef3dd3962/src/main/java/org/example/SkeletonSchemaProvider.java#L52-L55

The from() method is the method that is responsible for returning the SchemaTransform itself. This transform is the PTransform that will actually perform the transform on the incoming collection of elements. Since it is a PTransform, it requires one method - expand() which defines the expansion of the transform and includes the DoFn.

description()

https://github.com/Polber/beam-yaml-xlang/blob/b0806b7ed4f12b30834430d53ee8f57ef3dd3962/src/main/java/org/example/SkeletonSchemaProvider.java#L37-L40

The optional description() method is where a description of the transform can be written. This description is largely unused by the Beam YAML framework, but is useful for generating docs when used in conjunction with the generate_yaml_docs.py script. This is useful when generating docs for a transform catalog. For example, the Beam YAML transform glossary.

ToUpperCaseProvider Configuration Class

https://github.com/Polber/beam-yaml-xlang/blob/b0806b7ed4f12b30834430d53ee8f57ef3dd3962/src/main/java/org/example/ToUpperCaseTransformProvider.java#L72-L100

The Configuration class is responsible for defining the parameters to the transform. This AutoValue class is annotated with the AutoValueSchema interface to generate the schema for the transform. This interface scrapes the inputs by using all the getter methods. In our ToUpperCase example, there is one input parameter, field, that specifies the field in the input collection to perform the operation on. So, we define a getter method, getField, that will tell the AutoValueSchema class about our input parameter.

Likewise, the Configuration class needs a Builder subclass annotated with AutoValue.Builder so that the AutoValue class can be instantiated. This builder needs a setter method for each subsequent getter method in the parent class.

Optional parameters should be annotated with the @Nullable annotation. Required parameters, therefore, should omit this annotation.

The @SchemaFieldDescription annotation can also be optionally used to define the parameter. This is also passed to the Beam YAML framework and can be used in conjunction with the generate_yaml_docs.py script to generate docs for the transform. This is useful when generating docs for a transform catalog. For example, the Beam YAML transform glossary.

Error Handling

In Beam YAML, there is a built-in error handling framework that allows a transform to consume the error output from a transform, both in the turnkey Beam YAML transforms and in compatible External Transforms. This error output could be any Exceptions that are caught and tagged, or any custom logic that would cause an element to be treated as an error.

To make a transform compatible with this framework, one must take the ErrorHandling object as an input to the transform, and therefore define it in this Configuration.

Validation

One last optional method for the Configuration class is the validate() method. This method is responsible for checking the input parameters to the transform. In our example, we assume there is a field metadata in the input collection that cannot be modified. So, we perform a check on the field parameter to verify the user is not attempting to modify this field. This method can also be useful for checking dependent inputs, (i.e. parameter A is required if parameter B is specified).

ToUpperCaseProvider SchemaTransform Class

https://github.com/Polber/beam-yaml-xlang/blob/b0806b7ed4f12b30834430d53ee8f57ef3dd3962/src/main/java/org/example/ToUpperCaseTransformProvider.java#L102-L179

This is the class that will define the actual transform that is performed on the incoming PCollection. Let’s first take a look at the expand() function.

https://github.com/Polber/beam-yaml-xlang/blob/b0806b7ed4f12b30834430d53ee8f57ef3dd3962/src/main/java/org/example/ToUpperCaseTransformProvider.java#L141-L177

Every incoming PCollectionRowTuple is essentially a tagged collection. As stated before, in most cases, with context to Beam YAML, there will only be one tag, “input”. To get the PCollection of Row elements, we first need to unpack these from the PCollectionRowTuple using the “input” tag.

From this collection of elements, the schema of the input collection can be obtained. This is useful for assembling our output collections, since their schema will be based on this schema. In the case of the successful records, the schema will remain unchanged (since we are modifying a single field in-place), and the error records will use a schema that essentially wraps the original schema with a couple error-specific fields as defined by errorSchema.

Whether to do error_handling is determined by the ErrorHandling class. If error handling is specified in the config of the transform in the Beam YAML pipeline, then exceptions will be caught and stored in an “error”-tagged output as opposed to thrown at Runtime.

Next, the PTransform is actually applied. The DoFn will be detailed more below, but what is important to note here is that the output is tagged using two arbitrary TupleTag objects - one for successful records, and one for error records.

After the PTransform is applied, it is important to ensure that both output collections have their schema configured so that the cross-language service can encode and decode the objects back to the Beam YAML SDK.

Finally, the resulting PCollectionRowTuple must be constructed. The successful records should be stored and tagged “output” regardless of if error_handling was specified, and if error_handling was specified, it can be appended to the same PCollectionRowTuple and tagged according to the output specified by the error_handling config in the Beam YAML pipeline.

https://github.com/Polber/beam-yaml-xlang/blob/b0806b7ed4f12b30834430d53ee8f57ef3dd3962/src/main/java/org/example/ToUpperCaseTransformProvider.java#L116-L139

Now, taking a look at the createDoFn() method that is responsible for constructing and returning the actual DoFn, you will notice that the only thing that makes this special versus any other DoFn you may write, is that it checks to see if error_handling was specified, constructs an error record from the input records and tags those records using the arbitrary error TupleTag discussed previously. Aside from that, this DoFn is simply taking the input Row, applying the .toUpperCase() method to the field within the Row and tagging all successful records using the arbitrary successful TupleTag also discussed previously.

Building the Transform Catalog JAR

At this point, you should have all the code as defined in the previous section ready to go, and it is now time to build the JAR that will be provided to the Beam YAML pipeline.

From the root directory, run the following command:

mvn package

This will create a JAR under target called xlang-transforms-bundled-1.0-SNAPSHOT.jar that contains the ToUpperCaseTransformProvider along with its dependencies and the external transform service. The external expansion service is what will be invoked by the Beam YAML SDK to import the transform schema and run the expansion service for the transform.

https://github.com/Polber/beam-yaml-xlang/blob/b0806b7ed4f12b30834430d53ee8f57ef3dd3962/pom.xml#L83-L85 Note: The name of the jar is configurable using the finalName tag in the maven-shade-plugin configuration.

Defining the Transform in Beam YAML

Now that you have a JAR file that contains the transform catalog, it is time to include it as part of your Beam YAML pipeline. This is done using providers - these providers allow one to define a suite of transforms in a given JAR or python package that can be used within the Beam YAML pipeline.

We will be utilizing the renaming provider as that allows us to map the Java transform parameters that use Java naming convention to parameters that follow the YAML naming convention. This is especially useful for the ErrorHandling parameter as that is used extensively in the built-in Beam YAML transforms.

For our example, that looks as follows:

providers:
  - type: renaming
    transforms:
      'ToUpperCase': 'ToUpperCase'
    config:
      mappings:
        'ToUpperCase':
          error_handling: 'errorHandling'
        'Identity':
      underlying_provider:
        type: javaJar
        config:
          jar: xlang-transforms-bundled-1.0-SNAPSHOT.jar
        transforms:
          ToUpperCase: "some:urn:to_upper_case:v1"
          Identity: "some:urn:transform_name:v1"

More robust examples of the renaming provider can be found here.

Now, ToUpperCase can be defined as a transform in the Beam YAML pipeline with the single config parameter - field.

A full example:

pipeline:
  type: chain
  transforms:
    - type: Create
      config:
        elements:
          - name: "john"
            id: 1
          - name: "jane"
            id: 2
    - type: Identity
    - type: ToUpperCase
      config:
        field: "name"
    - type: LogForTesting
      
providers:
  - type: renaming
    transforms:
      'ToUpperCase': 'ToUpperCase'
    config:
      mappings:
        'ToUpperCase':
          error_handling: 'errorHandling'
        'Identity':
      underlying_provider:
        type: javaJar
        config:
          jar: xlang-transforms-bundled-1.0-SNAPSHOT.jar
        transforms:
          ToUpperCase: "some:urn:to_upper_case:v1"
          Identity: "some:urn:transform_name:v1"

Expected logs:

message: "{\"name\":\"JOHN\",\"id\":1}"
message: "{\"name\":\"JANE\",\"id\":2}"

Note: Beam YAML will choose the Java implementation of the LogForTesting transform to reduce language switching. The output can get a bit crowded, but look for the logs in the commented “Expected” section at the bottom of the YAML file.

An example with errors caught and handled:

pipeline:
  transforms:
    - type: Create
      config:
        elements:
          - name: "john"
            id: 1
          - name: "jane"
            id: 2
    - type: ToUpperCase
      input: Create
      config:
        field: "unknown"
        error_handling:
          output: errors
    - type: LogForTesting
      input: ToUpperCase
    - type: LogForTesting
      input: ToUpperCase.errors
      
providers:
  - type: renaming
    transforms:
      'ToUpperCase': 'ToUpperCase'
    config:
      mappings:
        'ToUpperCase':
          error_handling: 'errorHandling'
      underlying_provider:
        type: javaJar
        config:
          jar: xlang-transforms-bundled-1.0-SNAPSHOT.jar
        transforms:
          ToUpperCase: "some:urn:to_upper_case:v1"