The Apache Spark Runner can be used to execute Beam pipelines using Apache Spark. The Spark Runner can execute Beam pipelines just like a native Spark application: deploying a self-contained application for local mode, running on Spark's Standalone resource manager (RM), or using YARN or Mesos.
The Spark Runner executes Beam pipelines on top of Apache Spark.
The [Beam Capability Matrix]({{ site.baseurl }}/documentation/runners/capability-matrix/) documents the currently supported capabilities of the Spark Runner.
Note: support for the Beam Model in streaming is currently experimental; follow the [mailing list]({{ site.baseurl }}/get-started/support/) for status updates.
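The Spark Runner comes in two flavors: a legacy Runner, which supports only Java (and other JVM-based languages), and a portable Runner, which supports Java, Python, and Go.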
Beam and its Runners originally only supported JVM-based languages (e.g. Java/Scala/Kotlin). Python and Go SDKs were added later on. The architecture of the Runners had to be changed significantly to support executing pipelines written in other languages.
If your applications only use Java, then you should currently use the legacy Runner. If you want to run Python or Go pipelines with Beam on Spark, you need to use the portable Runner. For more information on portability, please visit the [Portability page]({{ site.baseurl }}/roadmap/portability/).
This guide is split into two parts to document the legacy and the portable functionality of the Spark Runner.
The Spark Runner currently supports Spark's 2.x branch, more specifically any version greater than 2.2.0.
You can add a dependency on the latest version of the Spark Runner by adding the following to your `pom.xml`:
```xml
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark</artifactId>
  <version>{{ site.release_latest }}</version>
</dependency>
```
In some cases, such as running in local mode or on a Standalone cluster, your self-contained application must bundle Spark by explicitly adding the following dependencies to your `pom.xml` (where `spark.version` is the Spark version you target):
```xml
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
```
You also need to shade the application jar using the Maven Shade Plugin:
```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <createDependencyReducedPom>false</createDependencyReducedPom>
    <filters>
      <filter>
        <artifact>*:*</artifact>
        <excludes>
          <exclude>META-INF/*.SF</exclude>
          <exclude>META-INF/*.DSA</exclude>
          <exclude>META-INF/*.RSA</exclude>
        </excludes>
      </filter>
    </filters>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <shadedArtifactAttached>true</shadedArtifactAttached>
        <shadedClassifierName>shaded</shadedClassifierName>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```
After running `mvn package`, run `ls target`. Assuming your artifactId is `beam-examples` and the version is `1.0.0`, you should see the shaded jar `beam-examples-1.0.0-shaded.jar` among the build outputs.
To run against a Standalone cluster, submit the shaded jar with `spark-submit`, pointing `--master` at the cluster's master URL.
To run a Python pipeline on the portable Runner, first install the Beam Python SDK: `pip install apache_beam`
1. Start the JobService endpoint: `./gradlew :runners:spark:job-server:runShadow`
2. Submit the Python pipeline to the above endpoint by using the `PortableRunner`, with `job_endpoint` set to `localhost:8099` (this is the default address of the JobService) and `environment_type` set to `LOOPBACK`. For example:
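```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Point the PortableRunner at the local Spark JobService and run the
# SDK harness inside this process (LOOPBACK, intended for local testing).
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK"
])
# Pass the options by keyword: the first positional argument of
# beam.Pipeline is the runner, not the options.
with beam.Pipeline(options=options) as p:
    ...
```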
Deploying your Beam pipeline on a cluster that already has a Spark deployment (Spark classes are available in the container classpath) does not require any additional dependencies. For more details on the different deployment modes, see the Spark documentation for Standalone, YARN, or Mesos.
1. Start a Spark cluster, which exposes the master on port 7077 by default.
2. Start a JobService that connects to the Spark master: `./gradlew :runners:spark:job-server:runShadow -PsparkMasterUrl=spark://localhost:7077`.
3. Submit the pipeline as above. Note, however, that `environment_type=LOOPBACK` is only intended for local testing. See the [SDK harness configuration]({{ site.baseurl }}/roadmap/portability/#sdk-harness-config) section for details.
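Because `LOOPBACK` is meant for local testing, a cluster submission needs a different `environment_type`. The sketch below assumes the Spark workers can run Docker, so it defaults to `DOCKER` (one of the standard portability environment types, alongside `PROCESS`, `EXTERNAL`, and `LOOPBACK`). `portable_spark_args` is a hypothetical helper, and `localhost:8099` assumes the JobService runs on the submitting machine.

```python
from typing import List

# Environment types understood by Beam's portability framework.
KNOWN_ENVIRONMENT_TYPES = {"DOCKER", "PROCESS", "EXTERNAL", "LOOPBACK"}


def portable_spark_args(job_endpoint: str = "localhost:8099",
                        environment_type: str = "DOCKER") -> List[str]:
    """Return PortableRunner arguments for a Spark JobService (hypothetical helper).

    DOCKER is an assumed default for cluster runs: each Spark worker starts
    the SDK harness in a container. LOOPBACK runs the harness in the
    submitting process and should be reserved for local testing.
    """
    env = environment_type.upper()
    if env not in KNOWN_ENVIRONMENT_TYPES:
        raise ValueError(f"unknown environment_type: {environment_type!r}")
    return [
        "--runner=PortableRunner",
        f"--job_endpoint={job_endpoint}",
        f"--environment_type={env}",
    ]


if __name__ == "__main__":
    print(portable_spark_args())
```

The returned list can be passed straight to `PipelineOptions(...)` in place of the hard-coded list in the local example.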
When executing your pipeline with the Spark Runner, you should consider the following pipeline options.
When submitting a Spark application to a cluster, it is common (and recommended) to use the `spark-submit` script provided with the Spark installation. The `PipelineOptions` described above are not meant to replace `spark-submit`, but to complement it. Any of the options above can be passed as one of the `application-arguments`, and setting `--master` takes precedence. For more on how to use `spark-submit` in general, see the Spark documentation.
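A minimal sketch of how the pieces fit together: `spark-submit`'s own options (such as `--class` and `--master`) go before the application jar, and Beam pipeline options follow the jar as application arguments. The main class `com.example.MyBeamPipeline`, the jar path (which assumes the `beam-examples`/`1.0.0` build described earlier), and the Standalone master URL are placeholders, and `build_spark_submit_command` is a hypothetical helper.

```python
import shlex
from typing import List, Sequence


def build_spark_submit_command(main_class: str,
                               app_jar: str,
                               master: str,
                               pipeline_args: Sequence[str]) -> List[str]:
    """Assemble a spark-submit command line for a Beam pipeline jar (hypothetical helper).

    spark-submit's own options must come before the application jar;
    everything after the jar is handed unchanged to the application's
    main method, which is where the Beam pipeline options belong.
    """
    return [
        "spark-submit",
        "--class", main_class,
        "--master", master,
        app_jar,
        *pipeline_args,
    ]


if __name__ == "__main__":
    cmd = build_spark_submit_command(
        main_class="com.example.MyBeamPipeline",  # hypothetical main class
        app_jar="target/beam-examples-1.0.0-shaded.jar",
        master="spark://localhost:7077",  # assumed Standalone master URL
        pipeline_args=["--runner=SparkRunner"],
    )
    print(shlex.join(cmd))
    # To submit for real (requires spark-submit on the PATH):
    #   import subprocess; subprocess.run(cmd, check=True)
```

Keeping the two groups of arguments separate this way makes it clear which flags Spark consumes and which ones reach the Beam pipeline.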
You can monitor a running Spark job using the Spark Web Interfaces. By default, this is available at port `4040` on the driver node. If you run Spark on your local machine, that would be `http://localhost:4040`. Spark also has a history server for viewing jobs after the fact, and metrics are also available via a REST API.

Spark provides a metrics system that allows reporting Spark metrics to a variety of sinks. The Spark Runner reports user-defined Beam metrics using this same metrics system and currently supports `GraphiteSink` and `CsvSink`; adding support for the other sinks that Spark provides is straightforward. Spark metrics are not yet supported on the portable Runner.
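Spark configures its sinks through a `metrics.properties` file with entries of the form `[instance].sink.[name].[option]=value`. As a sketch under that assumption, the hypothetical helper below renders a configuration enabling the two sinks mentioned above; the Graphite host, port, CSV directory, and reporting period are placeholders.

```python
from typing import Dict, Mapping


def metrics_properties(sinks: Mapping[str, Mapping[str, str]]) -> str:
    """Render Spark metrics.properties lines for the given sinks (hypothetical helper).

    Each option becomes "*.sink.<name>.<option>=<value>"; the "*" instance
    applies the sink to every Spark instance (master, worker, driver, executor).
    """
    lines = []
    for name, options in sinks.items():
        for option, value in options.items():
            lines.append(f"*.sink.{name}.{option}={value}")
    return "\n".join(lines) + "\n"


# Placeholder host, port, and directory values; adjust for your environment.
SINKS: Dict[str, Dict[str, str]] = {
    "graphite": {
        "class": "org.apache.spark.metrics.sink.GraphiteSink",
        "host": "graphite.example.com",
        "port": "2003",
        "period": "10",
        "unit": "seconds",
    },
    "csv": {
        "class": "org.apache.spark.metrics.sink.CsvSink",
        "directory": "/tmp/spark-metrics",
        "period": "10",
        "unit": "seconds",
    },
}

if __name__ == "__main__":
    print(metrics_properties(SINKS), end="")
```

Spark reads the resulting file from `$SPARK_HOME/conf/metrics.properties` by default, or from the path given in the `spark.metrics.conf` property.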