Build the expansion service jar required for your YAML code. The IO mapping is available in standard_io.yaml; an example build command is:
```sh
cd <PATH_TO_BEAM_REPO>/beam
./gradlew sdks:java:io:google-cloud-platform:expansion-service:shadowJar
```
This module contains a series of Beam YAML code samples that can be run using the command:
```sh
python -m apache_beam.yaml.main --yaml_pipeline_file=/path/to/example.yaml
```
Depending on the YAML pipeline, the output is emitted either to standard output or to a file in the execution directory.
A good starting place is the Wordcount example under the root example directory. This example reads in a text file, splits the text into individual words, groups by word, and counts the occurrences of each word. This is the classic example used in the other SDKs, and it shows off many of the capabilities of Beam YAML.
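For orientation, here is a hedged sketch of what such a pipeline can look like in Beam YAML. The input path, transform names, and field names are illustrative assumptions; the wordcount example in this directory is the authoritative version.

```yaml
# Illustrative sketch of a wordcount-style Beam YAML pipeline.
# Paths and field names are assumptions; see the real example for details.
pipeline:
  transforms:
    - type: ReadFromText
      name: ReadLines
      config:
        path: /path/to/input.txt
    - type: MapToFields
      name: SplitWords
      input: ReadLines
      config:
        language: python
        fields:
          word:
            callable: |
              import re
              def split_into_words(row):
                return re.findall(r"[A-Za-z']+", row.line)
    - type: Explode
      name: OneWordPerRow
      input: SplitWords
      config:
        fields: word
    - type: Combine
      name: CountWords
      input: OneWordPerRow
      config:
        group_by: word
        combine:
          count:
            value: word
            fn: count
    - type: WriteToText
      name: WriteCounts
      input: CountWords
      config:
        path: counts
```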
A test file located in the testing folder executes all the example YAML files and confirms the expected results. It can be run with any of the following:
```sh
pytest -v testing/
pytest -v testing/examples_test.py::JinjaTest
python -m unittest -v testing/examples_test.py
```
Examples in this directory show off the various built-in transforms of the Beam YAML framework.
These examples leverage the built-in `Combine` transform for performing simple aggregations including sum, mean, count, etc.
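As a rough illustration of the shape of such an aggregation (the `department` and `salary` fields are made up), a `Combine` step names each output aggregate and points it at an input field and a combining function:

```yaml
# Illustrative fragment only; "department" and "salary" are made-up field names.
- type: Combine
  config:
    group_by: department
    combine:
      total_salary:
        value: salary
        fn: sum
      average_salary:
        value: salary
        fn: mean
```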
These examples leverage Dataflow or other existing templates and convert them to YAML blueprints.
These examples leverage the built-in mapping transforms including `MapToFields`, `Filter`, and `Explode`. More information about mapping transforms can be found here.
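A hedged sketch of how these three transforms are typically chained (the field names and expressions are made up):

```yaml
# Illustrative fragment only; field names are made up.
- type: MapToFields
  config:
    language: python
    fields:
      name: name
      tags: tags
      amount_usd: "amount_cents / 100.0"
- type: Filter
  config:
    language: python
    keep: "amount_usd > 10"
- type: Explode
  config:
    fields: tags   # emits one output row per element of the list-valued "tags" field
```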
Examples Spanner Read and Spanner Write leverage the built-in `Spanner_Read` and `Spanner_Write` transforms for performing simple reads from and writes to a Google Cloud Spanner database.
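A very rough sketch of what a read step might look like follows. The transform name comes from the sentence above, but the configuration keys shown here (project, instance, database, query) are assumptions and should be checked against the Spanner Read example itself:

```yaml
# Assumed configuration keys; verify against the actual Spanner Read example.
- type: Spanner_Read
  config:
    project_id: MY-PROJECT
    instance_id: MY-INSTANCE
    database_id: MY-DATABASE
    query: "SELECT id, name FROM my_table"
```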
Examples involving Kafka, such as Kafka Read Write, require users to set up a Kafka cluster that the Dataflow runner executing the Beam pipeline can access. Please note that the `ReadFromKafka` transform has a known issue with non-Dataflow portable runners, where reading may get stuck in streaming pipelines. Hence, the Dataflow runner is recommended for examples that involve reading from Kafka in a streaming pipeline.
See here for general instructions on setting up a Kafka cluster. One option is to use Click to Deploy to quickly launch a Kafka cluster on GCE; the SASL/PLAIN authentication mechanism is configured for the brokers as part of that deployment. See also here for an alternative step-by-step guide to setting up Kafka on GCE without the authentication mechanism.
Let's assume one of the bootstrap servers runs on VM instance `kafka-vm-0` with internal IP address `123.45.67.89`, and that the bootstrap server is listening on port `9092`. The SASL/PLAIN `USERNAME` and `PASSWORD` can be viewed in the VM instance's metadata on the GCE console, or with the gcloud CLI:
```sh
gcloud compute instances describe kafka-vm-0 \
    --format='value[](metadata.items.kafka-user)'

gcloud compute instances describe kafka-vm-0 \
    --format='value[](metadata.items.kafka-password)'
```
Beam pipeline Kafka Read Write first writes data to the Kafka topic using the `WriteToKafka` transform and then reads that data back using the `ReadFromKafka` transform. Run the pipeline:
```sh
export PROJECT="$(gcloud config get-value project)"
export TEMP_LOCATION="gs://MY-BUCKET/tmp"
export REGION="us-central1"
export JOB_NAME="demo-kafka-`date +%Y%m%d-%H%M%S`"
export NUM_WORKERS="1"

python -m apache_beam.yaml.main \
  --yaml_pipeline_file transforms/io/kafka.yaml \
  --runner DataflowRunner \
  --temp_location $TEMP_LOCATION \
  --project $PROJECT \
  --region $REGION \
  --num_workers $NUM_WORKERS \
  --job_name $JOB_NAME \
  --jinja_variables '{
    "BOOTSTRAP_SERVERS": "123.45.67.89:9092",
    "TOPIC": "MY-TOPIC",
    "USERNAME": "USERNAME",
    "PASSWORD": "PASSWORD"
  }' \
  --sdk_location container \
  --sdk_harness_container_image_overrides ".*java.*,gcr.io/apache-beam-testing/beam-sdk/beam_java11_sdk:latest"
```
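For context on the Jinja variables above, here is a hedged sketch of how the Kafka transforms in such a pipeline might be configured. The `producer_config_updates` and `consumer_config` keys come from the note below, and the SASL properties are standard Kafka client settings; the `format` value and overall layout are assumptions, so treat transforms/io/kafka.yaml as authoritative.

```yaml
# Illustrative fragment; transforms/io/kafka.yaml is the authoritative version.
- type: WriteToKafka
  config:
    topic: "{{ TOPIC }}"
    bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
    format: RAW                      # assumed; depends on the example's schema
    producer_config_updates:
      "security.protocol": SASL_PLAINTEXT
      "sasl.mechanism": PLAIN
      "sasl.jaas.config": >-
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="{{ USERNAME }}" password="{{ PASSWORD }}";
- type: ReadFromKafka
  config:
    topic: "{{ TOPIC }}"
    bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
    format: RAW                      # assumed
    consumer_config:
      "security.protocol": SASL_PLAINTEXT
      "sasl.mechanism": PLAIN
      "sasl.jaas.config": >-
        org.apache.kafka.common.security.plain.PlainLoginModule required
        username="{{ USERNAME }}" password="{{ PASSWORD }}";
```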
Optional: If the Kafka cluster is set up with no SASL/PLAINTEXT authentication configured for the brokers, no SASL/PLAIN `USERNAME` and `PASSWORD` are needed. In the pipelines, omit the `producer_config_updates` and `consumer_config` configurations from the `WriteToKafka` and `ReadFromKafka` transforms, and run the commands above without specifying the username and password in the `--jinja_variables` flag.
Beam pipelines Iceberg Write and Iceberg Read are examples of how to interact with Iceberg tables on GCS storage with a Hadoop catalog configured.
To create a GCS bucket for use as the warehouse storage, see here. To run the pipelines locally, one option is to create a service account key in order to access GCS (see here). Within the pipelines, specify the GCS bucket name and the path to the saved service account key .json file.
Note: With the Hadoop catalog, Iceberg uses the Hadoop connector for GCS. See here for the full list of configuration options for the Hadoop catalog when used with GCS.
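To make the catalog wiring concrete, here is a hedged sketch of an Iceberg write configuration with a Hadoop catalog on GCS. The `config_properties` key is mentioned later in this section and the `fs.gs.*` keys are Hadoop GCS connector settings; the remaining names (transform type, `catalog_properties`, bucket, table) are assumptions, so check transforms/io/iceberg_write.yaml for the real configuration.

```yaml
# Illustrative fragment; see transforms/io/iceberg_write.yaml for the real config.
- type: WriteToIceberg
  config:
    table: "db.my_table"
    catalog_name: hadoop_catalog
    catalog_properties:
      type: hadoop
      warehouse: gs://MY-BUCKET/iceberg-warehouse
    config_properties:
      "fs.gs.auth.type": SERVICE_ACCOUNT_JSON_KEYFILE
      "fs.gs.auth.service.account.json.keyfile": /path/to/service-account-key.json
```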
To create and write to Iceberg tables on GCS, run:
```sh
python -m apache_beam.yaml.main \
  --yaml_pipeline_file transforms/io/iceberg_write.yaml
```
The pipeline uses the Dynamic destinations write feature to dynamically create and select a table destination based on field values in the incoming records.
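As a hedged illustration of the idea (the curly-brace field reference and the `region` field are assumptions; iceberg_write.yaml shows the actual usage), the table name can reference a field of each incoming record so that records are routed to per-value tables:

```yaml
# Illustrative only; "region" is an assumed field of the incoming records.
- type: WriteToIceberg
  config:
    table: "db.{region}"   # table resolved per record from its "region" field
    catalog_name: hadoop_catalog
    catalog_properties:
      type: hadoop
      warehouse: gs://MY-BUCKET/iceberg-warehouse
```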
To read from a created Iceberg table on GCS, run:
```sh
python -m apache_beam.yaml.main \
  --yaml_pipeline_file transforms/io/iceberg_read.yaml
```
Optional: To run the pipelines on Dataflow, a service account key is not needed. Omit the authentication settings in the Hadoop catalog configuration `config_properties`, and run:
```sh
export REGION="us-central1"
export JOB_NAME="demo-iceberg_write-`date +%Y%m%d-%H%M%S`"

gcloud dataflow yaml run $JOB_NAME \
  --yaml-pipeline-file transforms/io/iceberg_write.yaml \
  --region $REGION
```
```sh
export REGION="us-central1"
export JOB_NAME="demo-iceberg_read-`date +%Y%m%d-%H%M%S`"

gcloud dataflow yaml run $JOB_NAME \
  --yaml-pipeline-file transforms/io/iceberg_read.yaml \
  --region $REGION
```
Jinja templatization can be used to build pipelines from different contexts and/or with different configurations. Several examples build on the word count example above by leveraging the Jinja templating engine for dynamic pipeline generation based on user inputs through the `% include`, `% import`, and inheritance directives (see the sketch below).
Jinja `% import` directive:
Jinja `% include` directive:
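To make the templating concrete, here is a minimal hedged sketch of an include-based pipeline. The file names and the `INPUT_FILE` variable are made up, and the included snippet must already be indented to fit where it is inserted, since Jinja pastes it verbatim before Beam YAML parses the rendered document.

```yaml
# Hypothetical parent pipeline rendered by Jinja before Beam YAML parses it.
pipeline:
  transforms:
    - type: ReadFromText
      config:
        path: "{{ INPUT_FILE }}"
    # The included file is expected to contain further transform list entries,
    # pre-indented to match this position.
    {% include "shared_transforms.yaml" %}
```

Such a template would be rendered with, for example, `--jinja_variables '{"INPUT_FILE": "/path/to/input.txt"}'`, in the same way the Kafka run command above passes its variables.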
Examples that include the built-in `Enrichment` transform for performing ML enrichments:
Examples that include ML-specific transforms such as `RunInference` and `MLTransform`:
More information about aggregation transforms can be found here.