<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Example multi-language pipelines
This project provides examples of Apache Beam
[multi-language pipelines](https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines).
## Using Java transforms from Python
* **python/addprefix** - A Python pipeline that reads a text file and attaches a prefix on the Java side to each input.
* **python/javacount** - A Python pipeline that counts words using the Java `Count.perElement()` transform.
* **python/javadatagenerator** - A Python pipeline that produces a set of strings generated from Java.
This example demonstrates the `JavaExternalTransform` API.
### Instructions for running the pipelines
#### 1) Start the expansion service
1. Download the latest 'beam-examples-multi-language' JAR. Starting with Apache Beam 2.36.0,
you can find it in [the Maven Central Repository](https://search.maven.org/search?q=g:org.apache.beam).
2. Run the following command, replacing `<version>` and `<port>` with valid values:
`java -jar beam-examples-multi-language-<version>.jar <port> --javaClassLookupAllowlistFile='*'`
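Before moving on to the Python pipeline, it can help to confirm that the expansion service is accepting connections on the chosen port. The following is a minimal sketch that uses only the Python standard library. The helper name `wait_for_port` is hypothetical and not part of Beam.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout_s: float = 30.0) -> bool:
    """Return True once a TCP connection to host:port succeeds, False on timeout.

    Hypothetical helper for illustration; it is not part of Apache Beam.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=1.0):
                return True
        except OSError:
            # Nothing is listening yet; give up once the deadline has passed.
            if time.monotonic() >= deadline:
                return False
            time.sleep(0.5)
```

For example, `wait_for_port("localhost", port)` could be called with the same `<port>` value passed to the expansion service before launching a pipeline.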
#### 2) Set up a Python virtual environment for Beam
1. See [the Python quickstart](https://beam.apache.org/get-started/quickstart-py/)
for more information.
#### 3) Execute the Python pipeline
1. In a new shell, run a pipeline in the **python** directory using a Beam runner that supports
multi-language pipelines.
The Python files contain details about the actual commands to run.
## Using Python transforms from Java
### Sklearn Mnist Classification
Performs image classification on handwritten digits from the [MNIST](https://en.wikipedia.org/wiki/MNIST_database)
database.
See the [Python inference examples](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/inference) for
context and information about the corresponding Python pipeline.
The Java pipeline is
[available in the Beam Java examples module](https://github.com/apache/beam/tree/master/examples/java/src/main/java/org/apache/beam/examples/multilanguage/SklearnMnistClassification.java).
#### Setup
* Obtain or generate a CSV input file that contains labels and pixels to feed into the model, and store it in
GCS. An example input is available
[here](https://storage.googleapis.com/apache-beam-samples/multi-language/mnist/example_input.csv).
* Create a model file that contains the pickled file of a scikit-learn model
trained on MNIST data and store it in GCS. An example model file is available
[here](https://storage.googleapis.com/apache-beam-samples/multi-language/mnist/example_model).
This model was generated by running the program given
[here](https://python-course.eu/machine-learning/training-and-testing-with-mnist.php)
on the
[example input dataset](https://storage.googleapis.com/apache-beam-samples/multi-language/mnist/example_input.csv).
* Perform the Beam runner-specific setup according to the instructions
[here](https://beam.apache.org/get-started/quickstart-java/#run-a-pipeline).
The following instructions are for running the pipeline with the Dataflow runner. For other portable runners,
modify the instructions according to the guidelines
[here](https://beam.apache.org/documentation/sdks/java-multi-language-pipelines/#run-with-directrunner).
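To make the expected input format concrete, the sketch below parses one CSV row into a label and a list of pixel values. It assumes that the label is the first column and that every remaining column is an integer pixel value. This layout and the helper name `parse_row` are assumptions for illustration, so check them against your own input file. A header row, if present, would need to be skipped first.

```python
from typing import List, Tuple


def parse_row(line: str) -> Tuple[int, List[int]]:
    """Split one CSV row into (label, pixels).

    Assumes the first column is the digit label and every remaining
    column is an integer pixel value (an assumption, not confirmed here).
    """
    fields = line.strip().split(",")
    label = int(fields[0])
    pixels = [int(value) for value in fields[1:]]
    return label, pixels


# A tiny made-up row with a label and four pixels.
label, pixels = parse_row("7,0,128,255,0\n")
print(label, pixels)  # 7 [0, 128, 255, 0]
```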
#### Instructions for running the Java pipeline on released Beam (Beam 2.43.0 and later)
* Check out the Beam examples Maven archetype for the relevant Beam version.
```
export BEAM_VERSION=<Beam version>
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=$BEAM_VERSION \
-DgroupId=org.example \
-DartifactId=multi-language-beam \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false
```
* Run the pipeline.
```
export GCP_PROJECT=<GCP project>
export GCP_BUCKET=<GCP bucket>
export GCP_REGION=<GCP region>
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.multilanguage.SklearnMnistClassification \
-Dexec.args="--runner=DataflowRunner --project=$GCP_PROJECT \
--region=$GCP_REGION \
--gcpTempLocation=gs://$GCP_BUCKET/multi-language-beam/tmp \
--output=gs://$GCP_BUCKET/multi-language-beam/output" \
-Pdataflow-runner
```
* Inspect the output. Each line contains two comma-separated values: the actual label of the digit,
followed by the predicted label of the digit.
```
gsutil cat gs://$GCP_BUCKET/multi-language-beam/output*
```
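Because each output line pairs the actual label with the predicted label, a quick accuracy check is possible once the output has been copied locally. The sketch below assumes lines of the form `actual,predicted` with integer labels. The function name `accuracy` and the sample lines are hypothetical and are not real pipeline output.

```python
from typing import Iterable


def accuracy(lines: Iterable[str]) -> float:
    """Fraction of lines whose actual label equals the predicted label."""
    total = 0
    correct = 0
    for line in lines:
        line = line.strip()
        if not line:
            continue  # Skip blank lines.
        actual, predicted = line.split(",")
        total += 1
        if int(actual) == int(predicted):
            correct += 1
    # Return 0.0 for empty input rather than dividing by zero.
    return correct / total if total else 0.0


# Made-up sample lines, not real pipeline output.
sample = ["1,1", "5,5", "9,4", "0,0"]
print(accuracy(sample))  # 0.75
```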
#### Instructions for running the Java pipeline at HEAD (Beam 2.41.0 and 2.42.0)
* Activate a new virtual environment following
[these instructions](https://beam.apache.org/get-started/quickstart-py/#create-and-activate-a-virtual-environment).
* Install the Apache Beam package with GCP support and scikit-learn (`sklearn`).
```
pip install 'apache-beam[gcp]'
pip install scikit-learn
```
* Start the expansion service.
```
python -m apache_beam.runners.portability.expansion_service_main -p <PORT> --fully_qualified_name_glob "*"
```
* Make sure that Docker is installed and available on your system.
* In a different shell, build and push Python and Java Docker containers.
```
export DOCKER_ROOT=<Docker root>
./gradlew :sdks:python:container:py38:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest
docker push $DOCKER_ROOT/beam_python3.8_sdk:latest
./gradlew :sdks:java:container:java11:docker -Pdocker-repository-root=$DOCKER_ROOT -Pdocker-tag=latest -Pjava11Home=$JAVA_HOME
docker push $DOCKER_ROOT/beam_java11_sdk:latest
```
* Run the pipeline using the following Gradle command (this guide assumes the Dataflow runner).
Note that we override both the Java and Python SDK harness containers here.
```
export GCP_PROJECT=<GCP project>
export GCP_BUCKET=<GCP bucket>
export GCP_REGION=<GCP region>
export EXPANSION_SERVICE_PORT=<PORT>
# This removes any existing output.
gsutil rm gs://$GCP_BUCKET/multi-language-beam/output*
./gradlew :examples:multi-language:sklearnMinstClassification --args=" \
--runner=DataflowRunner \
--project=$GCP_PROJECT \
--gcpTempLocation=gs://$GCP_BUCKET/multi-language-beam/tmp \
--output=gs://$GCP_BUCKET/multi-language-beam/output \
--sdkContainerImage=$DOCKER_ROOT/beam_java11_sdk:latest \
--sdkHarnessContainerImageOverrides=.*python.*,$DOCKER_ROOT/beam_python3.8_sdk:latest \
--expansionService=localhost:$EXPANSION_SERVICE_PORT \
--region=${GCP_REGION}"
```
* Inspect the output. Each line contains two comma-separated values: the actual label of the digit,
followed by the predicted label of the digit.
```
gsutil cat gs://$GCP_BUCKET/multi-language-beam/output*
```
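The `--sdkHarnessContainerImageOverrides` value in the Gradle command for running at HEAD is a regular expression and a replacement image, separated by a comma. The sketch below illustrates the idea with Python's `re` module. It assumes the expression must match the whole image name, which may differ from how the runner evaluates it. The function `apply_overrides` and the image names are hypothetical.

```python
import re
from typing import List, Tuple


def apply_overrides(image: str, overrides: List[Tuple[str, str]]) -> str:
    """Return the replacement for the first pattern matching the whole image name.

    Illustrative only; full-match semantics are an assumption.
    """
    for pattern, replacement in overrides:
        if re.fullmatch(pattern, image):
            return replacement
    # No pattern matched, so the original image is kept.
    return image


# Hypothetical image names, for illustration only.
overrides = [(r".*python.*", "my-registry/beam_python3.8_sdk:latest")]
print(apply_overrides("apache/beam_python3.8_sdk:2.42.0", overrides))
print(apply_overrides("apache/beam_java11_sdk:2.42.0", overrides))
```

Under these assumptions, only the Python image name is rewritten. The Java image does not match `.*python.*` and is left unchanged, which is why the command sets the Java container separately with `--sdkContainerImage`.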
### Python Dataframe Wordcount
This example is covered in the [Java multi-language pipelines quickstart](https://beam.apache.org/documentation/sdks/java-multi-language-pipelines/).
The pipeline source code is available at
[PythonDataframeWordCount.java](https://github.com/apache/beam/tree/master/examples/java/src/main/java/org/apache/beam/examples/multilanguage/PythonDataframeWordCount.java).