The Apache Spark Runner can be used to execute Beam pipelines using Apache Spark. The Spark Runner executes Beam pipelines just like a native Spark application: you can deploy a self-contained application in local mode, run on Spark's Standalone resource manager, or use YARN or Mesos.
The Spark Runner executes Beam pipelines on top of Apache Spark, providing:

* Batch and streaming (and combined) pipelines.
* The same fault-tolerance guarantees as provided by RDDs and DStreams.
* The same security features Spark provides.
* Built-in metrics reporting using Spark's metrics system, which reports Beam Aggregators as well.
* Native support for Beam side-inputs via Spark's Broadcast variables.
The Beam Capability Matrix documents the currently supported capabilities of the Spark Runner.
The Spark runner comes in three flavors:

1. A *legacy Runner* which supports only Java (and other JVM-based languages) and is based on Spark's RDD/DStream API.
2. A *Structured Streaming Spark Runner* which supports only Java (and other JVM-based languages) and is based on Spark Datasets and the Apache Spark Structured Streaming framework. Note: It is still experimental and its coverage of the Beam model is partial; for now it only supports batch mode.
3. A *portable Runner* which supports Java, Python, and Go.
This guide is split into two parts to document the non-portable and the portable functionality of the Spark Runner. Please use the switcher below to select the appropriate Runner:
Beam and its Runners originally only supported JVM-based languages (e.g. Java/Scala/Kotlin). Python and Go SDKs were added later on. The architecture of the Runners had to be changed significantly to support executing pipelines written in other languages.
If your applications only use Java, you should currently go with one of the Java-based runners. If you want to run Python or Go pipelines with Beam on Spark, you need to use the portable Runner. For more information on portability, please visit the Portability page.
The Spark runner currently supports Spark's 3.1.x branch.
Note: Support for Spark 2.4.x is deprecated and will be dropped with the release of Beam 2.44.0 (or soon after).
{{< paragraph class="language-java" >}} You can add a dependency on the latest version of the Spark runner by adding the following to your pom.xml: {{< /paragraph >}}
{{< highlight java >}}
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark-3</artifactId>
  <version>{{< param release_latest >}}</version>
</dependency>
{{< /highlight >}}
{{< paragraph class="language-java" >}} In some cases, such as running in local mode/Standalone, your (self-contained) application is required to bundle Spark by explicitly adding the following dependencies to your pom.xml: {{< /paragraph >}}
{{< highlight java >}}
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>${spark.version}</version>
</dependency>
{{< /highlight >}}
{{< paragraph class="language-java" >}} And shading the application jar using the Maven Shade plugin: {{< /paragraph >}}
{{< highlight java >}}
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <createDependencyReducedPom>false</createDependencyReducedPom>
    <filters>
      <filter>
        <artifact>*:*</artifact>
        <excludes>
          <exclude>META-INF/*.SF</exclude>
          <exclude>META-INF/*.DSA</exclude>
          <exclude>META-INF/*.RSA</exclude>
        </excludes>
      </filter>
    </filters>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <shadedArtifactAttached>true</shadedArtifactAttached>
        <shadedClassifierName>shaded</shadedClassifierName>
      </configuration>
    </execution>
  </executions>
</plugin>
{{< /highlight >}}
{{< paragraph class="language-java" >}} After running mvn package, run ls target and you should see (assuming your artifactId is beam-examples and the version is 1.0.0): {{< /paragraph >}}
{{< highlight java >}} beam-examples-1.0.0-shaded.jar {{< /highlight >}}
{{< paragraph class="language-java" >}} To run against a Standalone cluster simply run: {{< /paragraph >}}
{{< paragraph class="language-java" >}}
For RDD/DStream based runner:
{{< /paragraph >}}
{{< highlight java >}} spark-submit --class com.beam.examples.BeamPipeline --master spark://HOST:PORT target/beam-examples-1.0.0-shaded.jar --runner=SparkRunner {{< /highlight >}}
{{< paragraph class="language-java" >}}
For Structured Streaming based runner:
{{< /paragraph >}}
{{< highlight java >}} spark-submit --class com.beam.examples.BeamPipeline --master spark://HOST:PORT target/beam-examples-1.0.0-shaded.jar --runner=SparkStructuredStreamingRunner {{< /highlight >}}
{{< paragraph class="language-py" >}} You will need Docker to be installed in your execution environment. To develop Apache Beam with Python you have to install the Apache Beam Python SDK: pip install apache_beam. Please refer to the Python documentation on how to create a Python pipeline. {{< /paragraph >}}
{{< highlight py >}} pip install apache_beam {{< /highlight >}}
{{< paragraph class="language-py" >}} Starting from Beam 2.20.0, pre-built Spark Job Service Docker images are available at Docker Hub. {{< /paragraph >}}

{{< paragraph class="language-py" >}} For older Beam versions, you will need a copy of Apache Beam's source code. You can download it on the Downloads page. {{< /paragraph >}}
{{< paragraph class="language-py" >}}1. Start the JobService endpoint:
* with Docker (preferred): docker run --net=host apache/beam_spark_job_server:latest
* or from Beam source code: ./gradlew :runners:spark:3:job-server:runShadow
{{< /paragraph >}}

{{< paragraph class="language-py" >}} The JobService is the central instance where you submit your Beam pipeline. The JobService will create a Spark job for the pipeline and execute the job. To execute the job on a Spark cluster, the Beam JobService needs to be provided with the Spark master address. {{< /paragraph >}}
{{< paragraph class="language-py" >}}2. Submit the Python pipeline to the above endpoint by using the PortableRunner, with job_endpoint set to localhost:8099 (this is the default address of the JobService), and environment_type set to LOOPBACK. For example:{{< /paragraph >}}
{{< highlight py >}}
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK"
])
with beam.Pipeline(options=options) as p:
    ...
{{< /highlight >}}
Deploying your Beam pipeline on a cluster that already has a Spark deployment (Spark classes are available in container classpath) does not require any additional dependencies. For more details on the different deployment modes see: Standalone, YARN, or Mesos.
{{< paragraph class="language-py" >}}1. Start a Spark cluster which exposes the master on port 7077 by default.{{< /paragraph >}}
{{< paragraph class="language-py" >}}2. Start the JobService that will connect with the Spark master:
* with Docker (preferred): docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://localhost:7077
* or from Beam source code: ./gradlew :runners:spark:3:job-server:runShadow -PsparkMasterUrl=spark://localhost:7077
{{< /paragraph >}}
{{< paragraph class="language-py" >}}3. Submit the pipeline as above. Note however that environment_type=LOOPBACK is only intended for local testing. See here for details.{{< /paragraph >}}
{{< paragraph class="language-py" >}} (Note that, depending on your cluster setup, you may need to change the environment_type option. See here for details.) {{< /paragraph >}}
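{{< paragraph class="language-py" >}} For example, to run SDK workers in Docker containers on the cluster nodes, environment_type can be set to DOCKER. The sketch below is illustrative; the SDK container image name is an assumption, not a value from this guide: {{< /paragraph >}}

{{< highlight py >}}
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    # DOCKER starts each SDK worker in a container; the image name is illustrative.
    "--environment_type=DOCKER",
    "--environment_config=apache/beam_python3.9_sdk:latest",
])
{{< /highlight >}}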
To run Beam jobs written in Python, Go, and other supported languages, you can use the SparkRunner and PortableRunner as described on Beam's Spark Runner page (also see the Portability Framework Roadmap).
The following example runs a portable Beam job in Python from the Dataproc cluster's master node, backed by YARN.
Note: This example executes successfully with Dataproc 2.0, Spark 3.1.2 and Beam 2.37.0.
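A Dataproc cluster for this example could be created with a gcloud command along the following lines; the cluster name, region, and exact image version are illustrative placeholders, and the individual flags are described below:

{{< highlight sh >}}
# Sketch only: CLUSTER_NAME, REGION and the image version are placeholders.
gcloud dataproc clusters create CLUSTER_NAME \
    --optional-components=DOCKER \
    --image-version=2.0 \
    --region=REGION \
    --enable-component-gateway \
    --scopes=https://www.googleapis.com/auth/cloud-platform \
    --properties spark:spark.master.rest.enabled=true
{{< /highlight >}}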
Create a Dataproc cluster with the following flags:

* --optional-components: Docker.
* --image-version: the cluster's image version, which determines the Spark version installed on the cluster (for example, see the Apache Spark component versions listed for the latest and previous four 2.0.x image release versions).
* --region: a supported Dataproc region.
* --enable-component-gateway: enable access to web interfaces.
* --scopes: enable API access to GCP services in the same project.
* --properties: add specific configuration for some component; here spark.master.rest is enabled to allow job submission to the cluster.

Run the Beam pipeline with the following options:

* --runner (required): SparkRunner.
* --output_executable_path (required): path for the bundle jar to be created.
* --output (required): where output shall be written.
* --spark_version (optional): select spark version 2 (default) or 3.

Submit the resulting bundle jar as a Spark job with the following flags:

* --cluster: name of created Dataproc cluster.
* --region: a supported Dataproc region.
* --class: the entry point for your application.
* --jars: path to the bundled jar including your application and all dependencies.

When executing your pipeline with the Spark Runner, you should consider the following pipeline options.
{{< paragraph class="language-java" >}}
For RDD/DStream based runner:
{{< /paragraph >}}
{{< paragraph class="language-java" >}}
For Structured Streaming based runner:
{{< /paragraph >}}
When submitting a Spark application to a cluster, it is common (and recommended) to use the spark-submit script that is provided with the Spark installation. The PipelineOptions described above are not meant to replace spark-submit, but to complement it. Passing any of the above mentioned options can be done as one of the application arguments, and setting --master takes precedence. For more on how to generally use spark-submit, check out the Spark documentation.
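{{< paragraph class="language-java" >}} As a sketch combining the two, the jar from the earlier example can be submitted with Beam pipeline options appended after the application jar, while --master goes to spark-submit itself; the output path here is a hypothetical illustration: {{< /paragraph >}}

{{< highlight java >}}
spark-submit --class com.beam.examples.BeamPipeline \
    --master spark://HOST:PORT \
    target/beam-examples-1.0.0-shaded.jar \
    --runner=SparkRunner --output=/tmp/output
{{< /highlight >}}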
You can monitor a running Spark job using the Spark Web Interfaces. By default, this is available at port 4040 on the driver node. If you run Spark on your local machine that would be http://localhost:4040. Spark also has a history server to view after the fact.

{{< paragraph class="language-java" >}} Metrics are also available via the REST API. Spark provides a metrics system that allows reporting Spark metrics to a variety of Sinks. The Spark runner reports user-defined Beam Aggregators using this same metrics system and currently supports [GraphiteSink](https://beam.apache.org/releases/javadoc/{{< param release_latest >}}/org/apache/beam/runners/spark/metrics/sink/GraphiteSink.html) and [CSVSink](https://beam.apache.org/releases/javadoc/{{< param release_latest >}}/org/apache/beam/runners/spark/metrics/sink/CsvSink.html). Providing support for additional Sinks supported by Spark is straightforward. {{< /paragraph >}}

{{< paragraph class="language-py" >}}Spark metrics are not yet supported on the portable runner.{{< /paragraph >}}
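{{< paragraph class="language-java" >}} As an illustrative sketch (not from this guide), enabling the GraphiteSink above is typically done through Spark's metrics configuration, either in conf/metrics.properties or via the equivalent spark.metrics.conf.* properties; the Graphite host and port below are placeholders: {{< /paragraph >}}

{{< highlight java >}}
# conf/metrics.properties (sketch); host and port are placeholders
*.sink.graphite.class=org.apache.beam.runners.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=YOUR_GRAPHITE_HOST
*.sink.graphite.port=2003
*.sink.graphite.period=1
*.sink.graphite.unit=seconds
{{< /highlight >}}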
{{< paragraph class="language-java" >}}
For RDD/DStream based runner:
If your pipeline uses an UnboundedSource, the Spark Runner will automatically set streaming mode. Forcing streaming mode is mostly used for testing and is not recommended.

For Structured Streaming based runner:
Streaming mode is not implemented yet in the Spark Structured Streaming runner. {{< /paragraph >}}
{{< paragraph class="language-py" >}} Streaming is not yet supported on the Spark portable runner. {{< /paragraph >}}
{{< paragraph class="language-java" >}}
For RDD/DStream based runner:
If you would like to execute your Spark job with a provided SparkContext, such as when using the spark-jobserver, or use StreamingListeners, you can't use SparkPipelineOptions (the context or a listener cannot be passed as a command-line argument anyway). Instead, you should use SparkContextOptions, which can only be used programmatically and is not a common PipelineOptions implementation.

For Structured Streaming based runner:
Provided SparkSession and StreamingListeners are not supported on the Spark Structured Streaming runner. {{< /paragraph >}}
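{{< paragraph class="language-java" >}} A minimal programmatic sketch for the RDD/DStream based runner, assuming a JavaSparkContext is already available (the accessor used to obtain it is hypothetical): {{< /paragraph >}}

{{< highlight java >}}
// Sketch only: assumes beam-runners-spark-3 and Spark are on the classpath.
JavaSparkContext jsc = obtainProvidedContext(); // hypothetical, e.g. handed over by spark-jobserver
SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
options.setRunner(SparkRunner.class);
options.setUsesProvidedSparkContext(true);   // do not let the runner create its own context
options.setProvidedSparkContext(jsc);
Pipeline pipeline = Pipeline.create(options);
{{< /highlight >}}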
{{< paragraph class="language-py" >}} Provided SparkContext and StreamingListeners are not supported on the Spark portable runner. {{< /paragraph >}}
An example of configuring Spark to run an Apache Beam job: