| //// |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| //// |
| [[graphcomputer]] |
| = The GraphComputer |
| |
| image:graphcomputer-puffers.png[width=350,float=right] TinkerPop provides two primary means of interacting with a |
| graph: link:http://en.wikipedia.org/wiki/Online_transaction_processing[online transaction processing] (OLTP) and |
| link:http://en.wikipedia.org/wiki/Online_analytical_processing[online analytical processing] (OLAP). OLTP-based |
| graph systems allow the user to query the graph in real-time. However, typically, real-time performance is only |
| possible when a local traversal is enacted. A local traversal is one that starts at a particular vertex (or small set |
| of vertices) and touches a small set of connected vertices (by any arbitrary path of arbitrary length). In short, OLTP |
| queries interact with a limited set of data and respond on the order of milliseconds or seconds. On the other hand, |
| with OLAP graph processing, the entire graph is processed and thus, every vertex and edge is analyzed (sometimes |
| more than once for iterative, recursive algorithms). Due to the amount of data being processed, the results are |
| typically not returned in real-time and for massive graphs (i.e. graphs represented across a cluster of machines), |
| results can take on the order of minutes or hours. |
| |
| * *OLTP*: real-time, limited data accessed, random data access, sequential processing, querying |
| * *OLAP*: long running, entire data set accessed, sequential data access, parallel processing, batch processing |
| |
| image::oltp-vs-olap.png[width=600] |
| |
| The image above demonstrates the difference between Gremlin OLTP and Gremlin OLAP. With Gremlin OLTP, the graph is |
| walked by moving from vertex-to-vertex via incident edges. With Gremlin OLAP, all vertices are provided a |
| `VertexProgram`. The programs send messages to one another with the topological structure of the graph acting as the |
| communication network (though random message passing is possible). In many respects, the messages passed are like |
| the OLTP traversers moving from vertex-to-vertex. However, all messages are moving independent of one another, in |
| parallel. Once a vertex program is finished computing, TinkerPop's OLAP engine supports any number of |
| link:http://en.wikipedia.org/wiki/MapReduce[`MapReduce`] jobs over the resultant graph. |
| |
| IMPORTANT: `GraphComputer` was designed from the start to be used within a multi-JVM, distributed environment -- |
| in other words, a multi-machine compute cluster. As such, all the computing objects must be able to be migrated |
| between JVMs. The pattern promoted is to store state information in a `Configuration` object to later be regenerated |
| by a loading process. It is important to realize that `VertexProgram`, `MapReduce`, and numerous particular instances |
| rely heavily on the state of the computing classes (not the structure, but the processes) being stored in a |
| `Configuration`. |
| |
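The store-then-regenerate pattern described above can be sketched in plain Java. This is only an illustration under stated assumptions: `ConfigurationStateSketch` and its keys are hypothetical names, and `java.util.Properties` stands in for the Apache `Configuration` object so that the example has no dependencies.

```java
import java.util.Properties;

// Hypothetical computing object; it is NOT a TinkerPop class.
class ConfigurationStateSketch {
    static final String ALPHA = "sketch.alpha";                    // hypothetical key
    static final String MAX_ITERATIONS = "sketch.maxIterations";   // hypothetical key

    private double alpha = 0.85d;
    private int maxIterations = 20;

    ConfigurationStateSketch alpha(final double alpha) { this.alpha = alpha; return this; }
    ConfigurationStateSketch maxIterations(final int maxIterations) { this.maxIterations = maxIterations; return this; }
    double alpha() { return this.alpha; }
    int maxIterations() { return this.maxIterations; }

    // Write every piece of process state into the configuration as plain strings.
    void storeState(final Properties configuration) {
        configuration.setProperty(ALPHA, Double.toString(this.alpha));
        configuration.setProperty(MAX_ITERATIONS, Integer.toString(this.maxIterations));
    }

    // Regenerate an equivalent object (e.g. on another JVM) from the configuration alone.
    static ConfigurationStateSketch loadState(final Properties configuration) {
        final ConfigurationStateSketch program = new ConfigurationStateSketch();
        program.alpha = Double.parseDouble(configuration.getProperty(ALPHA, "0.85"));
        program.maxIterations = Integer.parseInt(configuration.getProperty(MAX_ITERATIONS, "20"));
        return program;
    }

    public static void main(final String[] args) {
        final Properties configuration = new Properties();
        new ConfigurationStateSketch().alpha(0.5d).maxIterations(7).storeState(configuration);
        final ConfigurationStateSketch copy = loadState(configuration);
        System.out.println(copy.alpha() + " " + copy.maxIterations());
    }
}
```

Because only the configuration crosses the JVM boundary, the object itself never needs to be serialized.
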
| [[vertexprogram]] |
| == VertexProgram |
| |
| image:bsp-diagram.png[width=400,float=right] GraphComputer takes a `VertexProgram`. A VertexProgram can be thought of |
| as a piece of code that is executed at each vertex in a logically parallel manner until some termination condition is |
| met (e.g. a number of iterations have occurred, no more data is changing in the graph, etc.). A submitted |
| `VertexProgram` is copied to all the workers in the graph. A worker is not an explicit concept in the API, but is |
| assumed of all `GraphComputer` implementations. At minimum each vertex is a worker (though this would be inefficient |
| due to the fact that each vertex would maintain a VertexProgram). In practice, the workers partition the vertex set |
| and are responsible for the execution of the VertexProgram over all the vertices within their sphere of influence. |
| The workers orchestrate the execution of the `VertexProgram.execute()` method on all their vertices in a |
| link:http://en.wikipedia.org/wiki/Bulk_synchronous_parallel[bulk synchronous parallel] (BSP) fashion. The vertices |
| are able to communicate with one another via messages. There are two kinds of messages in Gremlin OLAP: |
| `MessageScope.Local` and `MessageScope.Global`. A local message is a message to an adjacent vertex. A global |
| message is a message to any arbitrary vertex in the graph. Once the VertexProgram has completed its execution, |
| any number of `MapReduce` jobs are evaluated. MapReduce jobs are provided by the user via `GraphComputer.mapReduce()` |
| or by the `VertexProgram` via `VertexProgram.getMapReducers()`. |
| |
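The BSP execution model can be illustrated without any TinkerPop types. The following is a minimal sketch, assuming a graph given as a hypothetical adjacency array: every vertex repeatedly adopts the largest value it has heard of, messages sent in one iteration only become visible in the next, and the loop terminates when no more data is changing. The class and method names are illustrative and not part of the API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of BSP supersteps; not a GraphComputer implementation.
class BspMaxValueSketch {

    // adjacency[v] lists the vertices that v sends local messages to.
    static int[] run(final int[][] adjacency, final int[] initialValues, final int maxIterations) {
        final int n = adjacency.length;
        final int[] values = initialValues.clone();
        List<List<Integer>> inbox = emptyBoxes(n);
        for (int iteration = 0; iteration < maxIterations; iteration++) {
            final List<List<Integer>> outbox = emptyBoxes(n);
            boolean anyMessageSent = false;
            for (int v = 0; v < n; v++) {                  // the per-vertex "execute" step
                boolean changed = iteration == 0;          // every vertex announces itself once
                for (final int message : inbox.get(v)) {
                    if (message > values[v]) {
                        values[v] = message;
                        changed = true;
                    }
                }
                if (changed) {
                    for (final int neighbor : adjacency[v]) {
                        outbox.get(neighbor).add(values[v]);
                        anyMessageSent = true;
                    }
                }
            }
            inbox = outbox;                                // barrier: delivery happens between iterations
            if (!anyMessageSent) break;                    // termination: nothing is changing anymore
        }
        return values;
    }

    private static List<List<Integer>> emptyBoxes(final int n) {
        final List<List<Integer>> boxes = new ArrayList<>();
        for (int i = 0; i < n; i++) boxes.add(new ArrayList<>());
        return boxes;
    }

    public static void main(final String[] args) {
        final int[][] path = {{1}, {0, 2}, {1, 3}, {2}};   // undirected path 0-1-2-3
        System.out.println(java.util.Arrays.toString(run(path, new int[]{3, 1, 4, 2}, 10)));
    }
}
```

A real implementation partitions the vertex loop across workers; the sketch runs it serially, which is why the parallelism is only "logical".
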
| image::graphcomputer.png[width=500] |
| |
| The example below demonstrates how to submit a VertexProgram to a graph's GraphComputer. `GraphComputer.submit()` |
| yields a `Future<ComputerResult>`. The `ComputerResult` has the resultant computed graph which can be a full copy |
| of the original graph (see <<hadoop-gremlin,Hadoop-Gremlin>>) or a view over the original graph (see |
| <<tinkergraph-gremlin,TinkerGraph>>). The ComputerResult also provides access to computational side-effects called `Memory` |
| (which includes, for example, runtime, number of iterations, results of MapReduce jobs, and VertexProgram-specific |
| memory manipulations). |
| |
| [gremlin-groovy,modern] |
| ---- |
| result = graph.compute().program(PageRankVertexProgram.build().create()).submit().get() |
| result.memory().runtime |
| g = result.graph().traversal() |
| g.V().elementMap() |
| ---- |
| |
| NOTE: This model of "vertex-centric graph computing" was made popular by Google's |
| link:http://googleresearch.blogspot.com/2009/06/large-scale-graph-computing-at-google.html[Pregel] graph engine. |
| In the open source world, this model is found in OLAP graph computing systems such as link:https://giraph.apache.org/[Giraph] and |
| link:https://hama.apache.org/[Hama]. TinkerPop extends the |
| popularized model with integrated post-processing <<mapreduce,MapReduce>> jobs over the vertex set. |
| |
| [[mapreduce]] |
| == MapReduce |
| |
| The BSP model proposed by Pregel stores the results of the computation in a distributed manner as properties on the |
| elements in the graph. In many situations, it is necessary to aggregate those resultant properties into a single |
| result set (i.e. a statistic). For instance, assume a VertexProgram that computes a nominal cluster for each vertex |
| (i.e. link:http://en.wikipedia.org/wiki/Community_structure[a graph clustering algorithm]). At the end of the |
| computation, each vertex will have a property denoting the cluster it was assigned to. TinkerPop provides the |
| ability to answer global questions about the clusters. For instance, in order to answer the following questions, |
| `MapReduce` jobs are required: |
| |
| * How many vertices are in each cluster? (*presented below*) |
| * How many unique clusters are there? (*presented below*) |
| * What is the average age of the vertices in each cluster? |
| * What is the degree distribution of the vertices in each cluster? |
| |
| A compressed representation of the `MapReduce` API in TinkerPop is provided below. The key idea is that the |
| `map`-stage processes all vertices to emit key/value pairs. Those values are aggregated on their respective key |
| for the `reduce`-stage to do its processing to ultimately yield more key/value pairs. |
| |
| [source,java] |
| ---- |
| public interface MapReduce<MK, MV, RK, RV, R> { |
|   public void map(final Vertex vertex, final MapEmitter<MK, MV> emitter); |
|   public void reduce(final MK key, final Iterator<MV> values, final ReduceEmitter<RK, RV> emitter); |
|   // there are more methods |
| } |
| ---- |
| |
| IMPORTANT: The vertex that is passed into the `MapReduce.map()` method does not contain edges. The vertex only |
| contains original and computed vertex properties. This reduces the amount of data required to be loaded and ensures |
| that MapReduce is used for post-processing computed results. All edge-based computing should be accomplished in the |
| `VertexProgram`. |
| |
| image:mapreduce.png[width=650] |
| |
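The map and reduce stages can be sketched in plain Java for the "how many vertices are in each cluster?" question. This is an illustration only: each vertex is reduced to its cluster property (a `String`), and `ClusterMapReduceSketch` with its two methods is a hypothetical stand-in rather than an implementation of the `MapReduce` interface.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in for the two stages; no TinkerPop types are used.
class ClusterMapReduceSketch {

    // map-stage: every vertex emits the pair (clusterId, 1); values are grouped by key.
    static Map<String, List<Long>> mapStage(final List<String> clusterOfEachVertex) {
        final Map<String, List<Long>> grouped = new TreeMap<>();
        for (final String cluster : clusterOfEachVertex)
            grouped.computeIfAbsent(cluster, key -> new ArrayList<>()).add(1L);
        return grouped;
    }

    // reduce-stage: the values aggregated under each key are summed into one pair per key.
    static Map<String, Long> reduceStage(final Map<String, List<Long>> grouped) {
        final Map<String, Long> reduced = new TreeMap<>();
        for (final Map.Entry<String, List<Long>> entry : grouped.entrySet()) {
            long sum = 0L;
            for (final long value : entry.getValue()) sum += value;
            reduced.put(entry.getKey(), sum);
        }
        return reduced;
    }

    public static void main(final String[] args) {
        final List<String> clusters = Arrays.asList("1", "1", "1", "1", "1", "6");
        System.out.println(reduceStage(mapStage(clusters)));   // prints {1=5, 6=1}
    }
}
```

Note that the map-stage only looks at a vertex property, which matches the restriction that vertices passed to `map()` carry no edges.
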
| The `MapReduce` extension to GraphComputer is made explicit when examining the |
| <<peerpressurevertexprogram,`PeerPressureVertexProgram`>> and corresponding `ClusterPopulationMapReduce`. |
| In the code below, the GraphComputer result (`ComputerResult`) returns the `Graph` that was computed on as well as the `Memory` of the |
| computation. The memory maintains the results of any MapReduce jobs. The cluster population |
| MapReduce result states that there are 5 vertices in cluster 1 and 1 vertex in cluster 6. This can be verified |
| (in a serial manner) by looking at the `PeerPressureVertexProgram.CLUSTER` property of the resultant graph. Notice |
| that the property is "hidden" unless it is directly accessed via name. |
| |
| [gremlin-groovy,modern] |
| ---- |
| graph = TinkerFactory.createModern() |
| result = graph.compute().program(PeerPressureVertexProgram.build().create()).mapReduce(ClusterPopulationMapReduce.build().create()).submit().get() |
| result.memory().get('clusterPopulation') |
| g = result.graph().traversal() |
| g.V().values(PeerPressureVertexProgram.CLUSTER).groupCount().next() |
| g.V().elementMap() |
| ---- |
| |
| If there are numerous statistics desired, then it is possible to register as many MapReduce jobs as needed. For |
| instance, the `ClusterCountMapReduce` determines how many unique clusters were created by the peer pressure algorithm. |
| Below both `ClusterCountMapReduce` and `ClusterPopulationMapReduce` are computed over the resultant graph. |
| |
| [gremlin-groovy,modern] |
| ---- |
| result = graph.compute().program(PeerPressureVertexProgram.build().create()). |
| mapReduce(ClusterPopulationMapReduce.build().create()). |
| mapReduce(ClusterCountMapReduce.build().create()).submit().get() |
| result.memory().clusterPopulation |
| result.memory().clusterCount |
| ---- |
| |
| IMPORTANT: The MapReduce model of TinkerPop does not support MapReduce chaining. Thus, the order in which the |
| MapReduce jobs are executed is irrelevant. This is made apparent when realizing that the `map()`-stage takes a |
| `Vertex` as its input and the `reduce()`-stage yields key/value pairs. Thus, the results of reduce cannot be fed back |
| into a `map()`. |
| |
| == A Collection of VertexPrograms |
| |
| TinkerPop provides a collection of VertexPrograms that implement common algorithms. This section discusses the various |
| implementations. |
| |
| IMPORTANT: The vertex programs presented are what are provided as of TinkerPop x.y.z. Over time, with future releases, |
| more algorithms will be added. |
| |
| [[pagerankvertexprogram]] |
| === PageRankVertexProgram |
| |
| image:gremlin-pagerank.png[width=400,float=right] link:http://en.wikipedia.org/wiki/PageRank[PageRank] is perhaps the |
| most popular OLAP-oriented graph algorithm. This link:http://en.wikipedia.org/wiki/Centrality[eigenvector centrality] |
| variant was developed by Brin and Page of Google. PageRank defines a centrality value for all vertices in the graph, |
| where centrality is defined recursively: a vertex is central if it is connected to central vertices. PageRank is |
| an iterative algorithm that converges to a link:http://en.wikipedia.org/wiki/Ergodicity[steady state distribution]. If |
| the pageRank values are normalized to 1.0, then the pageRank value of a vertex is the probability that a random walker |
| will be seen at that vertex in the graph at any arbitrary moment in time. In order to help developers understand the |
| methods of a `VertexProgram`, the PageRankVertexProgram code is analyzed below. |
| |
| [source,java] |
| ---- |
| public class PageRankVertexProgram implements VertexProgram<Double> { <1> |
| |
| public static final String PAGE_RANK = "gremlin.pageRankVertexProgram.pageRank"; |
| private static final String EDGE_COUNT = "gremlin.pageRankVertexProgram.edgeCount"; |
| private static final String PROPERTY = "gremlin.pageRankVertexProgram.property"; |
| private static final String VERTEX_COUNT = "gremlin.pageRankVertexProgram.vertexCount"; |
| private static final String ALPHA = "gremlin.pageRankVertexProgram.alpha"; |
| private static final String EPSILON = "gremlin.pageRankVertexProgram.epsilon"; |
| private static final String MAX_ITERATIONS = "gremlin.pageRankVertexProgram.maxIterations"; |
| private static final String EDGE_TRAVERSAL = "gremlin.pageRankVertexProgram.edgeTraversal"; |
| private static final String INITIAL_RANK_TRAVERSAL = "gremlin.pageRankVertexProgram.initialRankTraversal"; |
| private static final String TELEPORTATION_ENERGY = "gremlin.pageRankVertexProgram.teleportationEnergy"; |
| private static final String CONVERGENCE_ERROR = "gremlin.pageRankVertexProgram.convergenceError"; |
| |
| private MessageScope.Local<Double> incidentMessageScope = MessageScope.Local.of(__::outE); <2> |
| private MessageScope.Local<Double> countMessageScope = MessageScope.Local.of(new MessageScope.Local.ReverseTraversalSupplier(this.incidentMessageScope)); |
| private PureTraversal<Vertex, Edge> edgeTraversal = null; |
| private PureTraversal<Vertex, ? extends Number> initialRankTraversal = null; |
| private double alpha = 0.85d; |
| private double epsilon = 0.00001d; |
| private int maxIterations = 20; |
| private String property = PAGE_RANK; <3> |
| private Set<VertexComputeKey> vertexComputeKeys; |
| private Set<MemoryComputeKey> memoryComputeKeys; |
| |
| private PageRankVertexProgram() { |
| |
| } |
| |
| @Override |
| public void loadState(final Graph graph, final Configuration configuration) { <4> |
| if (configuration.containsKey(INITIAL_RANK_TRAVERSAL)) |
| this.initialRankTraversal = PureTraversal.loadState(configuration, INITIAL_RANK_TRAVERSAL, graph); |
| if (configuration.containsKey(EDGE_TRAVERSAL)) { |
| this.edgeTraversal = PureTraversal.loadState(configuration, EDGE_TRAVERSAL, graph); |
| this.incidentMessageScope = MessageScope.Local.of(() -> this.edgeTraversal.get().clone()); |
| this.countMessageScope = MessageScope.Local.of(new MessageScope.Local.ReverseTraversalSupplier(this.incidentMessageScope)); |
| } |
| this.alpha = configuration.getDouble(ALPHA, this.alpha); |
| this.epsilon = configuration.getDouble(EPSILON, this.epsilon); |
| this.maxIterations = configuration.getInt(MAX_ITERATIONS, 20); |
| this.property = configuration.getString(PROPERTY, PAGE_RANK); |
| this.vertexComputeKeys = new HashSet<>(Arrays.asList( |
| VertexComputeKey.of(this.property, false), |
| VertexComputeKey.of(EDGE_COUNT, true))); <5> |
| this.memoryComputeKeys = new HashSet<>(Arrays.asList( |
| MemoryComputeKey.of(TELEPORTATION_ENERGY, Operator.sum, true, true), |
| MemoryComputeKey.of(VERTEX_COUNT, Operator.sum, true, true), |
| MemoryComputeKey.of(CONVERGENCE_ERROR, Operator.sum, false, true))); |
| } |
| |
| @Override |
| public void storeState(final Configuration configuration) { |
| VertexProgram.super.storeState(configuration); |
| configuration.setProperty(ALPHA, this.alpha); |
| configuration.setProperty(EPSILON, this.epsilon); |
| configuration.setProperty(PROPERTY, this.property); |
| configuration.setProperty(MAX_ITERATIONS, this.maxIterations); |
| if (null != this.edgeTraversal) |
| this.edgeTraversal.storeState(configuration, EDGE_TRAVERSAL); |
| if (null != this.initialRankTraversal) |
| this.initialRankTraversal.storeState(configuration, INITIAL_RANK_TRAVERSAL); |
| } |
| |
| @Override |
| public GraphComputer.ResultGraph getPreferredResultGraph() { |
| return GraphComputer.ResultGraph.NEW; |
| } |
| |
| @Override |
| public GraphComputer.Persist getPreferredPersist() { |
| return GraphComputer.Persist.VERTEX_PROPERTIES; |
| } |
| |
| @Override |
| public Set<VertexComputeKey> getVertexComputeKeys() { <6> |
| return this.vertexComputeKeys; |
| } |
| |
| @Override |
| public Optional<MessageCombiner<Double>> getMessageCombiner() { |
| return (Optional) PageRankMessageCombiner.instance(); |
| } |
| |
| @Override |
| public Set<MemoryComputeKey> getMemoryComputeKeys() { |
| return this.memoryComputeKeys; |
| } |
| |
| @Override |
| public Set<MessageScope> getMessageScopes(final Memory memory) { |
| final Set<MessageScope> set = new HashSet<>(); |
| set.add(memory.isInitialIteration() ? this.countMessageScope : this.incidentMessageScope); |
| return set; |
| } |
| |
| @Override |
| public PageRankVertexProgram clone() { |
| try { |
| final PageRankVertexProgram clone = (PageRankVertexProgram) super.clone(); |
| if (null != this.initialRankTraversal) |
| clone.initialRankTraversal = this.initialRankTraversal.clone(); |
| return clone; |
| } catch (final CloneNotSupportedException e) { |
| throw new IllegalStateException(e.getMessage(), e); |
| } |
| } |
| |
| @Override |
| public void setup(final Memory memory) { |
| memory.set(TELEPORTATION_ENERGY, null == this.initialRankTraversal ? 1.0d : 0.0d); |
| memory.set(VERTEX_COUNT, 0.0d); |
| memory.set(CONVERGENCE_ERROR, 1.0d); |
| } |
| |
| @Override |
| public void execute(final Vertex vertex, Messenger<Double> messenger, final Memory memory) { <7> |
| if (memory.isInitialIteration()) { |
| messenger.sendMessage(this.countMessageScope, 1.0d); <8> |
| memory.add(VERTEX_COUNT, 1.0d); |
| } else { |
| final double vertexCount = memory.<Double>get(VERTEX_COUNT); |
| final double edgeCount; |
| double pageRank; |
| if (1 == memory.getIteration()) { |
| edgeCount = IteratorUtils.reduce(messenger.receiveMessages(), 0.0d, (a, b) -> a + b); |
| vertex.property(VertexProperty.Cardinality.single, EDGE_COUNT, edgeCount); |
| pageRank = null == this.initialRankTraversal ? |
| 0.0d : |
| TraversalUtil.apply(vertex, this.initialRankTraversal.get()).doubleValue(); <9> |
| } else { |
| edgeCount = vertex.value(EDGE_COUNT); |
| pageRank = IteratorUtils.reduce(messenger.receiveMessages(), 0.0d, (a, b) -> a + b); <10> |
| } |
| ////////////////////////// |
| final double teleporationEnergy = memory.get(TELEPORTATION_ENERGY); |
| if (teleporationEnergy > 0.0d) { |
| final double localTerminalEnergy = teleporationEnergy / vertexCount; |
| pageRank = pageRank + localTerminalEnergy; |
| memory.add(TELEPORTATION_ENERGY, -localTerminalEnergy); |
| } |
| final double previousPageRank = vertex.<Double>property(this.property).orElse(0.0d); |
| memory.add(CONVERGENCE_ERROR, Math.abs(pageRank - previousPageRank)); |
| vertex.property(VertexProperty.Cardinality.single, this.property, pageRank); |
| memory.add(TELEPORTATION_ENERGY, (1.0d - this.alpha) * pageRank); |
| pageRank = this.alpha * pageRank; |
| if (edgeCount > 0.0d) |
| messenger.sendMessage(this.incidentMessageScope, pageRank / edgeCount); |
| else |
| memory.add(TELEPORTATION_ENERGY, pageRank); |
| } |
| } |
| |
| @Override |
| public boolean terminate(final Memory memory) { <11> |
| boolean terminate = memory.<Double>get(CONVERGENCE_ERROR) < this.epsilon || memory.getIteration() >= this.maxIterations; |
| memory.set(CONVERGENCE_ERROR, 0.0d); |
| return terminate; |
| } |
| |
| @Override |
| public String toString() { |
| return StringFactory.vertexProgramString(this, "alpha=" + this.alpha + ", epsilon=" + this.epsilon + ", iterations=" + this.maxIterations); |
| } |
| } |
| ---- |
| |
| <1> `PageRankVertexProgram` implements `VertexProgram<Double>` because the messages it sends are Java doubles. |
| <2> The default path of energy propagation is via outgoing edges from the current vertex. |
| <3> The resulting PageRank values for the vertices are stored as a vertex property. |
| <4> A vertex program is constructed using an Apache `Configuration` to ensure easy dissemination across a cluster of JVMs. |
| <5> `EDGE_COUNT` is a transient "scratch data" compute key while `PAGE_RANK` is not. |
| <6> A vertex program must define the "compute keys" that are the properties being operated on during the computation. |
| <7> The "while"-loop of the vertex program. |
| <8> In order to determine how to distribute the energy to neighbors, a "1"-count is used to determine how many incident vertices exist for the `MessageScope`. |
| <9> Initially, each vertex is provided an equal amount of energy represented as a double. |
| <10> Energy is aggregated, computed on according to the PageRank algorithm, and then disseminated according to the defined `MessageScope.Local`. |
| <11> The computation is terminated after epsilon-convergence is met or a pre-defined number of iterations have taken place. |
| |
| The above `PageRankVertexProgram` is used as follows. |
| |
| [gremlin-groovy,modern] |
| ---- |
| result = graph.compute().program(PageRankVertexProgram.build().create()).submit().get() |
| result.memory().runtime |
| g = result.graph().traversal() |
| g.V().elementMap() |
| ---- |
| |
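The arithmetic performed by the vertex program can also be followed in a serial, plain-Java sketch. It is not TinkerPop's code: the graph is a hypothetical adjacency array of outgoing edges, there is no initial rank traversal, and all vertices are updated in one loop. It keeps the same ingredients, namely `alpha`, `epsilon`, a maximum number of iterations, and a pool of teleportation energy that also collects the rank of vertices without outgoing edges.

```java
import java.util.Arrays;

// Serial illustration of the PageRank update; names and graph representation are assumptions.
class PageRankSketch {

    // outEdges[v] lists the vertices that v points to.
    static double[] pageRank(final int[][] outEdges, final double alpha,
                             final double epsilon, final int maxIterations) {
        final int n = outEdges.length;
        double[] rank = new double[n];
        Arrays.fill(rank, 1.0d / n);                        // equal initial energy, normalized to 1.0
        for (int iteration = 0; iteration < maxIterations; iteration++) {
            final double[] next = new double[n];
            double teleportationEnergy = 0.0d;
            for (int v = 0; v < n; v++) {
                teleportationEnergy += (1.0d - alpha) * rank[v];
                final double propagated = alpha * rank[v];
                if (outEdges[v].length > 0) {
                    for (final int target : outEdges[v])
                        next[target] += propagated / outEdges[v].length;
                } else {
                    teleportationEnergy += propagated;      // no outgoing edges: energy teleports
                }
            }
            double convergenceError = 0.0d;
            for (int v = 0; v < n; v++) {
                next[v] += teleportationEnergy / n;         // every vertex gets an equal share
                convergenceError += Math.abs(next[v] - rank[v]);
            }
            rank = next;
            if (convergenceError < epsilon) break;          // epsilon-convergence
        }
        return rank;
    }

    public static void main(final String[] args) {
        final int[][] graph = {{1, 2}, {2}, {}};            // 0->1, 0->2, 1->2
        System.out.println(Arrays.toString(pageRank(graph, 0.85d, 0.00001d, 20)));
    }
}
```

Since all energy is either propagated or teleported, the ranks keep summing to 1.0 in every iteration, which is what allows them to be read as probabilities.
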
| Note that `GraphTraversal` provides a <<pagerank-step,`pageRank()`>>-step. |
| |
| [gremlin-groovy,modern] |
| ---- |
| g = graph.traversal().withComputer() |
| g.V().pageRank().elementMap() |
| g.V().pageRank().by('pageRank').times(5).order().by('pageRank').elementMap() |
| ---- |
| |
| [[peerpressurevertexprogram]] |
| === PeerPressureVertexProgram |
| |
| The `PeerPressureVertexProgram` is a clustering algorithm that assigns a nominal value to each vertex in the graph. |
| The nominal value represents the vertex's cluster. If two vertices have the same nominal value, then they are in the |
| same cluster. The algorithm proceeds in the following manner. |
| |
| . Every vertex assigns itself to a unique cluster ID (initially, its vertex ID). |
| . Every vertex determines its per neighbor vote strength as 1.0d / incident edges count. |
| . Every vertex sends its cluster ID and vote strength to its adjacent vertices as a `Pair<Serializable,Double>`. |
| . Every vertex generates a vote energy distribution of received cluster IDs and changes its current cluster ID to the most frequent cluster ID. |
| .. If there is a tie, then the cluster with the lowest `toString()` comparison is selected. |
| . Steps 3 and 4 repeat until either a maximum number of iterations has occurred or no vertex changes its cluster. |
| |
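The steps above can be sketched in plain Java. The sketch rests on several assumptions that are not taken from the TinkerPop implementation: the graph is a hypothetical symmetric adjacency array, cluster IDs are the vertex indices, a vertex counts only the votes it receives, and all vertices are updated synchronously.

```java
import java.util.Map;
import java.util.TreeMap;

// Serial illustration of the voting steps; not the PeerPressureVertexProgram itself.
class PeerPressureSketch {

    // adjacency must be symmetric: if v lists u, then u lists v.
    static int[] cluster(final int[][] adjacency, final int maxIterations) {
        final int n = adjacency.length;
        int[] cluster = new int[n];
        for (int v = 0; v < n; v++) cluster[v] = v;              // step 1: own ID as cluster ID
        for (int iteration = 0; iteration < maxIterations; iteration++) {
            final int[] next = cluster.clone();
            boolean changed = false;
            for (int v = 0; v < n; v++) {
                // steps 2-3: each neighbor votes for its cluster with strength 1 / its edge count
                final Map<String, Double> votes = new TreeMap<>(); // iterates in toString() order
                for (final int neighbor : adjacency[v]) {
                    final double strength = 1.0d / adjacency[neighbor].length;
                    votes.merge(Integer.toString(cluster[neighbor]), strength, Double::sum);
                }
                // step 4: the strongest cluster wins; strict '>' keeps the lowest key on a tie
                String winner = null;
                double best = 0.0d;
                for (final Map.Entry<String, Double> vote : votes.entrySet()) {
                    if (vote.getValue() > best) {
                        winner = vote.getKey();
                        best = vote.getValue();
                    }
                }
                if (winner != null && Integer.parseInt(winner) != cluster[v]) {
                    next[v] = Integer.parseInt(winner);
                    changed = true;
                }
            }
            cluster = next;
            if (!changed) break;                                 // step 5: nothing changed
        }
        return cluster;
    }

    public static void main(final String[] args) {
        final int[][] graph = {{1, 2}, {0, 2}, {0, 1}, {}};      // a triangle plus an isolated vertex
        System.out.println(java.util.Arrays.toString(cluster(graph, 30)));
    }
}
```

Because this sketch updates synchronously and ignores a vertex's own cluster when tallying, some graphs (a single edge, for example) swap clusters back and forth and stop only at the iteration cap.
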
| Note that `GraphTraversal` provides a <<peerpressure-step,`peerPressure()`>>-step. |
| |
| [gremlin-groovy,modern] |
| ---- |
| g = graph.traversal().withComputer() |
| g.V().peerPressure().by('cluster').elementMap() |
| g.V().peerPressure().by(outE('knows')).by('cluster').elementMap() |
| ---- |
| |
| [[connectedcomponentvertexprogram]] |
| === ConnectedComponentVertexProgram |
| |
| The `ConnectedComponentVertexProgram` identifies link:https://en.wikipedia.org/wiki/Connected_component_(graph_theory)[Connected Component] |
| instances in a graph. See <<connectedcomponent-step,`connectedComponent()`>>-step for more information. |
| |
| [[shortestpathvertexprogram]] |
| === ShortestPathVertexProgram |
| |
| The `ShortestPathVertexProgram` provides an easy way to find shortest non-cyclic paths in the graph. It provides several options to configure |
| the output format, the start- and end-vertices, the direction, a custom distance function, as well as a distance limitation. By default it just |
| finds all undirected, shortest paths in the graph. |
| |
| [gremlin-groovy,modern] |
| ---- |
| spvp = ShortestPathVertexProgram.build().create() <1> |
| result = graph.compute().program(spvp).submit().get() <2> |
| result.memory().get(ShortestPathVertexProgram.SHORTEST_PATHS) <3> |
| ---- |
| |
| <1> Create a `ShortestPathVertexProgram` with its default configuration. |
| <2> Execute the `ShortestPathVertexProgram`. |
| <3> Get all shortest paths from the results memory. |
| |
| [gremlin-groovy,modern] |
| ---- |
| spvp = ShortestPathVertexProgram.build().includeEdges(true).create() <1> |
| result = graph.compute().program(spvp).submit().get() <2> |
| result.memory().get(ShortestPathVertexProgram.SHORTEST_PATHS) <3> |
| ---- |
| |
| <1> Create a `ShortestPathVertexProgram` as before, but configure it to include edges in the result. |
| <2> Execute the `ShortestPathVertexProgram`. |
| <3> Get all shortest paths from the results memory. |
| |
| The `ShortestPathVertexProgram.Builder` provides the following configuration methods: |
| |
| [width="100%",cols="3,15,5",options="header"] |
| |========================================================= |
| | Method | Description | Default |
| | `source(Traversal)` | Sets a filter traversal for the start vertices (e.g. `__.has('name','marko')`). | all vertices (`__.identity()`) |
| | `target(Traversal)` | Sets a filter traversal for the end vertices. | all vertices |
| | `edgeDirection(Direction)` | Sets the direction to traverse during the shortest path discovery. | `Direction.BOTH` |
| | `edgeTraversal(Traversal)` | Sets a traversal that emits the edges to traverse from the current vertex. | `__.bothE()` |
| | `distanceProperty(String)` | Sets the edge property to use for the distance calculations. | none |
| | `distanceTraversal(Traversal)` | Sets the traversal that calculates the distance for the current edge. | `__.constant(1)` |
| | `maxDistance(Traversal)` | Limits the shortest path distance. | none |
| | `includeEdges(Boolean)` | Whether to include edges in shortest paths or not. | `false` |
| |========================================================= |
| |
| IMPORTANT: If a maximum distance is provided, the discovery process will only stop following a path at this distance if no |
| custom distance property or traversal was provided. Custom distances can be negative, hence exceeding the maximum distance doesn't mean that there |
| can't be any more valid paths. However, paths will be filtered at the end, when no more non-cyclic paths can be found. The bottom line is that |
| custom distance properties or traversals can lead to much longer runtimes and a much higher memory consumption. |
| |
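Why the early stop is only safe with the default distance can be seen in a small breadth-first sketch. It assumes a hypothetical adjacency array and a constant distance of 1 per edge; it illustrates the cutoff and is not how `ShortestPathVertexProgram` is implemented.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Hop-count search with a distance limit; names and representation are assumptions.
class BoundedShortestPathSketch {

    // Returns the hop distance from source to every vertex; -1 means "not reached within maxDistance".
    static int[] distances(final int[][] adjacency, final int source, final int maxDistance) {
        final int[] distance = new int[adjacency.length];
        Arrays.fill(distance, -1);
        distance[source] = 0;
        final Deque<Integer> frontier = new ArrayDeque<>();
        frontier.add(source);
        while (!frontier.isEmpty()) {
            final int current = frontier.remove();
            // Every edge adds exactly 1, so a path at the limit can never become valid again.
            if (distance[current] == maxDistance) continue;
            for (final int neighbor : adjacency[current]) {
                if (distance[neighbor] == -1) {
                    distance[neighbor] = distance[current] + 1;
                    frontier.add(neighbor);
                }
            }
        }
        return distance;
    }

    public static void main(final String[] args) {
        final int[][] path = {{1}, {0, 2}, {1, 3}, {2}};   // undirected path 0-1-2-3
        System.out.println(Arrays.toString(distances(path, 0, 2)));
    }
}
```

With custom distances that may be negative, the `continue` above would be incorrect, because a path beyond the limit could still shrink below it later. That is the reason discovery has to run to completion and filter at the end in that case.
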
| Note that `GraphTraversal` provides a <<shortestpath-step,`shortestPath()`>>-step. |
| |
| [[clonevertexprogram]] |
| === CloneVertexProgram |
| |
| The `CloneVertexProgram` (known in versions prior to 3.2.10 as `BulkDumperVertexProgram`) copies a whole graph from |
| any graph `InputFormat` to any graph `OutputFormat`. TinkerPop provides the following: |
| |
| * `OutputFormat` |
| ** `GraphSONOutputFormat` |
| ** `GryoOutputFormat` |
| ** `ScriptOutputFormat` |
| * `InputFormat` |
| ** `GraphSONInputFormat` |
| ** `GryoInputFormat` |
| ** `ScriptInputFormat` |
| |
| An <<clonevertexprogramusingspark,example>> is provided in the SparkGraphComputer section. |
| |
| Graph Providers should consider writing their own `OutputFormat` and `InputFormat` which would allow bulk loading and |
| export capabilities through this `VertexProgram`. This topic is discussed further in the |
| link:https://tinkerpop.apache.org/docs/x.y.z/dev/provider/#bulk-import-export[Provider Documentation]. |
| |
| [[traversalvertexprogram]] |
| === TraversalVertexProgram |
| |
| image:traversal-vertex-program.png[width=250,float=left] The `TraversalVertexProgram` is a "special" VertexProgram in |
| that it can be executed via a `Traversal` and a `GraphComputer`. In Gremlin, it is possible to have |
| the same traversal executed using either the standard OLTP-engine or the `GraphComputer` OLAP-engine. The only difference |
| is where the traversal is submitted. |
| |
| NOTE: This model of graph traversal in a BSP system was first implemented by the |
| link:https://github.com/thinkaurelius/faunus/wiki[Faunus] graph analytics engine and originally described in |
| link:http://markorodriguez.com/2011/04/19/local-and-distributed-traversal-engines/[Local and Distributed Traversal Engines]. |
| |
| [gremlin-groovy,modern] |
| ---- |
| g = graph.traversal() |
| g.V().both().hasLabel('person').values('age').groupCount().next() // OLTP |
| g = graph.traversal().withComputer() |
| g.V().both().hasLabel('person').values('age').groupCount().next() // OLAP |
| ---- |
| |
| image::olap-traversal.png[width=650] |
| |
| In the OLAP example above, a `TraversalVertexProgram` is (logically) sent to each vertex in the graph. Each instance |
| evaluation requires (logically) 5 BSP iterations and each iteration is interpreted as such: |
| |
| . `g.V()`: Put a traverser on each vertex in the graph. |
| . `both()`: Propagate each traverser to the vertices `both`-adjacent to its current vertex. |
| . `hasLabel('person')`: If the vertex is not a person, kill the traversers at that vertex. |
| . `values('age')`: Have all the traversers reference the integer age of their current vertex. |
| . `groupCount()`: Count how many times a particular age has been seen. |
| |
| While 5 iterations were presented, in fact, `TraversalVertexProgram` will execute the traversal in only |
| 2 iterations. The reason is that `g.V().both()` and `hasLabel('person').values('age').groupCount()` can be |
| executed in a single iteration as any message sent would simply be to the current executing vertex. Thus, a simple optimization |
| exists in Gremlin OLAP called "reflexive message passing" which simulates non-message-passing BSP iterations within a |
| single BSP iteration. |
| |
| The same OLAP traversal can be executed using the standard `graph.compute()` model, though at the expense of verbosity. |
| `TraversalVertexProgram` provides a fluent `Builder` for constructing a `TraversalVertexProgram`. The specified |
| `traversal()` can be either a direct `Traversal` object or a |
| link:http://en.wikipedia.org/wiki/Scripting_for_the_Java_Platform[JSR-223] script that will generate a |
| `Traversal`. There is no benefit to using the model below. It is demonstrated to help elucidate how Gremlin OLAP traversals |
| are ultimately compiled for execution on a `GraphComputer`. |
| |
| [gremlin-groovy,modern] |
| ---- |
| result = graph.compute().program(TraversalVertexProgram.build().traversal(g.V().both().hasLabel('person').values('age').groupCount('a')).create()).submit().get() |
| result.memory().a |
| result.memory().iteration |
| result.memory().runtime |
| ---- |
| |
| [[distributed-gremlin-gotchas]] |
| ==== Distributed Gremlin Gotchas |
| |
| Gremlin OLTP is not identical to Gremlin OLAP. |
| |
| IMPORTANT: There are two primary theoretical differences between Gremlin OLTP and Gremlin OLAP. First, Gremlin OLTP |
| (via `Traversal`) leverages a link:http://en.wikipedia.org/wiki/Depth-first_search[depth-first] execution engine. |
| Depth-first execution has a limited memory footprint due to link:http://en.wikipedia.org/wiki/Lazy_evaluation[lazy evaluation]. |
| On the other hand, Gremlin OLAP (via `TraversalVertexProgram`) leverages a |
| link:http://en.wikipedia.org/wiki/Breadth-first_search[breadth-first] execution engine which maintains a larger memory |
| footprint, but a better time complexity due to vertex-local traversers being able to be "bulked." The second difference |
| is that Gremlin OLTP is executed in a serial/streaming fashion, while Gremlin OLAP is executed in a parallel/step-wise fashion. These two |
| fundamental differences lead to the behaviors enumerated below. |
| |
| image::gremlin-without-a-cause.png[width=200,float=right] |
| |
| . Traversal sideEffects are represented as a distributed data structure across `GraphComputer` workers. It is not |
| possible to get a global view of a sideEffect until after an iteration has occurred and global sideEffects are re-broadcasted to the workers. |
| In some situations, a "stale" local representation of the sideEffect is sufficient to ensure the intended semantics of the |
| traversal are respected. However, this is not generally true so be wary of traversals that require global views of a |
| sideEffect. To ensure a fresh global representation, use `barrier()` prior to accessing the global sideEffect. Note that this |
| only comes into play with custom steps and <<general-steps,lambda steps>>. The standard Gremlin step library respects OLAP semantics. |
| . When evaluating traversals that rely on path information (i.e. the history of the traversal), practical |
| computational limits can easily be reached due to the link:http://en.wikipedia.org/wiki/Combinatorial_explosion[combinatorial explosion] |
| of data. With path computing enabled, every traverser is unique and thus, must be enumerated as opposed to being |
| counted/merged. The difference is a collection of paths vs. a single 64-bit long at a single vertex. In other words, |
| bulking is very unlikely with traversers that maintain path information. For more |
| information on this concept, please see link:https://thinkaurelius.wordpress.com/2012/11/11/faunus-provides-big-graph-data-analytics/[Faunus Provides Big Graph Data]. |
| . Steps that are concerned with the global ordering of traversers do not have a meaningful representation in |
| OLAP. For example, what does <<order-step,`order()`>>-step mean when all traversers are being processed in parallel? |
| Even if the traversers were aggregated and ordered, then at the next step they would return to being executed in |
| parallel and thus, in an unpredictable order. When `order()`-like steps are executed at the end of a traversal (i.e. |
| the final step), `TraversalVertexProgram` ensures a serial representation is ordered accordingly. Moreover, it is intelligent enough |
| to maintain the ordering of `g.V().hasLabel("person").order().by("age").values("name")`. However, the OLAP traversal |
| `g.V().hasLabel("person").order().by("age").out().values("name")` will lose the original ordering as the `out()`-step |
| will rebroadcast traversers across the cluster. |
| |
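The cost of path tracking can be made concrete with a plain-Java sketch that walks a number of `out()`-like hops in two ways. The adjacency array and the class are hypothetical, and the code only imitates the bookkeeping: without paths, one `long` per vertex represents all traversers located there; with paths, every traverser carries its own history and must be kept separately.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Illustration of bulked counters vs. enumerated paths; not TinkerPop's traverser classes.
class BulkingSketch {

    // Without path tracking: bulk[v] is the number of traversers currently at vertex v.
    static long[] bulkedHops(final int[][] outEdges, final int hops) {
        long[] bulk = new long[outEdges.length];
        Arrays.fill(bulk, 1L);                               // one traverser per vertex to start
        for (int hop = 0; hop < hops; hop++) {
            final long[] next = new long[outEdges.length];
            for (int v = 0; v < outEdges.length; v++)
                for (final int target : outEdges[v]) next[target] += bulk[v];
            bulk = next;
        }
        return bulk;
    }

    // With path tracking: each traverser is a distinct list of visited vertices.
    static List<List<Integer>> pathHops(final int[][] outEdges, final int hops) {
        List<List<Integer>> paths = new ArrayList<>();
        for (int v = 0; v < outEdges.length; v++) paths.add(Collections.singletonList(v));
        for (int hop = 0; hop < hops; hop++) {
            final List<List<Integer>> next = new ArrayList<>();
            for (final List<Integer> path : paths) {
                for (final int target : outEdges[path.get(path.size() - 1)]) {
                    final List<Integer> extended = new ArrayList<>(path);
                    extended.add(target);
                    next.add(extended);
                }
            }
            paths = next;
        }
        return paths;
    }

    public static void main(final String[] args) {
        final int[][] graph = {{1, 2}, {0, 2}, {0, 1}};      // every vertex points to the other two
        System.out.println(Arrays.toString(bulkedHops(graph, 3)) + " vs " + pathHops(graph, 3).size() + " paths");
    }
}
```

On this three-vertex graph the bulked form always stores three numbers, while the number of stored paths doubles with every hop.
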
| [[graph-filter]] |
| == Graph Filter |
| |
| Most OLAP jobs do not require the entire source graph to faithfully execute their `VertexProgram`. For instance, if |
| `PageRankVertexProgram` is only going to compute the centrality of people in the friendship-graph, then the following |
| `GraphFilter` can be applied. |
| |
| [source,java] |
| ---- |
| graph.compute(). |
| vertices(hasLabel("person")). |
| edges(bothE("knows")). |
| program(PageRankVertexProgram...) |
| ---- |
| |
| There are two methods for constructing a `GraphFilter`. |
| |
| * `vertices(Traversal<Vertex,Vertex>)`: A traversal that can only analyze a vertex and its properties. |
| If the traversal `hasNext()`, the input `Vertex` is passed to the `GraphComputer`. |
| * `edges(Traversal<Vertex,Edge>)`: A traversal that will iterate all legal edges for the source vertex. |
| |
| `GraphFilter` is a "push-down predicate" that providers can reason on to determine the most efficient way to provide |
| graph data to the `GraphComputer`. |
| |
| IMPORTANT: Apache TinkerPop provides the `GraphFilterStrategy` <<traversalstrategy,traversal strategy>>, which analyzes a submitted |
| OLAP traversal and, if possible, creates an appropriate `GraphFilter` automatically. For instance, `g.V().count()` would |
| yield a `GraphFilter.edges(limit(0))`. Thus, for traversal submissions, users typically do not need to be aware of creating |
| graph filters explicitly. Users can use the <<explain-step,`explain()`>>-step to see the `GraphFilter` generated by `GraphFilterStrategy`. |