
Examples Catalog

Prerequisites

Some examples require an expansion service jar for the IOs they use. The IO-to-expansion-service mapping is available in standard_io.yaml. For example, to build the Google Cloud Platform IO expansion service jar, run:

cd <PATH_TO_BEAM_REPO>/beam; ./gradlew sdks:java:io:google-cloud-platform:expansion-service:shadowJar

Example Run

This module contains a series of Beam YAML code samples that can be run using the command:

python -m apache_beam.yaml.main --yaml_pipeline_file=/path/to/example.yaml

Depending on the YAML pipeline, the output is emitted either to standard output or to a file in the directory from which the pipeline is executed.

Wordcount

A good starting place is the Wordcount example under the root example directory. This example reads a text file, splits the text into individual words, groups by word, and counts the occurrences of each word. It is the classic example used in the other SDKs and showcases many of the capabilities of Beam YAML.
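
A minimal sketch of the shape of such a pipeline is shown below (wordcount_minimal.yaml in this directory is the authoritative version; the input path here is only illustrative):

pipeline:
  type: chain
  transforms:
    # Read lines of text; each output row has a single `line` field.
    - type: ReadFromText
      config:
        path: gs://dataflow-samples/shakespeare/kinglear.txt
    # Split each line into a list of words.
    - type: MapToFields
      config:
        language: python
        fields:
          word:
            callable: |
              import re
              def split_words(row):
                return re.findall(r"[A-Za-z']+", row.line)
    # Emit one row per word.
    - type: Explode
      config:
        fields: word
    # Group by word and count occurrences.
    - type: Combine
      config:
        group_by: word
        combine:
          count:
            value: word
            fn: count
    # Print the results.
    - type: LogForTesting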

Testing

The testing folder contains a test file that executes all of the example YAMLs and confirms the expected results:

pytest -v testing/

or

pytest -v testing/examples_test.py::JinjaTest

or

python -m unittest -v testing/examples_test.py

Transforms

Examples in this directory showcase the various built-in transforms of the Beam YAML framework.

Aggregation

These examples leverage the built-in Combine transform for performing simple aggregations including sum, mean, count, etc. More information can be found about aggregation transforms here.
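
For instance, a minimal sketch of a Combine step that computes a per-key sum and mean (field names are illustrative):

# Group rows by `produce` and aggregate the `amount` field two ways.
- type: Combine
  config:
    group_by: produce
    combine:
      total_amount:
        value: amount
        fn: sum
      mean_amount:
        value: amount
        fn: mean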

Blueprints

These examples take Dataflow templates or other existing templates and convert them to YAML blueprints.

Element-wise

These examples leverage the built-in mapping transforms, including MapToFields, Filter, and Explode. More information can be found about mapping transforms here.
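
For example, a Filter step followed by a MapToFields step might look like this sketch (field names are illustrative):

# Keep only rows whose `amount` field exceeds a threshold.
- type: Filter
  config:
    language: python
    keep: amount > 100
# Derive a new field from the existing ones.
- type: MapToFields
  config:
    language: python
    fields:
      amount_usd: amount * exchange_rate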

IO

Spanner

The Spanner Read and Spanner Write examples leverage the built-in Spanner_Read and Spanner_Write transforms for performing simple reads from and writes to a Google Cloud Spanner database.

Kafka

Examples involving Kafka, such as Kafka Read Write, require users to set up a Kafka cluster that the Dataflow runner executing the Beam pipeline can access. Please note that the ReadFromKafka transform has a known issue on non-Dataflow portable runners where reading may get stuck in streaming pipelines. Hence, the Dataflow runner is recommended for examples that involve reading from Kafka in a streaming pipeline.

See here for general instructions on setting up a Kafka cluster. One option is to use Click to Deploy to quickly launch a Kafka cluster on GCE; the SASL/PLAIN authentication mechanism is configured for the brokers as part of that deployment. See also here for an alternative step-by-step guide on setting up Kafka on GCE without the authentication mechanism.

Let's assume one of the bootstrap servers runs on VM instance kafka-vm-0, listening on internal IP address 123.45.67.89 and port 9092. The SASL/PLAIN USERNAME and PASSWORD can be viewed in the VM instance's metadata on the GCE console, or with the gcloud CLI:

gcloud compute instances describe kafka-vm-0 \
  --format='value[](metadata.items.kafka-user)'
gcloud compute instances describe kafka-vm-0 \
  --format='value[](metadata.items.kafka-password)'

The Kafka Read Write pipeline first writes data to the Kafka topic using the WriteToKafka transform and then reads that data back using the ReadFromKafka transform. Run the pipeline:

export PROJECT="$(gcloud config get-value project)"
export TEMP_LOCATION="gs://MY-BUCKET/tmp"
export REGION="us-central1"
export JOB_NAME="demo-kafka-`date +%Y%m%d-%H%M%S`"
export NUM_WORKERS="1"

python -m apache_beam.yaml.main \
  --yaml_pipeline_file transforms/io/kafka.yaml \
  --runner DataflowRunner \
  --temp_location $TEMP_LOCATION \
  --project $PROJECT \
  --region $REGION \
  --num_workers $NUM_WORKERS \
  --job_name $JOB_NAME \
  --jinja_variables '{ "BOOTSTRAP_SERVERS": "123.45.67.89:9092",
    "TOPIC": "MY-TOPIC", "USERNAME": "USERNAME", "PASSWORD": "PASSWORD" }'\
  --sdk_location container  \
  --sdk_harness_container_image_overrides ".*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java11_sdk:latest"

Optional: If the Kafka cluster is set up without SASL/PLAIN authentication configured for the brokers, then no SASL/PLAIN USERNAME and PASSWORD are needed. In the pipelines, omit the producer_config_updates and consumer_config configurations from the WriteToKafka and ReadFromKafka transforms, and run the commands above without specifying the username and password in the --jinja_variables flag.
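
For reference, a templated read step with SASL/PLAIN might look roughly like the following sketch. The exact configuration lives in transforms/io/kafka.yaml; the consumer settings below are standard Kafka client properties and are an assumption here, not copied from that file:

- type: ReadFromKafka
  config:
    topic: "{{ TOPIC }}"
    bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
    format: RAW
    consumer_config:
      # Assumed SASL/PLAIN client settings; match your brokers' configuration.
      security.protocol: SASL_PLAINTEXT
      sasl.mechanism: PLAIN
      sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username='{{ USERNAME }}' password='{{ PASSWORD }}';"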

Iceberg

The Iceberg Write and Iceberg Read pipelines are examples of how to interact with Iceberg tables stored on GCS with a Hadoop catalog configured.

To create a GCS bucket to use as warehouse storage, see here. To run the pipelines locally, one option is to create a service account key for accessing GCS (see here). Within the pipelines, specify the GCS bucket name and the path to the saved service account key .json file.

Note: With the Hadoop catalog, Iceberg uses the Hadoop connector for GCS. See here for the full list of configuration options for the Hadoop catalog when used with GCS.
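
As a rough sketch of what such a write step's catalog configuration might look like for local runs (the GCS connector auth properties here are assumptions; transforms/io/iceberg_write.yaml is authoritative):

- type: WriteToIceberg
  config:
    table: db.my_table
    catalog_name: hadoop_catalog
    catalog_properties:
      type: hadoop
      warehouse: gs://MY-BUCKET/warehouse
    config_properties:
      # Assumed GCS Hadoop connector auth settings for local runs with a
      # service account key; omit these when running on Dataflow.
      fs.gs.auth.type: SERVICE_ACCOUNT_JSON_KEYFILE
      fs.gs.auth.service.account.json.keyfile: /path/to/key.json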

To create and write to Iceberg tables on GCS, run:

python -m apache_beam.yaml.main \
  --yaml_pipeline_file transforms/io/iceberg_write.yaml

The write pipeline uses dynamic destinations to dynamically create and select a table destination based on field values in the incoming records.
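
Schematically, a dynamic destination is expressed as a table name template referencing record fields in curly braces, along the lines of the sketch below (table and field names are illustrative, not taken from the example):

- type: WriteToIceberg
  config:
    # Each record is routed to the table named after its `category` field.
    table: "db.{category}"
    catalog_name: hadoop_catalog
    catalog_properties:
      type: hadoop
      warehouse: gs://MY-BUCKET/warehouse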

To read from a created Iceberg table on GCS, run:

python -m apache_beam.yaml.main \
  --yaml_pipeline_file transforms/io/iceberg_read.yaml

Optional: To run the pipelines on Dataflow, a service account key is not needed. Omit the authentication settings in the Hadoop catalog configuration config_properties, and run:

export REGION="us-central1"
export JOB_NAME="demo-iceberg_write-`date +%Y%m%d-%H%M%S`"

gcloud dataflow yaml run $JOB_NAME \
  --yaml-pipeline-file transforms/io/iceberg_write.yaml \
  --region $REGION

export REGION="us-central1"
export JOB_NAME="demo-iceberg_read-`date +%Y%m%d-%H%M%S`"

gcloud dataflow yaml run $JOB_NAME \
  --yaml-pipeline-file transforms/io/iceberg_read.yaml \
  --region $REGION

Jinja

Jinja templating can be used to build pipelines from different contexts and/or with different configurations.

Several examples build on the already-used word count example, leveraging the Jinja templating engine for dynamic pipeline generation based on user inputs through the % include and % import directives and template inheritance, as sketched below.
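
As a minimal sketch of the % include pattern (file names here are hypothetical), a shared snippet can be spliced into a pipeline before the YAML is parsed:

# shared_read.yaml (hypothetical): a reusable read transform. Note that the
# included text must already be indented to match the include site:
#    - type: ReadFromText
#      config:
#        path: gs://dataflow-samples/shakespeare/kinglear.txt

# Main pipeline file: the snippet above is inserted at render time.
pipeline:
  type: chain
  transforms:
    {% include 'shared_read.yaml' %}
    - type: LogForTesting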

ML

Some examples include the built-in Enrichment transform for performing ML enrichments.
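
As a sketch, an Enrichment step backed by Bigtable might look like the following (the handler choice and configuration values are illustrative):

- type: Enrichment
  config:
    enrichment_handler: 'BigTable'
    handler_config:
      project_id: MY-PROJECT
      instance_id: MY-INSTANCE
      table_id: MY-TABLE
      row_key: customer_id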

Other examples include ML-specific transforms such as RunInference and MLTransform.
