This example reads from the Google Cloud Pub/Sub NYC Taxi stream described here, writes to a given Kafka topic, and reads back from the same Kafka topic. It uses the cross-language Kafka transforms available in kafka.py; the transforms themselves are implemented in Java and are available here.
Install Java Development Kit (JDK) version 8 on your system, make sure that the JAVA_HOME environment variable points to your JDK installation, and make sure that the java command is available in the environment.
java -version <Should print information regarding the installed Java version>
This example requires a Kafka cluster that the Beam runner executing the pipeline can access. See here for general instructions on setting up a Kafka cluster. One option is to set up the Kafka cluster in GCE; see here for step-by-step instructions on setting up a single-node Kafka cluster in GCE. When using Dataflow, consider starting the Kafka cluster in the region where the Dataflow pipeline will be running. See here for more details on selecting a GCP region for Dataflow.
Let's assume that the IP address of one of the bootstrap servers of the Kafka cluster is 123.45.67.89
and the port is 9092
.
export BOOTSTRAP_SERVER="123.45.67.89:9092"
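Before launching the pipeline, it can be worth confirming that the broker is reachable from your machine. Below is a small standard-library sketch; parse_bootstrap_server and is_reachable are helpers written for this guide, not part of Beam or Kafka.

```python
# Quick TCP reachability check for a Kafka bootstrap server,
# using only the Python standard library.
import socket


def parse_bootstrap_server(server):
    """Split a 'host:port' bootstrap server string into (host, port)."""
    host, _, port = server.rpartition(':')
    return host, int(port)


def is_reachable(server, timeout=5.0):
    """Return True if a TCP connection to the broker can be opened."""
    host, port = parse_bootstrap_server(server)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


print(parse_bootstrap_server('123.45.67.89:9092'))  # → ('123.45.67.89', 9092)
```

This only verifies TCP connectivity; it does not speak the Kafka protocol, so a True result means the port is open, not that the broker is healthy.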
Perform Beam runner specific setup.
ℹ️ Note that cross-language transforms require portable implementations of the Spark/Flink/Direct runners. Dataflow requires Runner v2. See here for instructions on setting up Dataflow.
Set up a virtual environment for running Beam Python programs. See here for prerequisites. Dataflow requires the gcp
tag when installing Beam.
python -m venv env
source env/bin/activate
pip install 'apache-beam[gcp]'
Run the Beam pipeline. You can either use the default Kafka topic name or specify a Kafka topic name. The following command assumes Dataflow; see here for instructions on running Beam Python programs on other runners.
ℹ️ Note that this example is not available in Beam versions before 2.24.0, so you'll have to either get the example program from Beam or follow the steps provided in the section Running the Example from a Beam Git Clone.
export PROJECT="$(gcloud config get-value project)"
export TEMP_LOCATION="gs://MY-BUCKET/temp"
export REGION="us-central1"
export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`"
export NUM_WORKERS="5"

python -m apache_beam.examples.kafkataxi.kafka_taxi \
  --runner DataflowRunner \
  --temp_location $TEMP_LOCATION \
  --project $PROJECT \
  --region $REGION \
  --num_workers $NUM_WORKERS \
  --job_name $JOB_NAME \
  --bootstrap_servers $BOOTSTRAP_SERVER \
  --experiments=use_runner_v2
Running this example from a Beam Git clone requires some additional steps.
Check out a clone of the Beam Git repo. See here for prerequisites.
Assume your GitHub username to be GITHUB_USERNAME
.
git clone git@github.com:${GITHUB_USERNAME}/beam
cd beam
Build the IO expansion service jar.
./gradlew :sdks:java:io:expansion-service:build
Push a Java SDK harness container to Docker Hub. See here for prerequisites and additional information.
export DOCKER_ROOT="Your Docker Repository Root"
./gradlew :sdks:java:container:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
docker push $DOCKER_ROOT/beam_java_sdk:latest
For portable Flink/Spark in local mode, instead of the above command, just build the Java SDK harness container locally using the default values for the repository root and the Docker tag.
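Assuming the defaults, that local build is the same Gradle task as above without the -P overrides and without the push step:

```shell
# Build the Java SDK harness container locally, keeping the default
# repository root and Docker tag.
./gradlew :sdks:java:container:docker
```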
Activate your Python virtual environment. This example uses venv
. See here for instructions on setting up other types of Python virtual environments.
cd ..  # Create the virtual environment in the top-level work directory.
python -m venv env
source env/bin/activate
Install Beam and dependencies and build a Beam distribution.
cd beam/sdks/python
pip install -r build-requirements.txt
pip install -e '.[gcp]'
python setup.py sdist
Run the Beam pipeline. You can either use the default Kafka topic name or specify a Kafka topic name. The following command assumes Dataflow; see here for instructions on running Beam Python programs on other runners.
export PROJECT="$(gcloud config get-value project)"
export TEMP_LOCATION="gs://MY-BUCKET/temp"
export REGION="us-central1"
export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`"
export NUM_WORKERS="5"
export PYTHON_DISTRIBUTION="dist/'Name of Python distribution'"

python -m apache_beam.examples.kafkataxi.kafka_taxi \
  --runner DataflowRunner \
  --temp_location $TEMP_LOCATION \
  --project $PROJECT \
  --region $REGION \
  --sdk_location $PYTHON_DISTRIBUTION \
  --num_workers $NUM_WORKERS \
  --job_name $JOB_NAME \
  --bootstrap_servers $BOOTSTRAP_SERVER \
  --sdk_harness_container_image_overrides ".*java.*,${DOCKER_ROOT}/beam_java_sdk:latest" \
  --experiments=use_runner_v2