The Apache Flink Runner can be used to execute Beam pipelines using Apache Flink. For execution you can choose between a cluster execution mode (e.g. Yarn/Kubernetes/Mesos) or a local embedded execution mode which is useful for testing pipelines.
The Flink Runner and Flink are suitable for large-scale, continuous jobs, and provide:
It is important to understand that the Flink Runner comes in two flavors:
You may ask: why are there two Runners?
Beam and its Runners originally only supported JVM-based languages (e.g. Java/Scala/Kotlin). Python and Go SDKs were added later on. The architecture of the Runners had to be changed significantly to support executing pipelines written in other languages.
If your applications only use Java, then you should currently go with the legacy Runner. Eventually, the portable Runner will replace the legacy Runner because it contains the generalized framework for executing Java, Python, Go, and more languages in the future.
If you want to run Python pipelines with Beam on Flink you want to use the portable Runner. For more information on portability, please visit the [Portability page]({{ site.baseurl }}/roadmap/portability/).
Consequently, this guide is split into two parts to document the legacy and the portable functionality of the Flink Runner. Please use the switcher below to select the appropriate Runner:
If you want to use the local execution mode with the Flink Runner you don't have to complete any cluster setup. You can simply run your Beam pipeline. Be sure to set the Runner to `FlinkRunner` (Java) or `PortableRunner` (Python).
To use the Flink Runner for executing on a cluster, you have to set up a Flink cluster by following the Flink Setup Quickstart.
The Flink cluster version has to match the minor version used by the FlinkRunner. The minor version is the first two numbers in the version string, e.g. in 1.7.0 the minor version is 1.7.
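As an illustration of the matching rule above, the minor version can be derived from a full version string with a small helper (the function name here is hypothetical, not part of Beam):

```python
def flink_minor_version(version: str) -> str:
    """Return the minor version, i.e. the first two components of a Flink version string."""
    return ".".join(version.split(".")[:2])

# A FlinkRunner built against Flink 1.7.x must run against a 1.7 cluster.
print(flink_minor_version("1.7.0"))  # -> 1.7
```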
We try to track the latest version of Apache Flink at the time of the Beam release. A Flink version is supported by Beam for the time it is supported by the Flink community. The Flink community typically supports the last two minor versions. When support for a Flink version is dropped, it may be deprecated and removed from Beam as well, with the exception of Beam LTS releases. LTS releases continue to receive bug fixes for as long as the LTS support period lasts.
To find out which version of Flink is compatible with Beam please see the table below:
For retrieving the right Flink version, see the Flink downloads page.
For more information, the Flink Documentation can be helpful.
You must specify your dependency on the Flink Runner in your pom.xml or build.gradle. Use the Beam version and the artifact id from the above table. For example:
```xml
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-flink-1.6</artifactId>
  <version>{{ site.release_latest }}</version>
</dependency>
```
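If you build with Gradle instead, the equivalent declaration in build.gradle might look like the following (a sketch only; adjust the configuration name, e.g. `compile` vs. `implementation`, to your build setup):

```groovy
dependencies {
    compile "org.apache.beam:beam-runners-flink-1.6:{{ site.release_latest }}"
}
```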
```shell
pip install apache_beam
```
```shell
$ mvn package -Pflink-runner
```
Look for the output JAR of this command in the `target` folder.
```shell
$ bin/flink run -c org.apache.beam.examples.WordCount /path/to/your.jar --runner=FlinkRunner --other-parameters
```
```shell
mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner \
      --inputFile=/path/to/pom.xml \
      --output=/path/to/counts \
      --flinkMaster=<flink master url> \
      --filesToStage=target/word-count-beam-bundled-0.1.jar"
```
1. Only required once: Build the SDK harness container: `./gradlew :sdks:python:container:docker`
2. Start the JobService endpoint: `./gradlew :runners:flink:1.5:job-server:runShadow`
3. Submit the Python pipeline to the above endpoint by using the `PortableRunner` and `job_endpoint` set to `localhost:8099` (this is the default address of the JobService). For example:
```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099"])
p = beam.Pipeline(options=options)
...  # construct your pipeline here
p.run()
```
1. Start a Flink cluster which exposes the Rest interface on `localhost:8081` by default.
2. Start the JobService with the Flink Rest endpoint: `./gradlew :runners:flink:1.5:job-server:runShadow -PflinkMasterUrl=localhost:8081`.
3. Submit the pipeline as above.
You can monitor a running Flink job using the Flink JobManager Dashboard or its Rest interfaces. By default, this is available at port 8081 of the JobManager node. If you have a Flink installation on your local machine, that would be `http://localhost:8081`. Note: When you use the local mode, an embedded Flink cluster is started which does not make a dashboard available.
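Besides the dashboard, the Rest interface can be queried directly. For example, assuming a cluster is running locally on the default port, the following request lists the jobs known to the cluster:

```shell
# List jobs via the Flink Rest API (default JobManager port 8081)
curl http://localhost:8081/jobs
```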
If your pipeline uses an unbounded data source or sink, the Flink Runner will automatically switch to streaming mode. You can enforce streaming mode by using the `streaming` setting mentioned below.
Note: The Runner will print a warning message when unbounded sources are used and checkpointing is not enabled. Many sources like `PubSubIO` rely on their checkpoints to be acknowledged, which can only be done when checkpointing is enabled for the `FlinkRunner`. To enable checkpointing, please set `checkpointingInterval` (Java) or `checkpointing_interval` (Python) to the desired checkpointing interval in milliseconds.
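For example, a hypothetical Java invocation (following the `mvn exec` pattern shown earlier) that enables a 60-second checkpoint interval could pass the option like this:

```shell
mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Pflink-runner \
    -Dexec.args="--runner=FlinkRunner \
      --flinkMaster=<flink master url> \
      --checkpointingInterval=60000"
```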
When executing your pipeline with the Flink Runner, you can set these pipeline options.
See the reference documentation for the [FlinkPipelineOptions](https://beam.apache.org/releases/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/runners/flink/FlinkPipelineOptions.html) interface (and its subinterfaces) for the complete list of pipeline configuration options.
The [Beam Capability Matrix]({{ site.baseurl }}/documentation/runners/capability-matrix/) documents the capabilities of the legacy Flink Runner.
The Portable Capability Matrix documents the capabilities of the portable Flink Runner.