////
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
[[hadoop-gremlin]]
Hadoop-Gremlin
--------------
[source,xml]
----
<dependency>
<groupId>org.apache.tinkerpop</groupId>
<artifactId>hadoop-gremlin</artifactId>
<version>x.y.z</version>
</dependency>
----
image:hadoop-logo-notext.png[width=100,float=left] link:http://hadoop.apache.org/[Hadoop] is a distributed
computing framework used to process data distributed across a multi-machine compute cluster. When the
data in the Hadoop cluster represents a TinkerPop3 graph, Hadoop-Gremlin can be used to process the graph
using both TinkerPop3's OLTP and OLAP graph computing models.
IMPORTANT: This section assumes that the user has a functioning Hadoop 2.x cluster. For more information on getting
started with Hadoop, please see the
link:http://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-common/SingleCluster.html[Single Node Setup]
tutorial. Moreover, if using `GiraphGraphComputer` or `SparkGraphComputer`, it is advisable that the reader also
familiarize themselves with Giraph (link:http://giraph.apache.org/quick_start.html[Getting Started]) and Spark
(link:http://spark.apache.org/docs/latest/quick-start.html[Quick Start]).
Installing Hadoop-Gremlin
~~~~~~~~~~~~~~~~~~~~~~~~~
If using <<gremlin-console,Gremlin Console>>, it is important to install the Hadoop-Gremlin plugin. Note that
Hadoop-Gremlin requires a Gremlin Console restart after installing.
[source,text]
----
$ bin/gremlin.sh
\,,,/
(o o)
-----oOOo-(3)-oOOo-----
plugin activated: tinkerpop.server
plugin activated: tinkerpop.utilities
plugin activated: tinkerpop.tinkergraph
gremlin> :install org.apache.tinkerpop hadoop-gremlin x.y.z
==>loaded: [org.apache.tinkerpop, hadoop-gremlin, x.y.z] - restart the console to use [tinkerpop.hadoop]
gremlin> :q
$ bin/gremlin.sh
\,,,/
(o o)
-----oOOo-(3)-oOOo-----
plugin activated: tinkerpop.server
plugin activated: tinkerpop.utilities
plugin activated: tinkerpop.tinkergraph
gremlin> :plugin use tinkerpop.hadoop
==>tinkerpop.hadoop activated
gremlin>
----
It is important that the `CLASSPATH` environment variable references `HADOOP_CONF_DIR` and that the configuration
files in `HADOOP_CONF_DIR` contain references to a live Hadoop cluster. It is easy to verify a proper configuration
from within the Gremlin Console. If `hdfs` references the local file system, then there is a configuration issue.
[source,text]
----
gremlin> hdfs
==>storage[org.apache.hadoop.fs.LocalFileSystem@65bb9029] // BAD
gremlin> hdfs
==>storage[DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_1229457199_1, ugi=user (auth:SIMPLE)]]] // GOOD
----
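If `hdfs` resolves to the local file system, ensure that `HADOOP_CONF_DIR` is on the `CLASSPATH` before starting the
console. A minimal sketch, assuming Hadoop is installed under `/usr/local/hadoop` (the path is illustrative):

[source,shell]
----
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
export CLASSPATH=$CLASSPATH:$HADOOP_CONF_DIR
----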
The `HADOOP_GREMLIN_LIBS` environment variable references locations that contain jars that should be uploaded to the respective
distributed cache (link:http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html[YARN] or SparkServer).
Note that `HADOOP_GREMLIN_LIBS` can be a colon-separated (`:`) list of locations and all jars from all locations will
be loaded into the cluster. Typically, only the jars of the respective `GraphComputer` are required to be loaded (e.g.
the `GiraphGraphComputer` plugin `lib` directory).
[source,shell]
export HADOOP_GREMLIN_LIBS=/usr/local/gremlin-console/ext/giraph-gremlin/lib
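For example, to load jars from a plugin `lib` directory as well as a directory of custom jars (the `/usr/local/extra-jars`
path is purely illustrative):

[source,shell]
export HADOOP_GREMLIN_LIBS=/usr/local/gremlin-console/ext/giraph-gremlin/lib:/usr/local/extra-jars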
Properties Files
~~~~~~~~~~~~~~~~
`HadoopGraph` makes use of properties files which ultimately get turned into Apache configurations and/or
Hadoop configurations. The example properties file presented below is located at `conf/hadoop/hadoop-gryo.properties`.
[source,properties]
----
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.inputLocation=tinkerpop-modern.kryo
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.outputLocation=output
gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
gremlin.hadoop.jarsInDistributedCache=true
####################################
# Spark Configuration #
####################################
spark.master=local[4]
spark.executor.memory=1g
spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
####################################
# SparkGraphComputer Configuration #
####################################
gremlin.spark.graphInputRDD=org.apache.tinkerpop.gremlin.spark.structure.io.InputRDDFormat
gremlin.spark.graphOutputRDD=org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDDFormat
gremlin.spark.persistContext=true
#####################################
# GiraphGraphComputer Configuration #
#####################################
giraph.minWorkers=2
giraph.maxWorkers=2
giraph.useOutOfCoreGraph=true
giraph.useOutOfCoreMessages=true
mapreduce.map.java.opts=-Xmx1024m
mapreduce.reduce.java.opts=-Xmx1024m
giraph.numInputThreads=2
giraph.numComputeThreads=2
----
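The same settings can also be assembled programmatically. A minimal sketch, assuming only that `GraphFactory.open()`
accepts an Apache Commons `Configuration` object:

[source,groovy]
----
// build the equivalent of conf/hadoop/hadoop-gryo.properties in code
conf = new org.apache.commons.configuration.BaseConfiguration()
conf.setProperty('gremlin.graph', 'org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph')
conf.setProperty('gremlin.hadoop.graphInputFormat', 'org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat')
conf.setProperty('gremlin.hadoop.inputLocation', 'tinkerpop-modern.kryo')
conf.setProperty('gremlin.hadoop.graphOutputFormat', 'org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat')
conf.setProperty('gremlin.hadoop.outputLocation', 'output')
conf.setProperty('gremlin.hadoop.jarsInDistributedCache', true)
graph = GraphFactory.open(conf)
----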
A review of the Hadoop-Gremlin specific properties is provided in the table below. For the OLAP
engines (<<sparkgraphcomputer,`SparkGraphComputer`>> or <<giraphgraphcomputer,`GiraphGraphComputer`>>), refer
to their respective documentation for configuration options.
[width="100%",cols="2,10",options="header"]
|=========================================================
|Property |Description
|gremlin.graph |The class of the graph to construct using GraphFactory.
|gremlin.hadoop.inputLocation |The location of the input file(s) for Hadoop-Gremlin to read the graph from.
|gremlin.hadoop.graphInputFormat |The format that the graph input file(s) are represented in.
|gremlin.hadoop.outputLocation |The location to write the computed HadoopGraph to.
|gremlin.hadoop.graphOutputFormat |The format that the output file(s) should be represented in.
|gremlin.hadoop.jarsInDistributedCache |Whether to upload the Hadoop-Gremlin jars to a distributed cache (necessary if jars are not on the machines' classpaths).
|=========================================================
Along with the properties above, the numerous link:http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/core-default.xml[Hadoop specific properties]
can be added as needed to tune and parameterize the executed Hadoop-Gremlin job on the respective Hadoop cluster.
IMPORTANT: As the size of the graphs being processed becomes large, it is important to fully understand how the
underlying OLAP engine (e.g. Spark, Giraph, etc.) works and understand the numerous parameterizations offered by
these systems. Such knowledge can help alleviate out of memory exceptions, slow load times, slow processing times,
garbage collection issues, etc.
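For example, a first pass at tuning often raises executor and task memory. The values below are illustrative only;
appropriate settings depend on the cluster and the size of the graph:

[source,properties]
----
spark.executor.memory=4g
mapreduce.map.java.opts=-Xmx4096m
mapreduce.reduce.java.opts=-Xmx4096m
----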
OLTP Hadoop-Gremlin
~~~~~~~~~~~~~~~~~~~
image:hadoop-pipes.png[width=180,float=left] It is possible to execute OLTP operations over a `HadoopGraph`.
However, realize that the underlying HDFS files are not random access and thus, to retrieve a vertex, a linear scan
is required. OLTP operations are useful for peeking into the graph prior to executing a long running OLAP job -- e.g.
`g.V().valueMap().limit(10)`.
WARNING: OLTP operations on `HadoopGraph` are not efficient. They require linear scans to execute and are unreasonable
for large graphs. In such large graph situations, make use of <<traversalvertexprogram,TraversalVertexProgram>>
which is the OLAP Gremlin machine.
[gremlin-groovy]
----
hdfs.copyFromLocal('data/tinkerpop-modern.kryo', 'tinkerpop-modern.kryo')
hdfs.ls()
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
g = graph.traversal()
g.V().count()
g.V().out().out().values('name')
g.V().group().by{it.value('name')[1]}.by('name').next()
----
OLAP Hadoop-Gremlin
~~~~~~~~~~~~~~~~~~~
image:hadoop-furnace.png[width=180,float=left] Hadoop-Gremlin was designed to execute OLAP operations via
`GraphComputer`. The OLTP examples presented previously are reproduced below, but using `TraversalVertexProgram`
for the execution of the Gremlin traversal.
A `Graph` in TinkerPop3 can support any number of `GraphComputer` implementations. Out of the box, Hadoop-Gremlin
supports the following three implementations.
* <<mapreducegraphcomputer,`MapReduceGraphComputer`>>: Leverages Hadoop's MapReduce engine to execute TinkerPop3 OLAP
computations. (*coming soon*)
** The graph must fit within the total disk space of the Hadoop cluster (supports massive graphs). Message passing is
coordinated via MapReduce jobs over the on-disk graph (slow traversals).
* <<sparkgraphcomputer,`SparkGraphComputer`>>: Leverages Apache Spark to execute TinkerPop3 OLAP computations.
** The graph may fit within the total RAM of the cluster (supports larger graphs). Message passing is coordinated via
Spark map/reduce/join operations on in-memory and disk-cached data (average speed traversals).
* <<giraphgraphcomputer,`GiraphGraphComputer`>>: Leverages Apache Giraph to execute TinkerPop3 OLAP computations.
** The graph should fit within the total RAM of the Hadoop cluster (graph size restriction), though "out-of-core"
processing is possible. Message passing is coordinated via ZooKeeper for the in-memory graph (speedy traversals).
TIP: image:gremlin-sugar.png[width=50,float=left] For those wanting to use the <<sugar-plugin,SugarPlugin>> with
their submitted traversal, do `:remote config useSugar true` as well as `:plugin use tinkerpop.sugar` at the start of
the Gremlin Console session if it is not already activated.
Note that `SparkGraphComputer` and `GiraphGraphComputer` are loaded via their respective plugins. Typically only
one plugin or the other is loaded depending on the desired `GraphComputer` to use.
[source,text]
----
$ bin/gremlin.sh
\,,,/
(o o)
-----oOOo-(3)-oOOo-----
plugin activated: tinkerpop.server
plugin activated: tinkerpop.utilities
plugin activated: tinkerpop.tinkergraph
plugin activated: tinkerpop.hadoop
gremlin> :install org.apache.tinkerpop giraph-gremlin x.y.z
==>loaded: [org.apache.tinkerpop, giraph-gremlin, x.y.z] - restart the console to use [tinkerpop.giraph]
gremlin> :install org.apache.tinkerpop spark-gremlin x.y.z
==>loaded: [org.apache.tinkerpop, spark-gremlin, x.y.z] - restart the console to use [tinkerpop.spark]
gremlin> :q
$ bin/gremlin.sh
\,,,/
(o o)
-----oOOo-(3)-oOOo-----
plugin activated: tinkerpop.server
plugin activated: tinkerpop.utilities
plugin activated: tinkerpop.tinkergraph
plugin activated: tinkerpop.hadoop
gremlin> :plugin use tinkerpop.giraph
==>tinkerpop.giraph activated
gremlin> :plugin use tinkerpop.spark
==>tinkerpop.spark activated
----
WARNING: Hadoop, Spark, and Giraph all depend on many of the same libraries (e.g. ZooKeeper, Snappy, Netty, Guava,
etc.). Unfortunately, they typically do not depend on the same versions of those libraries. As such, it is best to
*not* have both the Spark and Giraph plugins loaded in the same console session nor in the same Java project (though
intelligent `<exclusion>`-usage can help alleviate conflicts in a Java project).
WARNING: It is important to note that when doing an OLAP traversal, any resulting vertices, edges, or properties will be
attached to the source graph. For Hadoop-based graphs, this may lead to linear search times on massive graphs. Thus,
if vertex, edge, or property objects are to be returned (as a final result), it is best to call `.id()` to get the id
of the object rather than the actual attached object.
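For example, of the two traversals below (a sketch over the toy graph), prefer the first when the final result is all
that is needed:

[source,groovy]
----
g.V().has('name','marko').id() // returns only the id -- no re-attachment required
g.V().has('name','marko')      // returns a vertex that attaches to the source graph
----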
[[mapreducegraphcomputer]]
MapReduceGraphComputer
^^^^^^^^^^^^^^^^^^^^^^
*COMING SOON*
[[sparkgraphcomputer]]
SparkGraphComputer
^^^^^^^^^^^^^^^^^^
[source,xml]
----
<dependency>
<groupId>org.apache.tinkerpop</groupId>
<artifactId>spark-gremlin</artifactId>
<version>x.y.z</version>
</dependency>
----
image:spark-logo.png[width=175,float=left] link:http://spark.apache.org[Spark] is an Apache Software Foundation
project focused on general-purpose OLAP data processing. Spark provides a hybrid in-memory/disk-based distributed
computing model that is similar to Hadoop's MapReduce model. Spark maintains a fluent function chaining DSL that is
arguably easier for developers to work with than native Hadoop MapReduce. Spark-Gremlin provides an implementation of
the bulk-synchronous parallel, distributed message passing algorithm within Spark and thus, any `VertexProgram` can be
executed over `SparkGraphComputer`.
If `SparkGraphComputer` will be used as the `GraphComputer` for `HadoopGraph` then its `lib` directory should be
specified in `HADOOP_GREMLIN_LIBS`.
[source,shell]
export HADOOP_GREMLIN_LIBS=$HADOOP_GREMLIN_LIBS:/usr/local/gremlin-console/ext/spark-gremlin/lib
Furthermore, the `lib/` directory should be distributed across all machines in the SparkServer cluster. For this purpose TinkerPop
provides a helper script, which takes the Spark installation directory and the Spark machines as input:
[source,shell]
bin/hadoop/init-tp-spark.sh /usr/local/spark spark@10.0.0.1 spark@10.0.0.2 spark@10.0.0.3
Once the `lib/` directory is distributed, `SparkGraphComputer` can be used as follows.
[gremlin-groovy]
----
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
g = graph.traversal(computer(SparkGraphComputer))
g.V().count()
g.V().out().out().values('name')
----
To use lambdas in Gremlin-Groovy, simply provide `:remote connect` a `TraversalSource` which leverages `SparkGraphComputer`.
[gremlin-groovy]
----
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
g = graph.traversal(computer(SparkGraphComputer))
:remote connect tinkerpop.hadoop graph g
:> g.V().group().by{it.value('name')[1]}.by('name')
----
The `SparkGraphComputer` algorithm leverages Spark's caching abilities to reduce the amount of data shuffled across
the wire on each iteration of the <<vertexprogram,`VertexProgram`>>. When the graph is loaded as a Spark RDD
(Resilient Distributed Dataset) it is immediately cached as `graphRDD`. The `graphRDD` is a distributed adjacency
list which encodes the vertex, its properties, and all its incident edges. On the first iteration, each vertex
(in parallel) is passed through `VertexProgram.execute()`. This yields an output of the vertex's mutated state
(i.e. updated compute keys -- `propertyX`) and its outgoing messages. This `viewOutgoingRDD` is then reduced to
`viewIncomingRDD` where the outgoing messages are sent to their respective vertices. If a `MessageCombiner` exists
for the vertex program, then messages are aggregated locally and globally to ultimately yield one incoming message
for the vertex. This reduce sequence is the "message pass." If the vertex program does not terminate on this
iteration, then the `viewIncomingRDD` is joined with the cached `graphRDD` and the process continues. When there
are no more iterations, there is a final join and the resultant RDD is stripped of its edges and messages. This
`mapReduceRDD` is cached and is processed by each <<mapreduce,`MapReduce`>> job in the
<<graphcomputer,`GraphComputer`>> computation.
image::spark-algorithm.png[width=775]
[width="100%",cols="2,10",options="header"]
|========================================================
|Property |Description
|gremlin.spark.graphInputRDD |A class for creating RDDs from the underlying graph data, defaults to Hadoop `InputFormat`.
|gremlin.spark.graphOutputRDD |A class for writing output RDDs, defaults to Hadoop `OutputFormat`.
|gremlin.spark.graphStorageLevel |What `StorageLevel` to use for the cached graph during job execution (default `MEMORY_ONLY`).
|gremlin.spark.persistContext |Whether to create a new `SparkContext` for every `SparkGraphComputer` or to reuse an existing one.
|gremlin.spark.persistStorageLevel |What `StorageLevel` to use when persisting RDDs via `PersistedOutputRDD` (default `MEMORY_ONLY`).
|========================================================
InputRDD and OutputRDD
++++++++++++++++++++++
If the provider/user does not want to use Hadoop `InputFormats`, it is possible to leverage Spark's RDD
constructs directly. There is a `gremlin.spark.graphInputRDD` configuration that references a `Class<? extends
InputRDD>`. An `InputRDD` provides a read method that takes a `SparkContext` and returns a graphRDD. Likewise, use
`gremlin.spark.graphOutputRDD` and the respective `OutputRDD`.
If the graph system provider uses an `InputRDD`, the RDD should maintain an associated `org.apache.spark.Partitioner`. By doing so,
`SparkGraphComputer` will not partition the loaded graph across the cluster as it has already been partitioned by the graph system provider.
This can save a significant amount of time and space resources.
If the `InputRDD` does not have a registered partitioner, `SparkGraphComputer` will partition the graph using
an `org.apache.spark.HashPartitioner` with the number of partitions being either the number of existing partitions in the input (e.g. input splits)
or the user-specified number of `GraphComputer.workers()`.
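A minimal sketch of a custom `InputRDD` is shown below. The `ExampleInputRDD` class name and its toy data are
hypothetical; the sketch assumes the interface's read method is `readGraphRDD(Configuration, JavaSparkContext)`
returning a `JavaPairRDD<Object, VertexWritable>`, per the description above:

[source,groovy]
----
import org.apache.commons.configuration.Configuration
import org.apache.spark.api.java.JavaPairRDD
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.function.PairFunction
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable
import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD
import org.apache.tinkerpop.gremlin.structure.T
import org.apache.tinkerpop.gremlin.structure.Vertex
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph

// hypothetical example: parallelizes a few StarGraph vertices into a graphRDD
class ExampleInputRDD implements InputRDD {
    @Override
    JavaPairRDD<Object, VertexWritable> readGraphRDD(Configuration configuration, JavaSparkContext sparkContext) {
        List<Vertex> vertices = []
        vertices << StarGraph.open().addVertex(T.id, 1L, T.label, 'person', 'name', 'marko')
        vertices << StarGraph.open().addVertex(T.id, 2L, T.label, 'person', 'name', 'vadas')
        // key each vertex by its id so SparkGraphComputer can join views and messages against it
        return sparkContext.parallelize(vertices).mapToPair(
            { Vertex v -> new scala.Tuple2<Object, VertexWritable>(v.id(), new VertexWritable(v)) } as PairFunction)
    }
}
----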
Storage Levels
++++++++++++++
The `SparkGraphComputer` uses `MEMORY_ONLY` to cache the input graph and the output graph by default. Users should be aware of the impact of
different storage levels, since the default settings can quickly lead to memory issues on larger graphs. An overview of Spark's persistence
settings is provided in link:http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence[Spark's programming guide].
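For example, to let both the cached graph and persisted RDDs spill to disk under memory pressure (a sketch;
`MEMORY_AND_DISK` is a standard Spark storage level):

[source,properties]
----
gremlin.spark.graphStorageLevel=MEMORY_AND_DISK
gremlin.spark.persistStorageLevel=MEMORY_AND_DISK
----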
Using a Persisted Context
+++++++++++++++++++++++++
It is possible to persist the graph RDD between jobs within the `SparkContext` (e.g. SparkServer) by leveraging `PersistedOutputRDD`.
Note that `gremlin.spark.persistContext` should be set to `true` or else the persisted RDD will be destroyed when the `SparkContext` closes.
The persisted RDD is named by the `gremlin.hadoop.outputLocation` configuration. Similarly, `PersistedInputRDD` is used with the respective
`gremlin.hadoop.inputLocation` to retrieve the persisted RDD from the `SparkContext`.
When using a persistent `SparkContext`, the configuration of the original `SparkContext` will be inherited by all threaded
references to that context. The exceptions to this rule are those properties which have a specific thread-local effect.
.Thread Local Properties
. spark.jobGroup.id
. spark.job.description
. spark.job.interruptOnCancel
. spark.scheduler.pool
Finally, there is a `spark` object that can be used to manage persisted RDDs (see <<interacting-with-spark, Interacting with Spark>>).
[[bulkdumpervertexprogramusingspark]]
Exporting with BulkDumperVertexProgram
++++++++++++++++++++++++++++++++++++++
The <<bulkdumpervertexprogram, BulkDumperVertexProgram>> exports a whole graph in any of the supported Hadoop GraphOutputFormats (`GraphSONOutputFormat`,
`GryoOutputFormat` or `ScriptOutputFormat`). The example below takes a Hadoop graph as the input (in `GryoInputFormat`) and exports it as a GraphSON file
(`GraphSONOutputFormat`).
[gremlin-groovy]
----
hdfs.copyFromLocal('data/tinkerpop-modern.kryo', 'tinkerpop-modern.kryo')
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.hadoop.graphOutputFormat', 'org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat')
graph.compute(SparkGraphComputer).program(BulkDumperVertexProgram.build().create()).submit().get()
hdfs.ls('output')
hdfs.head('output/~g')
----
Loading with BulkLoaderVertexProgram
++++++++++++++++++++++++++++++++++++
The <<bulkloadervertexprogram, BulkLoaderVertexProgram>> is a generalized bulk loader that can be used to load large
amounts of data to and from different `Graph` implementations. The following code demonstrates how to load the
Grateful Dead graph from HadoopGraph into TinkerGraph over Spark:
[gremlin-groovy]
----
hdfs.copyFromLocal('data/grateful-dead.kryo', 'grateful-dead.kryo')
readGraph = GraphFactory.open('conf/hadoop/hadoop-grateful-gryo.properties')
writeGraph = 'conf/tinkergraph-gryo.properties'
blvp = BulkLoaderVertexProgram.build().
        keepOriginalIds(false).
        writeGraph(writeGraph).create(readGraph)
readGraph.compute(SparkGraphComputer).workers(1).program(blvp).submit().get()
:set max-iteration 10
graph = GraphFactory.open(writeGraph)
g = graph.traversal()
g.V().valueMap()
graph.close()
----
[source,properties]
----
# hadoop-grateful-gryo.properties
#
# Hadoop Graph Configuration
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.inputLocation=grateful-dead.kryo
gremlin.hadoop.outputLocation=output
gremlin.hadoop.jarsInDistributedCache=true
#
# SparkGraphComputer Configuration
#
spark.master=local[1]
spark.executor.memory=1g
spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
----
[source,properties]
----
# tinkergraph-gryo.properties
gremlin.graph=org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph
gremlin.tinkergraph.graphFormat=gryo
gremlin.tinkergraph.graphLocation=/tmp/tinkergraph.kryo
----
IMPORTANT: The path to the TinkerGraph jars needs to be included in `HADOOP_GREMLIN_LIBS` for the above example to work.
[[giraphgraphcomputer]]
GiraphGraphComputer
^^^^^^^^^^^^^^^^^^^
[source,xml]
----
<dependency>
<groupId>org.apache.tinkerpop</groupId>
<artifactId>giraph-gremlin</artifactId>
<version>x.y.z</version>
</dependency>
----
image:giraph-logo.png[width=100,float=left] link:http://giraph.apache.org[Giraph] is an Apache Software Foundation
project focused on OLAP-based graph processing. Giraph makes use of the distributed graph computing paradigm made
popular by Google's Pregel. In Giraph, developers write "vertex programs" that get executed at each vertex in
parallel. These programs communicate with one another in a bulk synchronous parallel (BSP) manner. This model aligns
with TinkerPop3's `GraphComputer` API. TinkerPop3 provides an implementation of `GraphComputer` that works with Giraph,
called `GiraphGraphComputer`. Moreover, with TinkerPop3's <<mapreduce,MapReduce>>-framework, the standard
Giraph/Pregel model is extended to support an arbitrary number of MapReduce phases to aggregate and yield results
from the graph. Below are examples using `GiraphGraphComputer` from the <<gremlin-console,Gremlin-Console>>.
WARNING: Giraph uses a large number of Hadoop counters. The default limit in Hadoop is 120. In `mapred-site.xml` it is
possible to increase the limit via the `mapreduce.job.counters.max` property. A good value to use is 1000. This
is a cluster-wide property, so be sure to restart the cluster after updating it.
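For example, in `mapred-site.xml`:

[source,xml]
----
<property>
  <name>mapreduce.job.counters.max</name>
  <value>1000</value>
</property>
----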
WARNING: The maximum number of workers can be no larger than the number of map slots in the Hadoop cluster minus 1.
For example, if the Hadoop cluster has 4 map slots, then `giraph.maxWorkers` cannot be larger than 3. One map slot
is reserved for the master compute node and all other slots can be allocated as workers to execute the VertexPrograms
on the vertices of the graph.
If `GiraphGraphComputer` will be used as the `GraphComputer` for `HadoopGraph` then its `lib` directory should be
specified in `HADOOP_GREMLIN_LIBS`.
[source,shell]
export HADOOP_GREMLIN_LIBS=$HADOOP_GREMLIN_LIBS:/usr/local/gremlin-console/ext/giraph-gremlin/lib
Or, the user can specify the directory in the Gremlin Console.
[source,groovy]
System.setProperty('HADOOP_GREMLIN_LIBS',System.getProperty('HADOOP_GREMLIN_LIBS') + ':' + '/usr/local/gremlin-console/ext/giraph-gremlin/lib')
[gremlin-groovy]
----
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
g = graph.traversal(computer(GiraphGraphComputer))
g.V().count()
g.V().out().out().values('name')
----
IMPORTANT: The examples above do not use lambdas (i.e. closures in Gremlin-Groovy). This makes the traversal
serializable and thus, able to be distributed to all machines in the Hadoop cluster. If a lambda is required in a
traversal, then the traversal must be sent as a `String` and compiled locally at each machine in the cluster. The
following example demonstrates the `:remote` command which allows for submitting Gremlin traversals as a `String`.
[gremlin-groovy]
----
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
g = graph.traversal(computer(GiraphGraphComputer))
:remote connect tinkerpop.hadoop graph g
:> g.V().group().by{it.value('name')[1]}.by('name')
result
result.memory.runtime
result.memory.keys()
result.memory.get('~reducing')
----
NOTE: If the user explicitly specifies `giraph.maxWorkers` and/or `giraph.numComputeThreads` in the configuration,
then these values will be used by Giraph. However, if these are not specified and the user never calls
`GraphComputer.workers()` then `GiraphGraphComputer` will try to compute the number of workers/threads to use based
on the cluster's profile.
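For example, the worker count can be set explicitly on the `GraphComputer` (a sketch; the vertex program and the
worker count of 2 are illustrative):

[source,groovy]
----
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.compute(GiraphGraphComputer).workers(2).program(PageRankVertexProgram.build().create(graph)).submit().get()
----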
Loading with BulkLoaderVertexProgram
++++++++++++++++++++++++++++++++++++
The <<bulkloadervertexprogram, BulkLoaderVertexProgram>> is a generalized bulk loader that can be used to load
large amounts of data to and from different `Graph` implementations. The following code demonstrates how to load
the Grateful Dead graph from HadoopGraph into TinkerGraph over Giraph:
[gremlin-groovy]
----
hdfs.copyFromLocal('data/grateful-dead.kryo', 'grateful-dead.kryo')
readGraph = GraphFactory.open('conf/hadoop/hadoop-grateful-gryo.properties')
writeGraph = 'conf/tinkergraph-gryo.properties'
blvp = BulkLoaderVertexProgram.build().
        keepOriginalIds(false).
        writeGraph(writeGraph).create(readGraph)
readGraph.compute(GiraphGraphComputer).workers(1).program(blvp).submit().get()
:set max-iteration 10
graph = GraphFactory.open(writeGraph)
g = graph.traversal()
g.V().valueMap()
graph.close()
----
[source,properties]
----
# hadoop-grateful-gryo.properties
#
# Hadoop Graph Configuration
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
gremlin.hadoop.inputLocation=grateful-dead.kryo
gremlin.hadoop.outputLocation=output
gremlin.hadoop.jarsInDistributedCache=true
#
# GiraphGraphComputer Configuration
#
giraph.minWorkers=1
giraph.maxWorkers=1
giraph.useOutOfCoreGraph=true
giraph.useOutOfCoreMessages=true
mapred.map.child.java.opts=-Xmx1024m
mapred.reduce.child.java.opts=-Xmx1024m
giraph.numInputThreads=4
giraph.numComputeThreads=4
giraph.maxMessagesInMemory=100000
----
[source,properties]
----
# tinkergraph-gryo.properties
gremlin.graph=org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph
gremlin.tinkergraph.graphFormat=gryo
gremlin.tinkergraph.graphLocation=/tmp/tinkergraph.kryo
----
NOTE: The path to the TinkerGraph jars needs to be included in `HADOOP_GREMLIN_LIBS` for the above example to work.
Input/Output Formats
~~~~~~~~~~~~~~~~~~~~
image:adjacency-list.png[width=300,float=right] Hadoop-Gremlin provides various I/O formats -- i.e. Hadoop
`InputFormat` and `OutputFormat`. All of the formats make use of an link:http://en.wikipedia.org/wiki/Adjacency_list[adjacency list]
representation of the graph where each "row" represents a single vertex, its properties, and its incoming and
outgoing edges.
{empty} +
[[gryo-io-format]]
Gryo I/O Format
^^^^^^^^^^^^^^^
* **InputFormat**: `org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat`
* **OutputFormat**: `org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat`
<<gryo-reader-writer,Gryo>> is a binary graph format that leverages link:https://github.com/EsotericSoftware/kryo[Kryo]
to make a compact, binary representation of a vertex. It is recommended that users leverage Gryo given its space/time
savings over text-based representations.
NOTE: The `GryoInputFormat` is splittable.
[[graphson-io-format]]
GraphSON I/O Format
^^^^^^^^^^^^^^^^^^^
* **InputFormat**: `org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat`
* **OutputFormat**: `org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat`
<<graphson-reader-writer,GraphSON>> is a JSON-based graph format. GraphSON is a space-expensive format in that
it is a text-based markup language. However, it is convenient for many developers to work with as its structure is
simple (easy to create and parse).
The data below represents an adjacency list representation of the classic TinkerGraph toy graph in GraphSON format.
[source,json]
----
{"id":1,"label":"person","outE":{"created":[{"id":9,"inV":3,"properties":{"weight":0.4}}],"knows":[{"id":7,"inV":2,"properties":{"weight":0.5}},{"id":8,"inV":4,"properties":{"weight":1.0}}]},"properties":{"name":[{"id":0,"value":"marko"}],"age":[{"id":1,"value":29}]}}
{"id":2,"label":"person","inE":{"knows":[{"id":7,"outV":1,"properties":{"weight":0.5}}]},"properties":{"name":[{"id":2,"value":"vadas"}],"age":[{"id":3,"value":27}]}}
{"id":3,"label":"software","inE":{"created":[{"id":9,"outV":1,"properties":{"weight":0.4}},{"id":11,"outV":4,"properties":{"weight":0.4}},{"id":12,"outV":6,"properties":{"weight":0.2}}]},"properties":{"name":[{"id":4,"value":"lop"}],"lang":[{"id":5,"value":"java"}]}}
{"id":4,"label":"person","inE":{"knows":[{"id":8,"outV":1,"properties":{"weight":1.0}}]},"outE":{"created":[{"id":10,"inV":5,"properties":{"weight":1.0}},{"id":11,"inV":3,"properties":{"weight":0.4}}]},"properties":{"name":[{"id":6,"value":"josh"}],"age":[{"id":7,"value":32}]}}
{"id":5,"label":"software","inE":{"created":[{"id":10,"outV":4,"properties":{"weight":1.0}}]},"properties":{"name":[{"id":8,"value":"ripple"}],"lang":[{"id":9,"value":"java"}]}}
{"id":6,"label":"person","outE":{"created":[{"id":12,"inV":3,"properties":{"weight":0.2}}]},"properties":{"name":[{"id":10,"value":"peter"}],"age":[{"id":11,"value":35}]}}
----
[[script-io-format]]
Script I/O Format
^^^^^^^^^^^^^^^^^
* **InputFormat**: `org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptInputFormat`
* **OutputFormat**: `org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptOutputFormat`
`ScriptInputFormat` and `ScriptOutputFormat` take an arbitrary script and use that script to either read or write
`Vertex` objects, respectively. This can be considered the most general `InputFormat`/`OutputFormat` possible in that
Hadoop-Gremlin uses the user provided script for all reading/writing.
ScriptInputFormat
+++++++++++++++++
The data below represents an adjacency list representation of the classic TinkerGraph toy graph. The first line reads,
"vertex `1`, labeled `person`, having 2 property values (`marko` and `29`), has 3 outgoing edges; the first edge is
labeled `knows`, connects the current vertex `1` with vertex `2`, and has a property value `0.5`," and so on.
[source]
1:person:marko:29 knows:2:0.5,knows:4:1.0,created:3:0.4
2:person:vadas:27
3:project:lop:java
4:person:josh:32 created:3:0.4,created:5:1.0
5:project:ripple:java
6:person:peter:35 created:3:0.2
There is no corresponding `InputFormat` that can parse this particular file (or some adjacency list variant of it).
As such, `ScriptInputFormat` can be used. With `ScriptInputFormat` a script is stored in HDFS and leveraged by each
mapper in the Hadoop job. The script must have the following method defined:
[source,groovy]
def parse(String line, ScriptElementFactory factory) { ... }
`ScriptElementFactory` is a legacy from previous versions and, although it's still functional, it should no longer be used.
In order to create vertices and edges, the `parse()` method gets access to a global variable named `graph`, which holds
the local `StarGraph` for the current line/vertex.
An appropriate `parse()` for the above adjacency list file is:
[source,groovy]
def parse(line, factory) {
    def parts = line.split(/ /)
    def (id, label, name, x) = parts[0].split(/:/).toList()
    def v1 = graph.addVertex(T.id, id, T.label, label)
    if (name != null) v1.property('name', name) // first value is always the name
    if (x != null) {
        // second value depends on the vertex label; it's either
        // the age of a person or the language of a project
        if (label.equals('project')) v1.property('lang', x)
        else v1.property('age', Integer.valueOf(x))
    }
    if (parts.length == 2) {
        parts[1].split(/,/).grep { !it.isEmpty() }.each {
            def (eLabel, refId, weight) = it.split(/:/).toList()
            def v2 = graph.addVertex(T.id, refId)
            v1.addOutEdge(eLabel, v2, 'weight', Double.valueOf(weight))
        }
    }
    return v1
}
The resultant `Vertex` denotes whether the parsed line yielded a valid vertex. If the line is not valid
(e.g. a comment line, a skip line, etc.), simply return `null`.
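For reference, a properties sketch for wiring such a script into a job. The file names are hypothetical, and the
script-location property name is an assumption based on the `ScriptInputFormat` implementation; verify it against the
version in use:

[source,properties]
----
# assumed property names/values -- verify against the ScriptInputFormat in use
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.script.ScriptInputFormat
gremlin.hadoop.inputLocation=tinkerpop-classic.txt
gremlin.hadoop.scriptInputFormat.script=script-input.groovy
----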
ScriptOutputFormat Support
++++++++++++++++++++++++++
The principle above can also be used to convert a vertex to an arbitrary `String` representation that is ultimately
streamed back to a file in HDFS. This is the role of `ScriptOutputFormat`. `ScriptOutputFormat` requires that the
provided script maintains a method with the following signature:
[source,groovy]
def stringify(Vertex vertex) { ... }
An appropriate `stringify()` to produce output in the same format that was shown in the `ScriptInputFormat` sample is:
[source,groovy]
def stringify(vertex) {
    def v = vertex.values('name', 'age', 'lang').inject(vertex.id(), vertex.label()).join(':')
    def outE = vertex.outE().map {
        def e = it.get()
        e.values('weight').inject(e.label(), e.inV().next().id()).join(':')
    }.join(',')
    return [v, outE].join('\t')
}
Storage Systems
~~~~~~~~~~~~~~~
Hadoop-Gremlin provides two implementations of the `Storage` API:
* `FileSystemStorage`: Access HDFS and local file system data.
* `SparkContextStorage`: Access Spark persisted RDD data.
[[interacting-with-hdfs]]
Interacting with HDFS
^^^^^^^^^^^^^^^^^^^^^
The distributed file system of Hadoop is called link:http://en.wikipedia.org/wiki/Apache_Hadoop#Hadoop_distributed_file_system[HDFS].
The results of any OLAP operation are stored in HDFS and are accessible via `hdfs`. For local file system access, there is `local`.
[gremlin-groovy]
----
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.compute(SparkGraphComputer).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey('clusterCount').create()).submit().get();
hdfs.ls()
hdfs.ls('output')
hdfs.head('output', GryoInputFormat)
hdfs.head('output', 'clusterCount', SequenceFileInputFormat)
hdfs.rm('output')
hdfs.ls()
----
[[interacting-with-spark]]
Interacting with Spark
^^^^^^^^^^^^^^^^^^^^^^
If a Spark context is persisted, then Spark RDDs will remain in the Spark cache and be accessible over subsequent jobs.
RDDs are retrieved from and saved to the `SparkContext` via `PersistedInputRDD` and `PersistedOutputRDD` respectively.
Persisted RDDs can be accessed using `spark`.
[gremlin-groovy]
----
Spark.create('local[4]')
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
graph.configuration().setProperty('gremlin.spark.graphOutputRDD', PersistedOutputRDD.class.getCanonicalName())
graph.configuration().clearProperty('gremlin.hadoop.graphOutputFormat')
graph.configuration().setProperty('gremlin.spark.persistContext',true)
graph.compute(SparkGraphComputer).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey('clusterCount').create()).submit().get();
spark.ls()
spark.ls('output')
spark.head('output', PersistedInputRDD)
spark.head('output', 'clusterCount', PersistedInputRDD)
spark.rm('output')
spark.ls()
----
A Command Line Example
~~~~~~~~~~~~~~~~~~~~~~
image::pagerank-logo.png[width=300]
The classic link:http://en.wikipedia.org/wiki/PageRank[PageRank] centrality algorithm can be executed over the
TinkerPop graph from the command line using `GiraphGraphComputer`.
WARNING: Be sure that `HADOOP_GREMLIN_LIBS` references the `lib` directory of the respective
`GraphComputer` engine being used, or else the requisite dependencies will not be uploaded to the Hadoop cluster.
[source,text]
----
$ hdfs dfs -copyFromLocal data/tinkerpop-modern.json tinkerpop-modern.json
$ hdfs dfs -ls
Found 2 items
-rw-r--r-- 1 marko supergroup 2356 2014-07-28 13:00 /user/marko/tinkerpop-modern.json
$ hadoop jar target/giraph-gremlin-x.y.z-job.jar org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphGraphComputer ../hadoop-gremlin/conf/hadoop-graphson.properties
15/09/11 08:02:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/09/11 08:02:11 INFO computer.GiraphGraphComputer: HadoopGremlin(Giraph): PageRankVertexProgram[alpha=0.85,iterations=30]
15/09/11 08:02:12 INFO mapreduce.JobSubmitter: number of splits:3
15/09/11 08:02:12 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1441915907347_0028
15/09/11 08:02:12 INFO impl.YarnClientImpl: Submitted application application_1441915907347_0028
15/09/11 08:02:12 INFO job.GiraphJob: Tracking URL: http://markos-macbook:8088/proxy/application_1441915907347_0028/
15/09/11 08:02:12 INFO job.GiraphJob: Waiting for resources... Job will start only when it gets all 3 mappers
15/09/11 08:03:54 INFO mapreduce.Job: Running job: job_1441915907347_0028
15/09/11 08:03:55 INFO mapreduce.Job: Job job_1441915907347_0028 running in uber mode : false
15/09/11 08:03:55 INFO mapreduce.Job: map 33% reduce 0%
15/09/11 08:03:57 INFO mapreduce.Job: map 67% reduce 0%
15/09/11 08:04:01 INFO mapreduce.Job: map 100% reduce 0%
15/09/11 08:06:17 INFO mapreduce.Job: Job job_1441915907347_0028 completed successfully
15/09/11 08:06:17 INFO mapreduce.Job: Counters: 80
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=483918
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=1465
HDFS: Number of bytes written=1760
HDFS: Number of read operations=39
HDFS: Number of large read operations=0
HDFS: Number of write operations=20
Job Counters
Launched map tasks=3
Other local map tasks=3
Total time spent by all maps in occupied slots (ms)=458105
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=458105
Total vcore-seconds taken by all map tasks=458105
Total megabyte-seconds taken by all map tasks=469099520
Map-Reduce Framework
Map input records=3
Map output records=0
Input split bytes=132
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=1594
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
Total committed heap usage (bytes)=527958016
Giraph Stats
Aggregate edges=0
Aggregate finished vertices=0
Aggregate sent message message bytes=13535
Aggregate sent messages=186
Aggregate vertices=6
Current master task partition=0
Current workers=2
Last checkpointed superstep=0
Sent message bytes=438
Sent messages=6
Superstep=31
Giraph Timers
Initialize (ms)=2996
Input superstep (ms)=5209
Setup (ms)=59
Shutdown (ms)=9324
Superstep 0 GiraphComputation (ms)=3861
Superstep 1 GiraphComputation (ms)=4027
Superstep 10 GiraphComputation (ms)=4000
Superstep 11 GiraphComputation (ms)=4004
Superstep 12 GiraphComputation (ms)=3999
Superstep 13 GiraphComputation (ms)=4000
Superstep 14 GiraphComputation (ms)=4005
Superstep 15 GiraphComputation (ms)=4003
Superstep 16 GiraphComputation (ms)=4001
Superstep 17 GiraphComputation (ms)=4007
Superstep 18 GiraphComputation (ms)=3998
Superstep 19 GiraphComputation (ms)=4006
Superstep 2 GiraphComputation (ms)=4007
Superstep 20 GiraphComputation (ms)=3996
Superstep 21 GiraphComputation (ms)=4006
Superstep 22 GiraphComputation (ms)=4002
Superstep 23 GiraphComputation (ms)=3998
Superstep 24 GiraphComputation (ms)=4003
Superstep 25 GiraphComputation (ms)=4001
Superstep 26 GiraphComputation (ms)=4003
Superstep 27 GiraphComputation (ms)=4005
Superstep 28 GiraphComputation (ms)=4002
Superstep 29 GiraphComputation (ms)=4001
Superstep 3 GiraphComputation (ms)=3988
Superstep 30 GiraphComputation (ms)=4248
Superstep 4 GiraphComputation (ms)=4010
Superstep 5 GiraphComputation (ms)=3998
Superstep 6 GiraphComputation (ms)=3996
Superstep 7 GiraphComputation (ms)=4005
Superstep 8 GiraphComputation (ms)=4009
Superstep 9 GiraphComputation (ms)=3994
Total (ms)=138788
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=0
$ hdfs dfs -cat output/~g/*
{"id":1,"label":"person","properties":{"gremlin.pageRankVertexProgram.pageRank":[{"id":39,"value":0.15000000000000002}],"name":[{"id":0,"value":"marko"}],"gremlin.pageRankVertexProgram.edgeCount":[{"id":10,"value":3.0}],"age":[{"id":1,"value":29}]}}
{"id":5,"label":"software","properties":{"gremlin.pageRankVertexProgram.pageRank":[{"id":35,"value":0.23181250000000003}],"name":[{"id":8,"value":"ripple"}],"gremlin.pageRankVertexProgram.edgeCount":[{"id":6,"value":0.0}],"lang":[{"id":9,"value":"java"}]}}
{"id":3,"label":"software","properties":{"gremlin.pageRankVertexProgram.pageRank":[{"id":39,"value":0.4018125}],"name":[{"id":4,"value":"lop"}],"gremlin.pageRankVertexProgram.edgeCount":[{"id":10,"value":0.0}],"lang":[{"id":5,"value":"java"}]}}
{"id":4,"label":"person","properties":{"gremlin.pageRankVertexProgram.pageRank":[{"id":39,"value":0.19250000000000003}],"name":[{"id":6,"value":"josh"}],"gremlin.pageRankVertexProgram.edgeCount":[{"id":10,"value":2.0}],"age":[{"id":7,"value":32}]}}
{"id":2,"label":"person","properties":{"gremlin.pageRankVertexProgram.pageRank":[{"id":35,"value":0.19250000000000003}],"name":[{"id":2,"value":"vadas"}],"gremlin.pageRankVertexProgram.edgeCount":[{"id":6,"value":0.0}],"age":[{"id":3,"value":27}]}}
{"id":6,"label":"person","properties":{"gremlin.pageRankVertexProgram.pageRank":[{"id":35,"value":0.15000000000000002}],"name":[{"id":10,"value":"peter"}],"gremlin.pageRankVertexProgram.edgeCount":[{"id":6,"value":1.0}],"age":[{"id":11,"value":35}]}}
----
Vertex 4 ("josh") is isolated below:
[source,json]
----
{
  "id": 4,
  "label": "person",
  "properties": {
    "gremlin.pageRankVertexProgram.pageRank": [{"id": 39, "value": 0.19250000000000003}],
    "name": [{"id": 6, "value": "josh"}],
    "gremlin.pageRankVertexProgram.edgeCount": [{"id": 10, "value": 2.0}],
    "age": [{"id": 7, "value": 32}]
  }
}
----