////
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
////
[[graphcomputer]]
= The GraphComputer
image:graphcomputer-puffers.png[width=350,float=right] TinkerPop provides two primary means of interacting with a
graph: link:http://en.wikipedia.org/wiki/Online_transaction_processing[online transaction processing] (OLTP) and
link:http://en.wikipedia.org/wiki/Online_analytical_processing[online analytical processing] (OLAP). OLTP-based
graph systems allow the user to query the graph in real-time. However, typically, real-time performance is only
possible when a local traversal is enacted. A local traversal is one that starts at a particular vertex (or small set
of vertices) and touches a small set of connected vertices (by any arbitrary path of arbitrary length). In short, OLTP
queries interact with a limited set of data and respond on the order of milliseconds or seconds. On the other hand,
with OLAP graph processing, the entire graph is processed and thus, every vertex and edge is analyzed (sometimes
more than once for iterative, recursive algorithms). Due to the amount of data being processed, the results are
typically not returned in real-time and for massive graphs (i.e. graphs represented across a cluster of machines),
results can take on the order of minutes or hours.
* *OLTP*: real-time, limited data accessed, random data access, sequential processing, querying
* *OLAP*: long running, entire data set accessed, sequential data access, parallel processing, batch processing
image::oltp-vs-olap.png[width=600]
The image above demonstrates the difference between Gremlin OLTP and Gremlin OLAP. With Gremlin OLTP, the graph is
walked by moving from vertex-to-vertex via incident edges. With Gremlin OLAP, all vertices are provided a
`VertexProgram`. The programs send messages to one another with the topological structure of the graph acting as the
communication network (though random message passing is possible). In many respects, the messages passed are like
the OLTP traversers moving from vertex-to-vertex. However, all messages are moving independent of one another, in
parallel. Once a vertex program is finished computing, TinkerPop's OLAP engine supports any number of
link:http://en.wikipedia.org/wiki/MapReduce[`MapReduce`] jobs over the resultant graph.
IMPORTANT: `GraphComputer` was designed from the start to be used within a multi-JVM, distributed environment --
in other words, a multi-machine compute cluster. As such, all the computing objects must be able to be migrated
between JVMs. The pattern promoted is to store state information in a `Configuration` object to later be regenerated
by a loading process. It is important to realize that `VertexProgram`, `MapReduce`, and numerous particular instances
rely heavily on the state of the computing classes (not the structure, but the processes) to be stored in a
`Configuration`.
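
The store-then-regenerate pattern described above can be sketched in plain Java. This is only an illustration under stated assumptions: `java.util.Properties` stands in for the Apache `Configuration`, and the class name, keys, and `ship()` helper are hypothetical and not part of TinkerPop.

[source,java]
----
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical stand-in for a computing object whose state must travel between JVMs.
final class ConfigStateSketch {
    static final String ALPHA = "sketch.alpha";                  // hypothetical key
    static final String MAX_ITERATIONS = "sketch.maxIterations"; // hypothetical key

    double alpha = 0.85d;
    int maxIterations = 20;

    // Write every piece of state into the configuration.
    void storeState(final Properties configuration) {
        configuration.setProperty(ALPHA, Double.toString(this.alpha));
        configuration.setProperty(MAX_ITERATIONS, Integer.toString(this.maxIterations));
    }

    // Regenerate the state from the configuration, falling back to the current defaults.
    void loadState(final Properties configuration) {
        this.alpha = Double.parseDouble(
                configuration.getProperty(ALPHA, Double.toString(this.alpha)));
        this.maxIterations = Integer.parseInt(
                configuration.getProperty(MAX_ITERATIONS, Integer.toString(this.maxIterations)));
    }

    // Simulate shipping to another JVM: only the serialized configuration crosses the wire.
    static ConfigStateSketch ship(final ConfigStateSketch original) {
        try {
            final Properties outgoing = new Properties();
            original.storeState(outgoing);
            final ByteArrayOutputStream wire = new ByteArrayOutputStream();
            outgoing.store(wire, null);

            final Properties incoming = new Properties();
            incoming.load(new ByteArrayInputStream(wire.toByteArray()));
            final ConfigStateSketch copy = new ConfigStateSketch();
            copy.loadState(incoming);
            return copy;
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(final String[] args) {
        final ConfigStateSketch program = new ConfigStateSketch();
        program.alpha = 0.5d;
        program.maxIterations = 7;
        final ConfigStateSketch copy = ship(program);
        System.out.println(copy.alpha + " " + copy.maxIterations); // prints "0.5 7"
    }
}
----

The object itself is never serialized in this sketch. Only the key/value pairs are, which is why any state that is not written in `storeState()` would be lost on the receiving side.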
[[vertexprogram]]
== VertexProgram
image:bsp-diagram.png[width=400,float=right] GraphComputer takes a `VertexProgram`. A VertexProgram can be thought of
as a piece of code that is executed at each vertex in a logically parallel manner until some termination condition is
met (e.g. a number of iterations have occurred, no more data is changing in the graph, etc.). A submitted
`VertexProgram` is copied to all the workers in the graph. A worker is not an explicit concept in the API, but is
assumed of all `GraphComputer` implementations. At minimum each vertex is a worker (though this would be inefficient
due to the fact that each vertex would maintain a VertexProgram). In practice, the workers partition the vertex set
and are responsible for the execution of the VertexProgram over all the vertices within their sphere of influence.
The workers orchestrate the execution of the `VertexProgram.execute()` method on all their vertices in an
link:http://en.wikipedia.org/wiki/Bulk_synchronous_parallel[bulk synchronous parallel] (BSP) fashion. The vertices
are able to communicate with one another via messages. There are two kinds of messages in Gremlin OLAP:
`MessageScope.Local` and `MessageScope.Global`. A local message is a message to an adjacent vertex. A global
message is a message to any arbitrary vertex in the graph. Once the VertexProgram has completed its execution,
any number of `MapReduce` jobs are evaluated. MapReduce jobs are provided by the user via `GraphComputer.mapReduce()`
or by the `VertexProgram` via `VertexProgram.getMapReducers()`.
image::graphcomputer.png[width=500]
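
The bulk synchronous parallel loop described above can be sketched without any TinkerPop classes. The following is a minimal, single-JVM illustration, assuming a toy program in which every vertex adopts the largest value it has heard of. The class and method names are hypothetical, and messages are only sent to adjacent vertices (the equivalent of a local message scope).

[source,java]
----
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-JVM sketch of BSP supersteps (requires Java 9+ for List.of/Map.of).
final class BspSketch {
    // adjacency: vertex id -> adjacent vertex ids; initial: vertex id -> starting value.
    // Every vertex in adjacency is assumed to have an entry in initial.
    static Map<Integer, Integer> propagateMax(final Map<Integer, List<Integer>> adjacency,
                                              final Map<Integer, Integer> initial,
                                              final int maxIterations) {
        final Map<Integer, Integer> values = new HashMap<>(initial);
        // messages that were sent in the previous superstep
        Map<Integer, List<Integer>> inbox = new HashMap<>();
        for (int iteration = 0; iteration < maxIterations; iteration++) {
            final Map<Integer, List<Integer>> outbox = new HashMap<>();
            boolean changed = false;
            // "execute" runs once per vertex; a vertex only reads its own value and inbox,
            // so the order of this loop does not matter (logically parallel)
            for (final Integer vertex : adjacency.keySet()) {
                int best = values.get(vertex);
                for (final int message : inbox.getOrDefault(vertex, List.of()))
                    best = Math.max(best, message);
                if (iteration == 0 || best > values.get(vertex)) {
                    values.put(vertex, best);
                    changed = true;
                    for (final Integer neighbor : adjacency.get(vertex))
                        outbox.computeIfAbsent(neighbor, k -> new ArrayList<>()).add(best);
                }
            }
            inbox = outbox;      // synchronization barrier: messages become visible next superstep
            if (!changed) break; // termination condition: no more data is changing
        }
        return values;
    }

    public static void main(final String[] args) {
        final Map<Integer, List<Integer>> path =
                Map.of(1, List.of(2), 2, List.of(1, 3), 3, List.of(2, 4), 4, List.of(3));
        final Map<Integer, Integer> initial = Map.of(1, 3, 2, 1, 3, 7, 4, 2);
        System.out.println(new TreeMap<>(propagateMax(path, initial, 10))); // {1=7, 2=7, 3=7, 4=7}
    }
}
----

A real `GraphComputer` partitions the vertex set across workers, but the contract sketched here is the same: messages sent in one iteration are only readable in the next one.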
The example below demonstrates how to submit a VertexProgram to a graph's GraphComputer. `GraphComputer.submit()`
yields a `Future<ComputerResult>`. The `ComputerResult` has the resultant computed graph which can be a full copy
of the original graph (see <<hadoop-gremlin,Hadoop-Gremlin>>) or a view over the original graph (see
<<tinkergraph-gremlin,TinkerGraph>>). The ComputerResult also provides access to computational side-effects called `Memory`
(which includes, for example, runtime, number of iterations, results of MapReduce jobs, and VertexProgram-specific
memory manipulations).
[gremlin-groovy,modern]
----
result = graph.compute().program(PageRankVertexProgram.build().create()).submit().get()
result.memory().runtime
g = result.graph().traversal()
g.V().elementMap()
----
NOTE: This model of "vertex-centric graph computing" was made popular by Google's
link:http://googleresearch.blogspot.com/2009/06/large-scale-graph-computing-at-google.html[Pregel] graph engine.
In the open source world, this model is found in OLAP graph computing systems such as link:https://giraph.apache.org/[Giraph]
and link:https://hama.apache.org/[Hama]. TinkerPop extends the
popularized model with integrated post-processing <<mapreduce,MapReduce>> jobs over the vertex set.
[[mapreduce]]
== MapReduce
The BSP model proposed by Pregel stores the results of the computation in a distributed manner as properties on the
elements in the graph. In many situations, it is necessary to aggregate those resultant properties into a single
result set (i.e. a statistic). For instance, assume a VertexProgram that computes a nominal cluster for each vertex
(i.e. link:http://en.wikipedia.org/wiki/Community_structure[a graph clustering algorithm]). At the end of the
computation, each vertex will have a property denoting the cluster it was assigned to. TinkerPop provides the
ability to answer global questions about the clusters. For instance, in order to answer the following questions,
`MapReduce` jobs are required:
* How many vertices are in each cluster? (*presented below*)
* How many unique clusters are there? (*presented below*)
* What is the average age of each vertex in each cluster?
* What is the degree distribution of the vertices in each cluster?
A compressed representation of the `MapReduce` API in TinkerPop is provided below. The key idea is that the
`map`-stage processes all vertices to emit key/value pairs. Those values are aggregated on their respective key
for the `reduce`-stage to do its processing to ultimately yield more key/value pairs.
[source,java]
----
public interface MapReduce<MK, MV, RK, RV, R> {
    public void map(final Vertex vertex, final MapEmitter<MK, MV> emitter);
    public void reduce(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter);
    // there are more methods
}
----
IMPORTANT: The vertex that is passed into the `MapReduce.map()` method does not contain edges. The vertex only
contains original and computed vertex properties. This reduces the amount of data required to be loaded and ensures
that MapReduce is used for post-processing computed results. All edge-based computing should be accomplished in the
`VertexProgram`.
image:mapreduce.png[width=650]
The `MapReduce` extension to GraphComputer is made explicit when examining the
<<peerpressurevertexprogram,`PeerPressureVertexProgram`>> and corresponding `ClusterPopulationMapReduce`.
In the code below, the GraphComputer result (`ComputerResult`) provides both the computed `Graph` and the `Memory` of the
computation. The memory maintains the results of any MapReduce jobs. The cluster population
MapReduce result states that there are 5 vertices in cluster 1 and 1 vertex in cluster 6. This can be verified
(in a serial manner) by looking at the `PeerPressureVertexProgram.CLUSTER` property of the resultant graph. Notice
that the property is "hidden" unless it is directly accessed via name.
[gremlin-groovy,modern]
----
graph = TinkerFactory.createModern()
result = graph.compute().program(PeerPressureVertexProgram.build().create()).mapReduce(ClusterPopulationMapReduce.build().create()).submit().get()
result.memory().get('clusterPopulation')
g = result.graph().traversal()
g.V().values(PeerPressureVertexProgram.CLUSTER).groupCount().next()
g.V().elementMap()
----
If numerous statistics are desired, then it's possible to register as many MapReduce jobs as needed. For
instance, the `ClusterCountMapReduce` determines how many unique clusters were created by the peer pressure algorithm.
Below both `ClusterCountMapReduce` and `ClusterPopulationMapReduce` are computed over the resultant graph.
[gremlin-groovy,modern]
----
result = graph.compute().program(PeerPressureVertexProgram.build().create()).
mapReduce(ClusterPopulationMapReduce.build().create()).
mapReduce(ClusterCountMapReduce.build().create()).submit().get()
result.memory().clusterPopulation
result.memory().clusterCount
----
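
The map, group-by-key, and reduce stages behind these two statistics can be mimicked in plain Java. This is a hedged sketch rather than the TinkerPop implementation: the class and method names are hypothetical, and the vertex-to-cluster assignment in `main()` is made-up input chosen only to reproduce the counts quoted earlier (5 vertices in cluster 1, 1 vertex in cluster 6).

[source,java]
----
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java sketch of a map/reduce pass over computed vertex properties.
final class ClusterPopulationSketch {
    // vertexToCluster: vertex id -> cluster property left behind by a vertex program
    static Map<String, Long> clusterPopulation(final Map<Integer, String> vertexToCluster) {
        // map stage: every vertex emits (cluster, 1); emitted values are grouped by key
        final Map<String, List<Long>> grouped = new TreeMap<>();
        for (final String cluster : vertexToCluster.values())
            grouped.computeIfAbsent(cluster, k -> new ArrayList<>()).add(1L);
        // reduce stage: the values of each key are summed into one (cluster, size) pair
        final Map<String, Long> result = new TreeMap<>();
        for (final Map.Entry<String, List<Long>> entry : grouped.entrySet()) {
            long sum = 0L;
            for (final long one : entry.getValue()) sum += one;
            result.put(entry.getKey(), sum);
        }
        return result;
    }

    // A second, independent job: the number of distinct cluster keys.
    static int clusterCount(final Map<Integer, String> vertexToCluster) {
        return clusterPopulation(vertexToCluster).size();
    }

    public static void main(final String[] args) {
        // made-up assignment, not the output of an actual PeerPressureVertexProgram run
        final Map<Integer, String> clusters =
                Map.of(1, "1", 2, "1", 3, "1", 4, "1", 5, "1", 6, "6");
        System.out.println(clusterPopulation(clusters)); // {1=5, 6=1}
        System.out.println(clusterCount(clusters));      // 2
    }
}
----

Note that the map stage only looks at vertex properties and never at edges, which matches the restriction on `MapReduce.map()` stated above.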
IMPORTANT: The MapReduce model of TinkerPop does not support MapReduce chaining. Thus, the order in which the
MapReduce jobs are executed is irrelevant. This is made apparent when realizing that the `map()`-stage takes a
`Vertex` as its input and the `reduce()`-stage yields key/value pairs. Thus, the results of `reduce()` cannot be fed back
into a `map()`.
== A Collection of VertexPrograms
TinkerPop provides a collection of VertexPrograms that implement common algorithms. This section discusses the various
implementations.
IMPORTANT: The vertex programs presented are what are provided as of TinkerPop x.y.z. Over time, with future releases,
more algorithms will be added.
[[pagerankvertexprogram]]
=== PageRankVertexProgram
image:gremlin-pagerank.png[width=400,float=right] link:http://en.wikipedia.org/wiki/PageRank[PageRank] is perhaps the
most popular OLAP-oriented graph algorithm. This link:http://en.wikipedia.org/wiki/Centrality[eigenvector centrality]
variant was developed by Brin and Page of Google. PageRank defines a centrality value for all vertices in the graph,
where centrality is defined recursively: a vertex is central if it is connected to central vertices. PageRank is
an iterative algorithm that converges to a link:http://en.wikipedia.org/wiki/Ergodicity[steady state distribution]. If
the pageRank values are normalized to 1.0, then the pageRank value of a vertex is the probability that a random walker
will be seen at that vertex in the graph at any arbitrary moment in time. In order to help developers understand the
methods of a `VertexProgram`, the PageRankVertexProgram code is analyzed below.
[source,java]
----
public class PageRankVertexProgram implements VertexProgram<Double> { <1>
public static final String PAGE_RANK = "gremlin.pageRankVertexProgram.pageRank";
private static final String EDGE_COUNT = "gremlin.pageRankVertexProgram.edgeCount";
private static final String PROPERTY = "gremlin.pageRankVertexProgram.property";
private static final String VERTEX_COUNT = "gremlin.pageRankVertexProgram.vertexCount";
private static final String ALPHA = "gremlin.pageRankVertexProgram.alpha";
private static final String EPSILON = "gremlin.pageRankVertexProgram.epsilon";
private static final String MAX_ITERATIONS = "gremlin.pageRankVertexProgram.maxIterations";
private static final String EDGE_TRAVERSAL = "gremlin.pageRankVertexProgram.edgeTraversal";
private static final String INITIAL_RANK_TRAVERSAL = "gremlin.pageRankVertexProgram.initialRankTraversal";
private static final String TELEPORTATION_ENERGY = "gremlin.pageRankVertexProgram.teleportationEnergy";
private static final String CONVERGENCE_ERROR = "gremlin.pageRankVertexProgram.convergenceError";
private MessageScope.Local<Double> incidentMessageScope = MessageScope.Local.of(__::outE); <2>
private MessageScope.Local<Double> countMessageScope = MessageScope.Local.of(new MessageScope.Local.ReverseTraversalSupplier(this.incidentMessageScope));
private PureTraversal<Vertex, Edge> edgeTraversal = null;
private PureTraversal<Vertex, ? extends Number> initialRankTraversal = null;
private double alpha = 0.85d;
private double epsilon = 0.00001d;
private int maxIterations = 20;
private String property = PAGE_RANK; <3>
private Set<VertexComputeKey> vertexComputeKeys;
private Set<MemoryComputeKey> memoryComputeKeys;
private PageRankVertexProgram() {
}
@Override
public void loadState(final Graph graph, final Configuration configuration) { <4>
if (configuration.containsKey(INITIAL_RANK_TRAVERSAL))
this.initialRankTraversal = PureTraversal.loadState(configuration, INITIAL_RANK_TRAVERSAL, graph);
if (configuration.containsKey(EDGE_TRAVERSAL)) {
this.edgeTraversal = PureTraversal.loadState(configuration, EDGE_TRAVERSAL, graph);
this.incidentMessageScope = MessageScope.Local.of(() -> this.edgeTraversal.get().clone());
this.countMessageScope = MessageScope.Local.of(new MessageScope.Local.ReverseTraversalSupplier(this.incidentMessageScope));
}
this.alpha = configuration.getDouble(ALPHA, this.alpha);
this.epsilon = configuration.getDouble(EPSILON, this.epsilon);
this.maxIterations = configuration.getInt(MAX_ITERATIONS, 20);
this.property = configuration.getString(PROPERTY, PAGE_RANK);
this.vertexComputeKeys = new HashSet<>(Arrays.asList(
VertexComputeKey.of(this.property, false),
VertexComputeKey.of(EDGE_COUNT, true))); <5>
this.memoryComputeKeys = new HashSet<>(Arrays.asList(
MemoryComputeKey.of(TELEPORTATION_ENERGY, Operator.sum, true, true),
MemoryComputeKey.of(VERTEX_COUNT, Operator.sum, true, true),
MemoryComputeKey.of(CONVERGENCE_ERROR, Operator.sum, false, true)));
}
@Override
public void storeState(final Configuration configuration) {
VertexProgram.super.storeState(configuration);
configuration.setProperty(ALPHA, this.alpha);
configuration.setProperty(EPSILON, this.epsilon);
configuration.setProperty(PROPERTY, this.property);
configuration.setProperty(MAX_ITERATIONS, this.maxIterations);
if (null != this.edgeTraversal)
this.edgeTraversal.storeState(configuration, EDGE_TRAVERSAL);
if (null != this.initialRankTraversal)
this.initialRankTraversal.storeState(configuration, INITIAL_RANK_TRAVERSAL);
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.NEW;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.VERTEX_PROPERTIES;
}
@Override
public Set<VertexComputeKey> getVertexComputeKeys() { <6>
return this.vertexComputeKeys;
}
@Override
public Optional<MessageCombiner<Double>> getMessageCombiner() {
return (Optional) PageRankMessageCombiner.instance();
}
@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
return this.memoryComputeKeys;
}
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
final Set<MessageScope> set = new HashSet<>();
set.add(memory.isInitialIteration() ? this.countMessageScope : this.incidentMessageScope);
return set;
}
@Override
public PageRankVertexProgram clone() {
try {
final PageRankVertexProgram clone = (PageRankVertexProgram) super.clone();
if (null != this.initialRankTraversal)
clone.initialRankTraversal = this.initialRankTraversal.clone();
return clone;
} catch (final CloneNotSupportedException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
@Override
public void setup(final Memory memory) {
memory.set(TELEPORTATION_ENERGY, null == this.initialRankTraversal ? 1.0d : 0.0d);
memory.set(VERTEX_COUNT, 0.0d);
memory.set(CONVERGENCE_ERROR, 1.0d);
}
@Override
public void execute(final Vertex vertex, Messenger<Double> messenger, final Memory memory) { <7>
if (memory.isInitialIteration()) {
messenger.sendMessage(this.countMessageScope, 1.0d); <8>
memory.add(VERTEX_COUNT, 1.0d);
} else {
final double vertexCount = memory.<Double>get(VERTEX_COUNT);
final double edgeCount;
double pageRank;
if (1 == memory.getIteration()) {
edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0.0d, (a, b) -> a + b);
vertex.property(VertexProperty.Cardinality.single, EDGE_COUNT, edgeCount);
pageRank = null == this.initialRankTraversal ?
0.0d :
TraversalUtil.apply(vertex, this.initialRankTraversal.get()).doubleValue(); <9>
} else {
edgeCount = vertex.value(EDGE_COUNT);
pageRank = IteratorUtils.reduce(messenger.receiveMessages(), 0.0d, (a, b) -> a + b); <10>
}
//////////////////////////
final double teleporationEnergy = memory.get(TELEPORTATION_ENERGY);
if (teleporationEnergy > 0.0d) {
final double localTerminalEnergy = teleporationEnergy / vertexCount;
pageRank = pageRank + localTerminalEnergy;
memory.add(TELEPORTATION_ENERGY, -localTerminalEnergy);
}
final double previousPageRank = vertex.<Double>property(this.property).orElse(0.0d);
memory.add(CONVERGENCE_ERROR, Math.abs(pageRank - previousPageRank));
vertex.property(VertexProperty.Cardinality.single, this.property, pageRank);
memory.add(TELEPORTATION_ENERGY, (1.0d - this.alpha) * pageRank);
pageRank = this.alpha * pageRank;
if (edgeCount > 0.0d)
messenger.sendMessage(this.incidentMessageScope, pageRank / edgeCount);
else
memory.add(TELEPORTATION_ENERGY, pageRank);
}
}
@Override
public boolean terminate(final Memory memory) { <11>
boolean terminate = memory.<Double>get(CONVERGENCE_ERROR) < this.epsilon || memory.getIteration() >= this.maxIterations;
memory.set(CONVERGENCE_ERROR, 0.0d);
return terminate;
}
@Override
public String toString() {
return StringFactory.vertexProgramString(this, "alpha=" + this.alpha + ", epsilon=" + this.epsilon + ", iterations=" + this.maxIterations);
}
}
----
<1> `PageRankVertexProgram` implements `VertexProgram<Double>` because the messages it sends are Java doubles.
<2> The default path of energy propagation is via outgoing edges from the current vertex.
<3> The resulting PageRank values for the vertices are stored as a vertex property.
<4> A vertex program is constructed using an Apache `Configuration` to ensure easy dissemination across a cluster of JVMs.
<5> `EDGE_COUNT` is a transient "scratch data" compute key while `PAGE_RANK` is not.
<6> A vertex program must define the "compute keys" that are the properties being operated on during the computation.
<7> The "while"-loop of the vertex program.
<8> In order to determine how to distribute the energy to neighbors, a "1"-count is used to determine how many incident vertices exist for the `MessageScope`.
<9> Initially, each vertex is provided an equal amount of energy represented as a double.
<10> Energy is aggregated, computed on according to the PageRank algorithm, and then disseminated according to the defined `MessageScope.Local`.
<11> The computation is terminated after epsilon-convergence is met or a pre-defined number of iterations have taken place.
The above `PageRankVertexProgram` is used as follows.
[gremlin-groovy,modern]
----
result = graph.compute().program(PageRankVertexProgram.build().create()).submit().get()
result.memory().runtime
g = result.graph().traversal()
g.V().elementMap()
----
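
For readers who want to follow the energy accounting without the `VertexProgram` plumbing, the iteration can be approximated in plain Java. This is a simplified sketch, not the code above: it assumes every vertex starts with an equal share of energy, that each target vertex also appears as a key of the map, and that the `(1 - alpha)` share plus all energy at vertices without outgoing edges is pooled and redistributed evenly (the role played by `TELEPORTATION_ENERGY`). The class and method names are hypothetical.

[source,java]
----
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-JVM approximation of the PageRank iteration (requires Java 9+).
final class PageRankSketch {
    // outgoing: vertex id -> target vertex ids; every target must also be a key.
    static Map<Integer, Double> pageRank(final Map<Integer, List<Integer>> outgoing,
                                         final double alpha,
                                         final double epsilon,
                                         final int maxIterations) {
        final int vertexCount = outgoing.size();
        Map<Integer, Double> rank = new HashMap<>();
        for (final Integer vertex : outgoing.keySet()) rank.put(vertex, 1.0d / vertexCount);

        for (int iteration = 0; iteration < maxIterations; iteration++) {
            double teleportationEnergy = 0.0d;                       // pooled, shared energy
            final Map<Integer, Double> received = new HashMap<>();   // summed incoming messages
            for (final Map.Entry<Integer, List<Integer>> entry : outgoing.entrySet()) {
                final double current = rank.get(entry.getKey());
                teleportationEnergy += (1.0d - alpha) * current;
                final List<Integer> targets = entry.getValue();
                if (targets.isEmpty())
                    teleportationEnergy += alpha * current;          // nowhere to send it
                else
                    for (final Integer target : targets)
                        received.merge(target, alpha * current / targets.size(), Double::sum);
            }
            double convergenceError = 0.0d;
            final Map<Integer, Double> next = new HashMap<>();
            for (final Integer vertex : outgoing.keySet()) {
                final double updated =
                        received.getOrDefault(vertex, 0.0d) + teleportationEnergy / vertexCount;
                convergenceError += Math.abs(updated - rank.get(vertex));
                next.put(vertex, updated);
            }
            rank = next;
            if (convergenceError < epsilon) break;                   // epsilon-convergence
        }
        return rank;
    }

    public static void main(final String[] args) {
        final Map<Integer, List<Integer>> graph = new HashMap<>();
        graph.put(1, List.of(2, 3));
        graph.put(2, List.of(3));
        graph.put(3, List.of());
        System.out.println(new TreeMap<>(pageRank(graph, 0.85d, 0.00001d, 20)));
    }
}
----

Because no energy is ever discarded, the ranks in this sketch always sum to 1.0, which is what allows them to be read as probabilities.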
Note that `GraphTraversal` provides a <<pagerank-step,`pageRank()`>>-step.
[gremlin-groovy,modern]
----
g = graph.traversal().withComputer()
g.V().pageRank().elementMap()
g.V().pageRank().by('pageRank').times(5).order().by('pageRank').elementMap()
----
[[peerpressurevertexprogram]]
=== PeerPressureVertexProgram
The `PeerPressureVertexProgram` is a clustering algorithm that assigns a nominal value to each vertex in the graph.
The nominal value represents the vertex's cluster. If two vertices have the same nominal value, then they are in the
same cluster. The algorithm proceeds in the following manner.
. Every vertex assigns itself to a unique cluster ID (initially, its vertex ID).
. Every vertex determines its per neighbor vote strength as 1.0d / incident edges count.
. Every vertex sends its cluster ID and vote strength to its adjacent vertices as a `Pair<Serializable,Double>`
. Every vertex generates a vote energy distribution of received cluster IDs and changes its current cluster ID to the most frequent cluster ID.
.. If there is a tie, then the cluster with the lowest `toString()` comparison is selected.
. Steps 3 and 4 repeat until either a maximum number of iterations has occurred or no vertex adjusts its cluster anymore.
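
The steps above can be sketched in plain Java. This is an illustration under simplifying assumptions, not the TinkerPop implementation: the graph is an undirected adjacency map in which every neighbor is also a key, votes are tallied by summed vote strength, a vertex that receives no votes keeps its cluster, and all names are hypothetical.

[source,java]
----
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-JVM sketch of the peer pressure steps (requires Java 9+).
final class PeerPressureSketch {
    static Map<Integer, Integer> cluster(final Map<Integer, List<Integer>> adjacency,
                                         final int maxIterations) {
        // step 1: every vertex starts in its own cluster
        Map<Integer, Integer> clusters = new TreeMap<>();
        for (final Integer vertex : adjacency.keySet()) clusters.put(vertex, vertex);

        for (int iteration = 0; iteration < maxIterations; iteration++) {
            // steps 2 and 3: send (cluster id, vote strength) to every adjacent vertex
            final Map<Integer, Map<Integer, Double>> votes = new HashMap<>();
            for (final Map.Entry<Integer, List<Integer>> entry : adjacency.entrySet()) {
                if (entry.getValue().isEmpty()) continue;
                final double strength = 1.0d / entry.getValue().size();
                final Integer cluster = clusters.get(entry.getKey());
                for (final Integer neighbor : entry.getValue())
                    votes.computeIfAbsent(neighbor, k -> new HashMap<>())
                            .merge(cluster, strength, Double::sum);
            }
            // step 4: adopt the cluster with the most vote energy, lowest toString() on ties
            boolean changed = false;
            final Map<Integer, Integer> next = new TreeMap<>(clusters);
            for (final Map.Entry<Integer, Map<Integer, Double>> entry : votes.entrySet()) {
                Integer best = null;
                double bestEnergy = -1.0d;
                for (final Map.Entry<Integer, Double> vote : entry.getValue().entrySet()) {
                    final boolean stronger = vote.getValue() > bestEnergy;
                    final boolean lowerOnTie = vote.getValue() == bestEnergy
                            && vote.getKey().toString().compareTo(best.toString()) < 0;
                    if (stronger || lowerOnTie) {
                        best = vote.getKey();
                        bestEnergy = vote.getValue();
                    }
                }
                if (!best.equals(clusters.get(entry.getKey()))) {
                    next.put(entry.getKey(), best);
                    changed = true;
                }
            }
            clusters = next;
            if (!changed) break; // step 5: stop once no vertex adjusts its cluster
        }
        return clusters;
    }

    public static void main(final String[] args) {
        // two disconnected triangles
        final Map<Integer, List<Integer>> graph = Map.of(
                1, List.of(2, 3), 2, List.of(1, 3), 3, List.of(1, 2),
                4, List.of(5, 6), 5, List.of(4, 6), 6, List.of(4, 5));
        System.out.println(cluster(graph, 10)); // {1=1, 2=1, 3=1, 4=4, 5=4, 6=4}
    }
}
----

Because all vertices update at the same time, some graphs (a single edge between two vertices, for example) make this sketch oscillate, which is one reason a maximum number of iterations is part of the termination condition.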
Note that `GraphTraversal` provides a <<peerpressure-step,`peerPressure()`>>-step.
[gremlin-groovy,modern]
----
g = graph.traversal().withComputer()
g.V().peerPressure().by('cluster').elementMap()
g.V().peerPressure().by(outE('knows')).by('cluster').elementMap()
----
[[connectedcomponentvertexprogram]]
=== ConnectedComponentVertexProgram
The `ConnectedComponentVertexProgram` identifies link:https://en.wikipedia.org/wiki/Connected_component_(graph_theory)[Connected Component]
instances in a graph. See <<connectedcomponent-step,`connectedComponent()`>>-step for more information.
[[shortestpathvertexprogram]]
=== ShortestPathVertexProgram
The `ShortestPathVertexProgram` provides an easy way to find shortest non-cyclic paths in the graph. It provides several options to configure
the output format, the start- and end-vertices, the direction, a custom distance function, as well as a distance limitation. By default it just
finds all undirected, shortest paths in the graph.
[gremlin-groovy,modern]
----
spvp = ShortestPathVertexProgram.build().create() <1>
result = graph.compute().program(spvp).submit().get() <2>
result.memory().get(ShortestPathVertexProgram.SHORTEST_PATHS) <3>
----
<1> Create a `ShortestPathVertexProgram` with its default configuration.
<2> Execute the `ShortestPathVertexProgram`.
<3> Get all shortest paths from the results memory.
[gremlin-groovy,modern]
----
spvp = ShortestPathVertexProgram.build().includeEdges(true).create() <1>
result = graph.compute().program(spvp).submit().get() <2>
result.memory().get(ShortestPathVertexProgram.SHORTEST_PATHS) <3>
----
<1> Create a `ShortestPathVertexProgram` as before, but configure it to include edges in the result.
<2> Execute the `ShortestPathVertexProgram`.
<3> Get all shortest paths from the results memory.
The `ShortestPathVertexProgram.Builder` provides the following configuration methods:
[width="100%",cols="3,15,5",options="header"]
|=========================================================
| Method | Description | Default
| `source(Traversal)` | Sets a filter traversal for the start vertices (e.g. `__.has('name','marko')`). | all vertices (`__.identity()`)
| `target(Traversal)` | Sets a filter traversal for the end vertices. | all vertices
| `edgeDirection(Direction)` | Sets the direction to traverse during the shortest path discovery. | `Direction.BOTH`
| `edgeTraversal(Traversal)` | Sets a traversal that emits the edges to traverse from the current vertex. | `__.bothE()`
| `distanceProperty(String)` | Sets the edge property to use for the distance calculations. | none
| `distanceTraversal(Traversal)` | Sets the traversal that calculates the distance for the current edge. | `__.constant(1)`
| `maxDistance(Traversal)` | Limits the shortest path distance. | none
| `includeEdges(Boolean)` | Whether to include edges in shortest paths or not. | `false`
|=========================================================
IMPORTANT: If a maximum distance is provided, the discovery process will only stop following a path at this distance if no
custom distance property or traversal was provided. Custom distances can be negative, hence exceeding the maximum distance doesn't mean that there
can't be any more valid paths. However, paths will be filtered at the end, when no more non-cyclic paths can be found. The bottom line is that
custom distance properties or traversals can lead to much longer runtimes and a much higher memory consumption.
Note that `GraphTraversal` provides a <<shortestpath-step,`shortestPath()`>>-step.
[[clonevertexprogram]]
=== CloneVertexProgram
The `CloneVertexProgram` (known in versions prior to 3.2.10 as `BulkDumperVertexProgram`) copies a whole graph from
any graph `InputFormat` to any graph `OutputFormat`. TinkerPop provides the following:
* `OutputFormat`
** `GraphSONOutputFormat`
** `GryoOutputFormat`
** `ScriptOutputFormat`
* `InputFormat`
** `GraphSONInputFormat`
** `GryoInputFormat`
** `ScriptInputFormat`
An <<clonevertexprogramusingspark,example>> is provided in the SparkGraphComputer section.
Graph Providers should consider writing their own `OutputFormat` and `InputFormat` which would allow bulk loading and
export capabilities through this `VertexProgram`. This topic is discussed further in the
link:https://tinkerpop.apache.org/docs/x.y.z/dev/provider/#bulk-import-export[Provider Documentation].
[[traversalvertexprogram]]
=== TraversalVertexProgram
image:traversal-vertex-program.png[width=250,float=left] The `TraversalVertexProgram` is a "special" VertexProgram in
that it can be executed via a `Traversal` and a `GraphComputer`. In Gremlin, it is possible to have
the same traversal executed using either the standard OLTP-engine or the `GraphComputer` OLAP-engine. The difference
is where the traversal is submitted.
NOTE: This model of graph traversal in a BSP system was first implemented by the
link:https://github.com/thinkaurelius/faunus/wiki[Faunus] graph analytics engine and originally described in
link:http://markorodriguez.com/2011/04/19/local-and-distributed-traversal-engines/[Local and Distributed Traversal Engines].
[gremlin-groovy,modern]
----
g = graph.traversal()
g.V().both().hasLabel('person').values('age').groupCount().next() // OLTP
g = graph.traversal().withComputer()
g.V().both().hasLabel('person').values('age').groupCount().next() // OLAP
----
image::olap-traversal.png[width=650]
In the OLAP example above, a `TraversalVertexProgram` is (logically) sent to each vertex in the graph. Each instance
evaluation requires (logically) 5 BSP iterations and each iteration is interpreted as such:
. `g.V()`: Put a traverser on each vertex in the graph.
. `both()`: Propagate each traverser to the vertices `both`-adjacent to its current vertex.
. `hasLabel('person')`: If the vertex is not a person, kill the traversers at that vertex.
. `values('age')`: Have all the traversers reference the integer age of their current vertex.
. `groupCount()`: Count how many times a particular age has been seen.
While 5 iterations were presented, in fact, `TraversalVertexProgram` will execute the traversal in only
2 iterations. The reason is that `g.V().both()` and `hasLabel('person').values('age').groupCount()` can each be
executed in a single iteration as any message sent would simply be to the current executing vertex. Thus, a simple optimization
exists in Gremlin OLAP called "reflexive message passing" which simulates non-message-passing BSP iterations within a
single BSP iteration.
The same OLAP traversal can be executed using the standard `graph.compute()` model, though at the expense of verbosity.
`TraversalVertexProgram` provides a fluent `Builder` for constructing a `TraversalVertexProgram`. The specified
`traversal()` can be either a direct `Traversal` object or a
link:http://en.wikipedia.org/wiki/Scripting_for_the_Java_Platform[JSR-223] script that will generate a
`Traversal`. There is no benefit to using the model below. It is demonstrated to help elucidate how Gremlin OLAP traversals
are ultimately compiled for execution on a `GraphComputer`.
[gremlin-groovy,modern]
----
result = graph.compute().program(TraversalVertexProgram.build().traversal(g.V().both().hasLabel('person').values('age').groupCount('a')).create()).submit().get()
result.memory().a
result.memory().iteration
result.memory().runtime
----
[[distributed-gremlin-gotchas]]
==== Distributed Gremlin Gotchas
Gremlin OLTP is not identical to Gremlin OLAP.
IMPORTANT: There are two primary theoretical differences between Gremlin OLTP and Gremlin OLAP. First, Gremlin OLTP
(via `Traversal`) leverages a link:http://en.wikipedia.org/wiki/Depth-first_search[depth-first] execution engine.
Depth-first execution has a limited memory footprint due to link:http://en.wikipedia.org/wiki/Lazy_evaluation[lazy evaluation].
On the other hand, Gremlin OLAP (via `TraversalVertexProgram`) leverages a
link:http://en.wikipedia.org/wiki/Breadth-first_search[breadth-first] execution engine which maintains a larger memory
footprint, but a better time complexity due to vertex-local traversers being able to be "bulked." The second difference
is that Gremlin OLTP is executed in a serial/streaming fashion, while Gremlin OLAP is executed in a parallel/step-wise fashion. These two
fundamental differences lead to the behaviors enumerated below.
image::gremlin-without-a-cause.png[width=200,float=right]
. Traversal sideEffects are represented as a distributed data structure across `GraphComputer` workers. It is not
possible to get a global view of a sideEffect until after an iteration has occurred and global sideEffects are re-broadcast to the workers.
In some situations, a "stale" local representation of the sideEffect is sufficient to ensure the intended semantics of the
traversal are respected. However, this is not generally true so be wary of traversals that require global views of a
sideEffect. To ensure a fresh global representation, use `barrier()` prior to accessing the global sideEffect. Note that this
only comes into play with custom steps and <<general-steps,lambda steps>>. The standard Gremlin step library respects OLAP semantics.
. When evaluating traversals that rely on path information (i.e. the history of the traversal), practical
computational limits can easily be reached due to the link:http://en.wikipedia.org/wiki/Combinatorial_explosion[combinatoric explosion]
of data. With path computing enabled, every traverser is unique and thus, must be enumerated as opposed to being
counted/merged. The difference is a collection of paths vs. a single 64-bit long at a single vertex. In other words,
bulking is very unlikely with traversers that maintain path information. For more
information on this concept, please see link:https://thinkaurelius.wordpress.com/2012/11/11/faunus-provides-big-graph-data-analytics/[Faunus Provides Big Graph Data].
. Steps that are concerned with the global ordering of traversers do not have a meaningful representation in
OLAP. For example, what does <<order-step,`order()`>>-step mean when all traversers are being processed in parallel?
Even if the traversers were aggregated and ordered, then at the next step they would return to being executed in
parallel and thus, in an unpredictable order. When `order()`-like steps are executed at the end of a traversal (i.e.
the final step), `TraversalVertexProgram` ensures a serial representation is ordered accordingly. Moreover, it is intelligent enough
to maintain the ordering of `g.V().hasLabel("person").order().by("age").values("name")`. However, the OLAP traversal
`g.V().hasLabel("person").order().by("age").out().values("name")` will lose the original ordering as the `out()`-step
will rebroadcast traversers across the cluster.
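
The effect of path tracking on bulking can be made concrete with a small plain-Java sketch. It is illustrative only: the class and method names are hypothetical, a traverser without path information is reduced to a count per vertex, and a traverser with path information is modeled as the list of vertex ids it has visited.

[source,java]
----
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch contrasting bulked traversers with path-carrying traversers (Java 9+).
final class BulkingSketch {
    // One out()-like step without paths: traversers at the same vertex merge into one 64-bit count.
    static Map<Integer, Long> stepBulked(final Map<Integer, List<Integer>> outgoing,
                                         final Map<Integer, Long> traversers) {
        final Map<Integer, Long> next = new TreeMap<>();
        for (final Map.Entry<Integer, Long> traverser : traversers.entrySet())
            for (final Integer target : outgoing.getOrDefault(traverser.getKey(), List.of()))
                next.merge(target, traverser.getValue(), Long::sum);
        return next;
    }

    // The same step with paths: every history is unique, so every traverser is enumerated.
    static List<List<Integer>> stepWithPaths(final Map<Integer, List<Integer>> outgoing,
                                             final List<List<Integer>> paths) {
        final List<List<Integer>> next = new ArrayList<>();
        for (final List<Integer> path : paths)
            for (final Integer target : outgoing.getOrDefault(path.get(path.size() - 1), List.of())) {
                final List<Integer> extended = new ArrayList<>(path);
                extended.add(target);
                next.add(extended);
            }
        return next;
    }

    public static void main(final String[] args) {
        final Map<Integer, List<Integer>> outgoing =
                Map.of(1, List.of(3), 2, List.of(3), 3, List.of(4));

        Map<Integer, Long> bulked = new TreeMap<>(Map.of(1, 1L, 2, 1L, 3, 1L, 4, 1L));
        bulked = stepBulked(outgoing, stepBulked(outgoing, bulked));
        System.out.println(bulked); // {4=2}

        List<List<Integer>> paths = List.of(List.of(1), List.of(2), List.of(3), List.of(4));
        paths = stepWithPaths(outgoing, stepWithPaths(outgoing, paths));
        System.out.println(paths);  // [[1, 3, 4], [2, 3, 4]]
    }
}
----

In the bulked case the state at vertex 4 is a single number regardless of how many traversers arrive, while the path-carrying case grows with the number of distinct histories.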
[[graph-filter]]
== Graph Filter
Most OLAP jobs do not require the entire source graph to faithfully execute their `VertexProgram`. For instance, if
`PageRankVertexProgram` is only going to compute the centrality of people in the friendship-graph, then the following
`GraphFilter` can be applied.
[source,java]
----
graph.compute().
vertices(hasLabel("person")).
edges(bothE("knows")).
program(PageRankVertexProgram...)
----
There are two methods for constructing a `GraphFilter`.
* `vertices(Traversal<Vertex,Vertex>)`: A traversal that can only analyze a vertex and its properties.
If the traversal `hasNext()`, the input `Vertex` is passed to the `GraphComputer`.
* `edges(Traversal<Vertex,Edge>)`: A traversal that will iterate all legal edges for the source vertex.
`GraphFilter` is a "push-down predicate" that providers can reason on to determine the most efficient way to provide
graph data to the `GraphComputer`.
IMPORTANT: Apache TinkerPop provides `GraphFilterStrategy` <<traversalstrategy,traversal strategy>> which analyzes a submitted
OLAP traversal and, if possible, creates an appropriate `GraphFilter` automatically. For instance, `g.V().count()` would
yield a `GraphFilter.edges(limit(0))`. Thus, for traversal submissions, users typically do not need to be aware of creating
graph filters explicitly. Users can use the <<explain-step,`explain()`>>-step to see the `GraphFilter` generated by `GraphFilterStrategy`.