////
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
[[sparkgraphcomputer]]
==== SparkGraphComputer
[source,xml]
----
<dependency>
   <groupId>org.apache.tinkerpop</groupId>
   <artifactId>spark-gremlin</artifactId>
   <version>x.y.z</version>
</dependency>
----
image:spark-logo.png[width=175,float=left] link:http://spark.apache.org[Spark] is an Apache Software Foundation
project focused on general-purpose OLAP data processing. Spark provides a hybrid in-memory/disk-based distributed
computing model that is similar to Hadoop's MapReduce model. Spark maintains a fluent function chaining DSL that is
arguably easier for developers to work with than native Hadoop MapReduce. Spark-Gremlin provides an implementation of
the bulk-synchronous parallel, distributed message passing algorithm within Spark and thus, any `VertexProgram` can be
executed over `SparkGraphComputer`.
Furthermore, the `lib/` directory should be distributed across all machines in the SparkServer cluster. For this purpose,
TinkerPop provides a helper script that takes the Spark installation directory and the Spark machines as input:
[source,shell]
bin/hadoop/init-tp-spark.sh /usr/local/spark spark@10.0.0.1 spark@10.0.0.2 spark@10.0.0.3
Once the `lib/` directory is distributed, `SparkGraphComputer` can be used as follows.
[gremlin-groovy]
----
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
g = graph.traversal().withComputer(SparkGraphComputer)
g.V().count()
g.V().out().out().values('name')
----
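Beyond traversals, a `VertexProgram` can also be submitted to `SparkGraphComputer` directly. The following is a minimal
sketch using the TinkerPop-provided `PageRankVertexProgram` and assuming the sample `conf/hadoop/hadoop-gryo.properties` file.
[gremlin-groovy]
----
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
result = graph.compute(SparkGraphComputer).program(PageRankVertexProgram.build().create(graph)).submit().get()
result.memory().runtime
result.memory().iteration
----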
To use lambdas in Gremlin-Groovy, simply provide `:remote connect` a `TraversalSource` that leverages `SparkGraphComputer`.
[gremlin-groovy]
----
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
g = graph.traversal().withComputer(SparkGraphComputer)
:remote connect tinkerpop.hadoop graph g
:> g.V().group().by{it.value('name')[1]}.by('name')
----
The `SparkGraphComputer` algorithm leverages Spark's caching abilities to reduce the amount of data shuffled across
the wire on each iteration of the <<vertexprogram,`VertexProgram`>>. When the graph is loaded as a Spark RDD
(Resilient Distributed Dataset) it is immediately cached as `graphRDD`. The `graphRDD` is a distributed adjacency
list which encodes the vertex, its properties, and all its incident edges. On the first iteration, each vertex
(in parallel) is passed through `VertexProgram.execute()`. This yields an output of the vertex's mutated state
(i.e. updated compute keys -- `propertyX`) and its outgoing messages. This `viewOutgoingRDD` is then reduced to
`viewIncomingRDD` where the outgoing messages are sent to their respective vertices. If a `MessageCombiner` exists
for the vertex program, then messages are aggregated locally and globally to ultimately yield one incoming message
for the vertex. This reduce sequence is the "message pass." If the vertex program does not terminate on this
iteration, then the `viewIncomingRDD` is joined with the cached `graphRDD` and the process continues. When there
are no more iterations, there is a final join and the resultant RDD is stripped of its edges and messages. This
`mapReduceRDD` is cached and is processed by each <<mapreduce,`MapReduce`>> job in the
<<graphcomputer,`GraphComputer`>> computation.
image::spark-algorithm.png[width=775]
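To illustrate the local and global aggregation step, the following is a minimal sketch of a `MessageCombiner` that sums
`Long` messages. The class name is hypothetical; a `VertexProgram` would expose such a combiner through its
`getMessageCombiner()` method.
[source,groovy]
----
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner

// hypothetical combiner: merges two messages bound for the same vertex into a single sum
class SumLongMessageCombiner implements MessageCombiner<Long> {
    @Override
    Long combine(Long messageA, Long messageB) {
        return messageA + messageB
    }
}
----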
[width="100%",cols="2,10",options="header"]
|========================================================
|Property |Description
|gremlin.hadoop.graphReader |A class for reading a graph-based RDD (e.g. an `InputRDD` or `InputFormat`).
|gremlin.hadoop.graphWriter |A class for writing a graph-based RDD (e.g. an `OutputRDD` or `OutputFormat`).
|gremlin.spark.graphStorageLevel |What `StorageLevel` to use for the cached graph during job execution (default `MEMORY_ONLY`).
|gremlin.spark.persistContext |Whether to create a new `SparkContext` for every `SparkGraphComputer` or to reuse an existing one.
|gremlin.spark.persistStorageLevel |What `StorageLevel` to use when persisting RDDs via `PersistedOutputRDD` (default `MEMORY_ONLY`).
|========================================================
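These keys typically sit alongside the standard Hadoop-Gremlin settings in a properties file. The snippet below is only a
sketch in the spirit of `conf/hadoop/hadoop-gryo.properties`; the exact class names and values depend on the release in use.
[source,properties]
----
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphReader=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphWriter=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
gremlin.hadoop.inputLocation=tinkerpop-modern.kryo
gremlin.hadoop.outputLocation=output
gremlin.spark.graphStorageLevel=MEMORY_AND_DISK
gremlin.spark.persistContext=true
spark.master=local[4]
----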
===== InputRDD and OutputRDD
If the provider/user does not want to use Hadoop `InputFormats`, it is possible to leverage Spark's RDD
constructs directly. An `InputRDD` provides a read method that takes a `SparkContext` and returns a graphRDD. Likewise,
an `OutputRDD` is used for writing a graphRDD.
If the graph system provider uses an `InputRDD`, the RDD should maintain an associated `org.apache.spark.Partitioner`. By doing so,
`SparkGraphComputer` will not partition the loaded graph across the cluster as it has already been partitioned by the graph system provider.
This can save a significant amount of time and space resources. If the `InputRDD` does not have a registered partitioner,
`SparkGraphComputer` will partition the graph using an `org.apache.spark.HashPartitioner` with the number of partitions
being either the number of existing partitions in the input (i.e. input splits) or the user-specified number of `GraphComputer.workers()`.
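As a concrete illustration, the following is a minimal sketch of a custom `InputRDD` written against the
`readGraphRDD(Configuration, JavaSparkContext)` method of the `InputRDD` interface. The class itself is hypothetical and
not part of TinkerPop, and the Commons Configuration package name varies across TinkerPop versions.
[source,groovy]
----
import org.apache.commons.configuration.Configuration
import org.apache.spark.api.java.JavaPairRDD
import org.apache.spark.api.java.JavaSparkContext
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable
import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD
import org.apache.tinkerpop.gremlin.structure.T
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph
import scala.Tuple2

// hypothetical InputRDD that generates a tiny two-vertex graph directly in Spark
class TwoVertexInputRDD implements InputRDD {
    @Override
    JavaPairRDD<Object, VertexWritable> readGraphRDD(Configuration configuration, JavaSparkContext sparkContext) {
        def marko = StarGraph.open().addVertex(T.id, 1L, T.label, 'person', 'name', 'marko')
        def vadas = StarGraph.open().addVertex(T.id, 2L, T.label, 'person', 'name', 'vadas')
        // the graphRDD is keyed by vertex id, with a VertexWritable holding each star vertex
        return sparkContext.parallelizePairs([
            new Tuple2<Object, VertexWritable>(marko.id(), new VertexWritable(marko)),
            new Tuple2<Object, VertexWritable>(vadas.id(), new VertexWritable(vadas))])
    }
}
----
To use such an implementation, `gremlin.hadoop.graphReader` would point at its fully qualified class name.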
===== Storage Levels
The `SparkGraphComputer` uses `MEMORY_ONLY` to cache the input graph and the output graph by default. Users should be aware of the impact of
different storage levels, since the default settings can quickly lead to memory issues on larger graphs. An overview of Spark's persistence
settings is provided in link:http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence[Spark's programming guide].
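For larger graphs it is often sufficient to let the cached graph spill to disk. A minimal sketch, assuming the sample
`conf/hadoop/hadoop-gryo.properties` file:
[gremlin-groovy]
----
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.spark.graphStorageLevel', 'MEMORY_AND_DISK')
g = graph.traversal().withComputer(SparkGraphComputer)
g.V().count()
----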
===== Using a Persisted Context
It is possible to persist the graph RDD between jobs within the `SparkContext` (e.g. SparkServer) by leveraging `PersistedOutputRDD`.
Note that `gremlin.spark.persistContext` should be set to `true` or else the persisted RDD will be destroyed when the `SparkContext` closes.
The persisted RDD is named by the `gremlin.hadoop.outputLocation` configuration. Similarly, `PersistedInputRDD` is used with the
respective `gremlin.hadoop.inputLocation` to retrieve the persisted RDD from the `SparkContext`.
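The following is a minimal sketch of writing and then re-reading a persisted graphRDD within the same `SparkContext`.
It assumes the sample `conf/hadoop/hadoop-gryo.properties` file, and the fully qualified class names shown should be
checked against the release in use.
[gremlin-groovy]
----
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.spark.persistContext', true)
graph.configuration().setProperty('gremlin.hadoop.graphWriter', 'org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD')
graph.configuration().setProperty('gremlin.hadoop.outputLocation', 'persistedGraph')
// copy the graph into the persisted RDD
graph.compute(SparkGraphComputer).program(CloneVertexProgram.build().create()).submit().get()
// re-open the persisted RDD as the input of a new job
graph.configuration().setProperty('gremlin.hadoop.graphReader', 'org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD')
graph.configuration().setProperty('gremlin.hadoop.inputLocation', 'persistedGraph')
g = graph.traversal().withComputer(SparkGraphComputer)
g.V().count()
----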
When using a persistent `SparkContext`, the configuration of the original Spark context will be inherited by all threaded
references to that Spark context. The exceptions to this rule are those properties which have a specific thread-local effect.
.Thread Local Properties
. spark.jobGroup.id
. spark.job.description
. spark.job.interruptOnCancel
. spark.scheduler.pool
Finally, there is a `spark` object that can be used to manage persisted RDDs (see <<interacting-with-spark, Interacting with Spark>>).
[[clonevertexprogramusingspark]]
===== Using CloneVertexProgram
The <<clonevertexprogram, CloneVertexProgram>> copies a whole graph from any graph `InputFormat` to any graph
`OutputFormat`. TinkerPop provides formats such as `GraphSONOutputFormat`, `GryoOutputFormat` or `ScriptOutputFormat`.
The example below takes a Hadoop graph as the input (in `GryoInputFormat`) and exports it as a GraphSON file
(`GraphSONOutputFormat`).
[gremlin-groovy]
----
hdfs.copyFromLocal('data/tinkerpop-modern.kryo', 'tinkerpop-modern.kryo')
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.hadoop.graphWriter', 'org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat')
graph.compute(SparkGraphComputer).program(CloneVertexProgram.build().create()).submit().get()
hdfs.ls('output')
hdfs.head('output/~g')
----