This project provides examples of Apache Beam multi-language pipelines, including pipelines that use the Java Count.perElement() transform and the JavaExternalTransform API.
To start the Java expansion service, run the following command, replacing <version> and <port> with valid values:

    java -jar beam-examples-multi-language-<version>.jar <port> --javaClassLookupAllowlistFile='*'
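You may want to script this step instead. The following sketch uses only the Python standard library. It starts the same command as a background process and waits until the port accepts connections, so a pipeline is not launched against a service that is still booting. The names expansion_service_command, wait_for_port, and start_expansion_service are hypothetical, as is the jar_dir parameter. By default, the JAR is assumed to be in the current directory.

```python
from __future__ import annotations

import shutil
import socket
import subprocess
import time


def expansion_service_command(version: str, port: int, jar_dir: str = ".") -> list[str]:
    """Returns the argv for the java -jar command shown above.

    jar_dir (hypothetical) is the directory holding the downloaded JAR.
    """
    jar = f"{jar_dir}/beam-examples-multi-language-{version}.jar"
    # As a list element, '*' reaches Java verbatim: no shell is involved, so
    # the quotes needed on the command line are not needed here.
    return ["java", "-jar", jar, str(port), "--javaClassLookupAllowlistFile=*"]


def wait_for_port(port: int, host: str = "localhost", timeout: float = 60.0) -> bool:
    """Polls until host:port accepts a TCP connection; returns False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            time.sleep(0.2)  # Not listening yet; retry shortly.
    return False


def start_expansion_service(version: str, port: int, jar_dir: str = ".") -> subprocess.Popen:
    """Starts the expansion service in the background and waits until it is reachable."""
    if shutil.which("java") is None:
        raise RuntimeError("java was not found on PATH")
    proc = subprocess.Popen(expansion_service_command(version, port, jar_dir))
    if not wait_for_port(port):
        proc.terminate()
        raise TimeoutError(f"expansion service did not open port {port}")
    return proc
```

For example, start_expansion_service(version, 12345) returns a process handle once the service is reachable. Here, version is the release you downloaded. Call terminate() on the returned handle when you no longer need the service.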
In a new shell, run a pipeline in the python directory using a Beam runner that supports multi-language pipelines.
The Python files contain details about the actual commands to run.
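The Java Count.perElement() transform emits one pair per distinct input element, containing the element and the number of times it occurs. When you validate one of these pipelines on a small input, it can help to compute the expected counts locally. The following plain-Python sketch mirrors only those counting semantics; it is not Beam code, and count_per_element is a hypothetical name.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def count_per_element(elements: Iterable[str]) -> List[Tuple[str, int]]:
    """Returns one (element, count) pair per distinct element.

    Beam does not guarantee output order, so the pairs are sorted here to
    make them easy to compare against a pipeline's output.
    """
    return sorted(Counter(elements).items())


print(count_per_element("the cat and the hat".split()))
# [('and', 1), ('cat', 1), ('hat', 1), ('the', 2)]
```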
The SklearnMnistClassification example performs image classification on handwritten digits from the MNIST database.
Please see here for context and information regarding the corresponding Python pipeline.
Note that the Java pipeline is available in the Beam Java examples module.
Obtain or generate a CSV input file that contains the labels and pixel values to feed into the model, and store it in Google Cloud Storage (GCS). An example input is available here.
Create a model file containing a pickled scikit-learn model trained on MNIST data, and store it in GCS. An example model file is available here. This model was generated by running the program given here on the example input dataset.
Perform any Beam runner-specific setup according to the instructions here.
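Before running on Dataflow, you can sanity-check the input file and model from these setup steps by scoring a few rows locally with a sketch like the following. It is not the pipeline's code, and it rests on several assumptions. The CSV has no header row. Each row starts with the label, followed by integer pixel values. The names parse_row, load_model, and classify are hypothetical. It also reads the model from a local path, so you would need to copy a file stored in GCS down first (for example, with gsutil cp).

```python
import csv
import pickle
from typing import Iterable, List, Tuple


def parse_row(line: str) -> Tuple[int, List[int]]:
    """Splits one CSV line into (label, pixel values).

    Assumption: the label is the first column and all remaining columns are
    integer pixel values. Check this against your actual input file.
    """
    values = next(csv.reader([line]))
    return int(values[0]), [int(v) for v in values[1:]]


def load_model(path: str):
    """Loads a pickled model from a local path.

    Only unpickle files you trust: unpickling can execute arbitrary code.
    """
    with open(path, "rb") as f:
        return pickle.load(f)


def classify(lines: Iterable[str], model) -> List[Tuple[int, int]]:
    """Returns (true label, predicted label) for each CSV line.

    predict() is called once on the whole batch because scikit-learn
    estimators expect 2-D input: one row of features per sample.
    """
    parsed = [parse_row(line) for line in lines]
    predictions = model.predict([pixels for _, pixels in parsed])
    return [(label, int(pred)) for (label, _), pred in zip(parsed, predictions)]
```

For example, classify(lines[:10], load_model("model.pkl")) would return ten (label, prediction) pairs. This assumes lines holds the rows of the CSV file and the model file has been copied to the working directory.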
The following instructions are for running the pipeline with the Dataflow runner. For other portable runners, modify the instructions according to the guidelines here.
Generate a Maven project from the Beam examples archetype:

    export BEAM_VERSION=<Beam version>
    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=$BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=multi-language-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
From the generated multi-language-beam directory, run the pipeline:

    export GCP_PROJECT=<GCP project>
    export GCP_BUCKET=<GCP bucket>
    export GCP_REGION=<GCP region>
    mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.multilanguage.SklearnMnistClassification \
        -Dexec.args="--runner=DataflowRunner --project=$GCP_PROJECT \
        --region=$GCP_REGION \
        --gcpTempLocation=gs://$GCP_BUCKET/multi-language-beam/tmp \
        --output=gs://$GCP_BUCKET/multi-language-beam/output" \
        -Pdataflow-runner
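The mvn invocation above packs all pipeline options into a single -Dexec.args value. The following sketch builds the same command as an argv list from the same environment variables, which makes that structure explicit and fails fast with a KeyError if a variable is missing. Without that check, an unset bucket would produce a malformed gs:// path. The names dataflow_exec_args, mvn_command, and MAIN_CLASS are hypothetical; the flags and class name are copied from the command above.

```python
from typing import List, Mapping

MAIN_CLASS = "org.apache.beam.examples.multilanguage.SklearnMnistClassification"


def dataflow_exec_args(env: Mapping[str, str]) -> str:
    """Builds the -Dexec.args value; raises KeyError if a variable is missing."""
    bucket = env["GCP_BUCKET"]
    return " ".join([
        "--runner=DataflowRunner",
        f"--project={env['GCP_PROJECT']}",
        f"--region={env['GCP_REGION']}",
        f"--gcpTempLocation=gs://{bucket}/multi-language-beam/tmp",
        f"--output=gs://{bucket}/multi-language-beam/output",
    ])


def mvn_command(env: Mapping[str, str]) -> List[str]:
    """argv for the mvn invocation above, suitable for subprocess.run."""
    return [
        "mvn", "compile", "exec:java",
        f"-Dexec.mainClass={MAIN_CLASS}",
        f"-Dexec.args={dataflow_exec_args(env)}",
        "-Pdataflow-runner",
    ]
```

For example, subprocess.run(mvn_command(os.environ), cwd="multi-language-beam", check=True) would run it from the generated project directory.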
Inspect the output:

    gsutil cat gs://$GCP_BUCKET/multi-language-beam/output*
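Alternatively, to run the pipeline from a checkout of the Beam repository using Gradle, first activate a new virtual environment following these instructions. Then install the apache-beam Python package (with the gcp extra) and scikit-learn:

    pip install 'apache-beam[gcp]'
    pip install scikit-learn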
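The import names differ from the pip package names: apache-beam is imported as apache_beam, and scikit-learn as sklearn. The following sketch uses only importlib from the standard library to report which of these modules the active environment is still missing. The name missing_modules is hypothetical.

```python
import importlib.util
from typing import Iterable, List


def missing_modules(names: Iterable[str]) -> List[str]:
    """Returns the top-level module names that the current interpreter cannot find."""
    # find_spec locates a top-level module without importing it and
    # returns None when no such module is installed.
    return [name for name in names if importlib.util.find_spec(name) is None]
```

Once both packages are installed, missing_modules(["apache_beam", "sklearn"]) returns an empty list.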
Start the Python expansion service, replacing <PORT> with an available port:

    python -m apache_beam.runners.portability.expansion_service_main -p <PORT> --fully_qualified_name_glob "*"
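The <PORT> placeholder can be any free TCP port. The following sketch asks the operating system for one and builds the same command as an argv list for subprocess. It runs the module with sys.executable, so the service uses the interpreter of the activated virtual environment. The names free_port and python_expansion_service_command are hypothetical.

```python
import socket
import sys
from typing import List


def free_port() -> int:
    """Returns a TCP port that the OS reports as unused (bind to port 0)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


def python_expansion_service_command(port: int) -> List[str]:
    """argv equivalent of the command above, using the current interpreter."""
    return [
        sys.executable, "-m", "apache_beam.runners.portability.expansion_service_main",
        "-p", str(port),
        # Passed as a list element, "*" is not expanded by a shell.
        "--fully_qualified_name_glob", "*",
    ]
```

Because free_port releases the port before returning, another process could claim it before the service starts. This approach suits local runs but does not reserve the port. The chosen number is what EXPANSION_SERVICE_PORT should be set to when running the pipeline.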
Make sure that Docker is installed and available on your system.
In a different shell, build and push the Python and Java SDK Docker containers:

    export DOCKER_ROOT=<Docker root>
    ./gradlew :sdks:python:container:py38:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
    docker push $DOCKER_ROOT/beam_python3.8_sdk:latest
    ./gradlew :sdks:java:container:java11:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest -Pjava11Home=$JAVA_HOME
    docker push $DOCKER_ROOT/beam_java11_sdk:latest
In the shell where DOCKER_ROOT is set, run the pipeline from the root of the Beam repository. Set EXPANSION_SERVICE_PORT to the port the Python expansion service is listening on:

    export GCP_PROJECT=<GCP project>
    export GCP_BUCKET=<GCP bucket>
    export GCP_REGION=<GCP region>
    export EXPANSION_SERVICE_PORT=<PORT>

    # This removes any existing output.
    gsutil rm gs://$GCP_BUCKET/multi-language-beam/output*

    ./gradlew :examples:multi-language:sklearnMinstClassification --args=" \
        --runner=DataflowRunner \
        --project=$GCP_PROJECT \
        --gcpTempLocation=gs://$GCP_BUCKET/multi-language-beam/tmp \
        --output=gs://$GCP_BUCKET/multi-language-beam/output \
        --sdkContainerImage=$DOCKER_ROOT/beam_java11_sdk:latest \
        --sdkHarnessContainerImageOverrides=.*python.*,$DOCKER_ROOT/beam_python3.8_sdk:latest \
        --expansionService=localhost:$EXPANSION_SERVICE_PORT \
        --region=${GCP_REGION}"
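The command above interpolates several environment variables. An unset variable expands to an empty string, producing a malformed argument rather than an error. For example, running in a shell where DOCKER_ROOT was never exported yields --sdkContainerImage=/beam_java11_sdk:latest. A small pre-flight check like the following sketch can catch this before a job is submitted. The name check_run_env is hypothetical, and the variable list is taken from the commands above.

```python
from typing import List, Mapping

# Variables interpolated by the gradle command above.
REQUIRED_VARS = (
    "GCP_PROJECT",
    "GCP_BUCKET",
    "GCP_REGION",
    "EXPANSION_SERVICE_PORT",
    "DOCKER_ROOT",
)


def check_run_env(env: Mapping[str, str]) -> List[str]:
    """Returns human-readable problems; an empty list means the variables look usable."""
    problems = [f"{name} is not set" for name in REQUIRED_VARS if not env.get(name)]
    port = env.get("EXPANSION_SERVICE_PORT", "")
    if port and not port.isdigit():
        problems.append(f"EXPANSION_SERVICE_PORT must be a number, got {port!r}")
    return problems
```

Calling check_run_env(os.environ) from a script started in the same shell, and stopping if the result is non-empty, keeps a misconfigured job from reaching Dataflow.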
Inspect the output:

    gsutil cat gs://$GCP_BUCKET/multi-language-beam/output*
The PythonDataframeWordCount example is covered in the Java multi-language pipelines quickstart. Its source code is available at PythonDataframeWordCount.java.