| //// |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| //// |
| [[sparkgraphcomputer]] |
| ==== SparkGraphComputer |
| |
| [source,xml] |
| ---- |
| <dependency> |
| <groupId>org.apache.tinkerpop</groupId> |
| <artifactId>spark-gremlin</artifactId> |
| <version>x.y.z</version> |
| </dependency> |
| ---- |
| |
| image:spark-logo.png[width=175,float=left] link:http://spark.apache.org[Spark] is an Apache Software Foundation |
| project focused on general-purpose OLAP data processing. Spark provides a hybrid in-memory/disk-based distributed |
| computing model that is similar to Hadoop's MapReduce model. Spark maintains a fluent function chaining DSL that is |
| arguably easier for developers to work with than native Hadoop MapReduce. Spark-Gremlin provides an implementation of |
| the bulk-synchronous parallel, distributed message-passing algorithm within Spark, and thus any `VertexProgram` can be |
| executed over `SparkGraphComputer`. |
| |
| The `lib/` directory should be distributed across all machines in the SparkServer cluster. For this purpose, |
| TinkerPop provides a helper script that takes the Spark installation directory and the Spark machines as input: |
| |
| [source,shell] |
| bin/hadoop/init-tp-spark.sh /usr/local/spark spark@10.0.0.1 spark@10.0.0.2 spark@10.0.0.3 |
| |
| Once the `lib/` directory is distributed, `SparkGraphComputer` can be used as follows. |
| |
| [gremlin-groovy] |
| ---- |
| graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties') |
| g = graph.traversal().withComputer(SparkGraphComputer) |
| g.V().count() |
| g.V().out().out().values('name') |
| ---- |
| |
| To use lambdas in Gremlin-Groovy, provide `:remote connect` with a `TraversalSource` that leverages `SparkGraphComputer`. |
| |
| [gremlin-groovy] |
| ---- |
| graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties') |
| g = graph.traversal().withComputer(SparkGraphComputer) |
| :remote connect tinkerpop.hadoop graph g |
| :> g.V().group().by{it.value('name')[1]}.by('name') |
| ---- |
| |
| The `SparkGraphComputer` algorithm leverages Spark's caching abilities to reduce the amount of data shuffled across |
| the wire on each iteration of the <<vertexprogram,`VertexProgram`>>. When the graph is loaded as a Spark RDD |
| (Resilient Distributed Dataset) it is immediately cached as `graphRDD`. The `graphRDD` is a distributed adjacency |
| list which encodes the vertex, its properties, and all its incident edges. On the first iteration, each vertex |
| (in parallel) is passed through `VertexProgram.execute()`. This yields an output of the vertex's mutated state |
| (i.e. updated compute keys -- `propertyX`) and its outgoing messages. This `viewOutgoingRDD` is then reduced to |
| `viewIncomingRDD` where the outgoing messages are sent to their respective vertices. If a `MessageCombiner` exists |
| for the vertex program, then messages are aggregated locally and globally to ultimately yield one incoming message |
| for the vertex. This reduce sequence is the "message pass." If the vertex program does not terminate on this |
| iteration, then the `viewIncomingRDD` is joined with the cached `graphRDD` and the process continues. When there |
| are no more iterations, there is a final join and the resultant RDD is stripped of its edges and messages. This |
| `mapReduceRDD` is cached and is processed by each <<mapreduce,`MapReduce`>> job in the |
| <<graphcomputer,`GraphComputer`>> computation. |
| |
| image::spark-algorithm.png[width=775] |
| |
| [width="100%",cols="2,10",options="header"] |
| |======================================================== |
| |Property |Description |
| |gremlin.hadoop.graphReader |A class for reading a graph-based RDD (e.g. an `InputRDD` or `InputFormat`). |
| |gremlin.hadoop.graphWriter |A class for writing a graph-based RDD (e.g. an `OutputRDD` or `OutputFormat`). |
| |gremlin.spark.graphStorageLevel |What `StorageLevel` to use for the cached graph during job execution (default `MEMORY_ONLY`). |
| |gremlin.spark.persistContext |Whether to create a new `SparkContext` for every `SparkGraphComputer` or to reuse an existing one. |
| |gremlin.spark.persistStorageLevel |What `StorageLevel` to use when persisting RDDs via `PersistedOutputRDD` (default `MEMORY_ONLY`). |
| |======================================================== |
| |
| ===== InputRDD and OutputRDD |
| |
| If the provider/user does not want to use Hadoop `InputFormats`, it is possible to leverage Spark's RDD |
| constructs directly. An `InputRDD` provides a read method that takes a `SparkContext` and returns a `graphRDD`. Likewise, |
| an `OutputRDD` is used for writing a `graphRDD`. |
| |
| If the graph system provider uses an `InputRDD`, the RDD should maintain an associated `org.apache.spark.Partitioner`. By doing so, |
| `SparkGraphComputer` will not partition the loaded graph across the cluster as it has already been partitioned by the graph system provider. |
| This can save a significant amount of time and space resources. If the `InputRDD` does not have a registered partitioner, |
| `SparkGraphComputer` will partition the graph using an `org.apache.spark.HashPartitioner` with the number of partitions |
| being either the number of existing partitions in the input (i.e. input splits) or the user-specified number of `GraphComputer.workers()`. |
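| |
| As a rough sketch of the user-specified case, the worker count can be requested on the `GraphComputer` before the job is |
| submitted. The value `4` is purely illustrative, and the example assumes the loaded input has no registered partitioner. |
| |
| [source,groovy] |
| ---- |
| graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties') |
| // workers(4) is an illustrative value, not a recommendation |
| graph.compute(SparkGraphComputer).workers(4).program(CloneVertexProgram.build().create()).submit().get() |
| ---- |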
| |
| ===== Storage Levels |
| |
| The `SparkGraphComputer` uses `MEMORY_ONLY` to cache the input graph and the output graph by default. Users should be aware of the impact of |
| different storage levels, since the default settings can quickly lead to memory issues on larger graphs. An overview of Spark's persistence |
| settings is provided in link:http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence[Spark's programming guide]. |
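| |
| A minimal sketch of overriding the defaults from the Gremlin Console is shown below. It sets the two storage-level properties |
| from the table above on the graph configuration. `MEMORY_AND_DISK` is an assumption chosen only for illustration; the right |
| level depends on the graph size and the memory available in the cluster. |
| |
| [source,groovy] |
| ---- |
| graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties') |
| // assumption: MEMORY_AND_DISK is used for illustration, spilling to disk what does not fit in memory |
| graph.configuration().setProperty('gremlin.spark.graphStorageLevel', 'MEMORY_AND_DISK') |
| graph.configuration().setProperty('gremlin.spark.persistStorageLevel', 'MEMORY_AND_DISK') |
| g = graph.traversal().withComputer(SparkGraphComputer) |
| g.V().count() |
| ---- |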
| |
| |
| ===== Using a Persisted Context |
| |
| It is possible to persist the graph RDD between jobs within the `SparkContext` (e.g. SparkServer) by leveraging `PersistedOutputRDD`. |
| Note that `gremlin.spark.persistContext` should be set to `true` or else the persisted RDD will be destroyed when the `SparkContext` closes. |
| The persisted RDD is named by the `gremlin.hadoop.outputLocation` configuration. Similarly, `PersistedInputRDD` is used with the respective |
| `gremlin.hadoop.inputLocation` to retrieve the persisted RDD from the `SparkContext`. |
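| |
| The round trip might look like the following sketch. It assumes both classes live in the |
| `org.apache.tinkerpop.gremlin.spark.structure.io` package, and the RDD name `persisted-modern` is hypothetical. |
| |
| [source,groovy] |
| ---- |
| // write: persist the graph RDD in the SparkContext under a name |
| graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties') |
| graph.configuration().setProperty('gremlin.hadoop.graphWriter', 'org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD') |
| graph.configuration().setProperty('gremlin.spark.persistContext', true) |
| graph.configuration().setProperty('gremlin.hadoop.outputLocation', 'persisted-modern') // hypothetical name |
| graph.compute(SparkGraphComputer).program(CloneVertexProgram.build().create()).submit().get() |
| // read: a second graph configuration retrieves the persisted RDD by the same name |
| persisted = GraphFactory.open('conf/hadoop/hadoop-gryo.properties') |
| persisted.configuration().setProperty('gremlin.hadoop.graphReader', 'org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD') |
| persisted.configuration().setProperty('gremlin.spark.persistContext', true) |
| persisted.configuration().setProperty('gremlin.hadoop.inputLocation', 'persisted-modern') |
| persisted.traversal().withComputer(SparkGraphComputer).V().count() |
| ---- |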
| |
| When using a persistent `SparkContext`, the configuration used to create the original `SparkContext` is inherited by all threaded |
| references to that context. The exceptions to this rule are those properties that have a specific thread-local effect. |
| |
| .Thread Local Properties |
| . spark.jobGroup.id |
| . spark.job.description |
| . spark.job.interruptOnCancel |
| . spark.scheduler.pool |
| |
| Finally, there is a `spark` object that can be used to manage persisted RDDs (see <<interacting-with-spark, Interacting with Spark>>). |
| |
| [[clonevertexprogramusingspark]] |
| ===== Using CloneVertexProgram |
| |
| The <<clonevertexprogram, CloneVertexProgram>> copies a whole graph from any graph `InputFormat` to any graph |
| `OutputFormat`. TinkerPop provides formats such as `GraphSONOutputFormat`, `GryoOutputFormat` or `ScriptOutputFormat`. |
| The example below takes a Hadoop graph as the input (in `GryoInputFormat`) and exports it as a GraphSON file |
| (`GraphSONOutputFormat`). |
| |
| [gremlin-groovy] |
| ---- |
| hdfs.copyFromLocal('data/tinkerpop-modern.kryo', 'tinkerpop-modern.kryo') |
| graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties') |
| graph.configuration().setProperty('gremlin.hadoop.graphWriter', 'org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat') |
| graph.compute(SparkGraphComputer).program(CloneVertexProgram.build().create()).submit().get() |
| hdfs.ls('output') |
| hdfs.head('output/~g') |
| ---- |