// blob: 1949c5349f3a66adda1817230730533168991eb4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.process.computer.search.path;
import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.ProgramVertexProgramStep;
import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.VertexProgramStep;
import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
import org.apache.tinkerpop.gremlin.process.traversal.Operator;
import org.apache.tinkerpop.gremlin.process.traversal.Path;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.ImmutablePath;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.IndexedTraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
import org.apache.tinkerpop.gremlin.util.NumberHelper;
import org.javatuples.Pair;
import org.javatuples.Triplet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
/**
* @author Daniel Kuppitz (http://gremlin.guru)
*/
public class ShortestPathVertexProgram implements VertexProgram<Triplet<Path, Edge, Number>> {
// Memory key under which the collected shortest paths are accumulated (see collectShortestPaths).
@SuppressWarnings("WeakerAccess")
public static final String SHORTEST_PATHS = "gremlin.shortestPathVertexProgram.shortestPaths";
// Configuration keys read by loadState and written by storeState and the Builder.
private static final String SOURCE_VERTEX_FILTER = "gremlin.shortestPathVertexProgram.sourceVertexFilter";
private static final String TARGET_VERTEX_FILTER = "gremlin.shortestPathVertexProgram.targetVertexFilter";
private static final String EDGE_TRAVERSAL = "gremlin.shortestPathVertexProgram.edgeTraversal";
private static final String DISTANCE_TRAVERSAL = "gremlin.shortestPathVertexProgram.distanceTraversal";
private static final String MAX_DISTANCE = "gremlin.shortestPathVertexProgram.maxDistance";
private static final String INCLUDE_EDGES = "gremlin.shortestPathVertexProgram.includeEdges";
// Memory key holding the current phase of the program (SEARCH, COLLECT_PATHS or UPDATE_HALTED_TRAVERSERS).
private static final String STATE = "gremlin.shortestPathVertexProgram.state";
// Vertex property key under which a vertex keeps the shortest paths that reached it, keyed by source vertex.
private static final String PATHS = "gremlin.shortestPathVertexProgram.paths";
// Memory key combined with Operator.and; execute() contributes false whenever a vertex found a new path.
private static final String VOTE_TO_HALT = "gremlin.shortestPathVertexProgram.voteToHalt";
// Phases stored under STATE; setup() starts with SEARCH and terminate() advances the phase.
private static final int SEARCH = 0;
private static final int COLLECT_PATHS = 1;
private static final int UPDATE_HALTED_TRAVERSERS = 2;
// Default source/target vertex filter: the identity traversal.
public static final PureTraversal<Vertex, ?> DEFAULT_VERTEX_FILTER_TRAVERSAL = new PureTraversal<>(
__.<Vertex> identity().asAdmin()); // todo: new IdentityTraversal<>()
// Default edge traversal: bothE().
public static final PureTraversal<Vertex, Edge> DEFAULT_EDGE_TRAVERSAL = new PureTraversal<>(__.bothE().asAdmin());
// Default distance traversal: the constant 1 for every edge.
public static final PureTraversal<Edge, Number> DEFAULT_DISTANCE_TRAVERSAL = new PureTraversal<>(
__.<Edge> start().<Number> constant(1).asAdmin()); // todo: new ConstantTraversal<>(1)
// Halted traversers restored from the configuration in loadState and written back in storeState.
private TraverserSet<Vertex> haltedTraversers;
// Index over split copies of the halted traversers; built in loadState, looked up by vertex in
// copyHaltedTraversersFromMemory and cleared by terminate() during the initial iteration.
private IndexedTraverserSet<Vertex, Vertex> haltedTraversersIndex;
// Root traversal this program belongs to; only loaded when the configuration contains ROOT_TRAVERSAL.
private PureTraversal<?, ?> traversal;
// Each of the following traversals starts out as a clone of its default and may be replaced in loadState.
private PureTraversal<Vertex, ?> sourceVertexFilterTraversal = DEFAULT_VERTEX_FILTER_TRAVERSAL.clone();
private PureTraversal<Vertex, ?> targetVertexFilterTraversal = DEFAULT_VERTEX_FILTER_TRAVERSAL.clone();
private PureTraversal<Vertex, Edge> edgeTraversal = DEFAULT_EDGE_TRAVERSAL.clone();
private PureTraversal<Edge, Number> distanceTraversal = DEFAULT_DISTANCE_TRAVERSAL.clone();
// Step of the root traversal whose id matches the configured STEP_ID (resolved in loadState and clone).
private Step<Vertex, Path> programStep;
// Optional upper bound for path distances; stays null when MAX_DISTANCE is not configured.
private Number maxDistance;
// True when the distance traversal equals DEFAULT_DISTANCE_TRAVERSAL, i.e. every edge counts as 1.
private boolean distanceEqualsNumberOfHops;
// Whether edges are sent along with messages and thereby become part of the resulting paths.
private boolean includeEdges;
// True when the configuration holds no ROOT_TRAVERSAL, i.e. the program is not part of a traversal.
private boolean standalone;
// NOTE(review): the boolean arguments of the compute keys below are presumably the transient/broadcast
// flags - confirm against VertexComputeKey.of / MemoryComputeKey.of.
private static final Set<VertexComputeKey> VERTEX_COMPUTE_KEYS = new HashSet<>(Arrays.asList(
VertexComputeKey.of(PATHS, true),
VertexComputeKey.of(TraversalVertexProgram.HALTED_TRAVERSERS, false)));
// Kept per instance (not static) because loadState adds the SHORTEST_PATHS key to this set.
private final Set<MemoryComputeKey> memoryComputeKeys = new HashSet<>(Arrays.asList(
MemoryComputeKey.of(VOTE_TO_HALT, Operator.and, false, true),
MemoryComputeKey.of(STATE, Operator.assign, true, true)));
// Private constructor: the class cannot be instantiated directly from outside; build() hands out a Builder.
private ShortestPathVertexProgram() {
}
/**
 * Restores this program's settings from the given configuration.
 * <p>
 * Traversals and the maximum distance keep their defaults unless the matching key is present. When the
 * configuration contains a root traversal, the program step is looked up in it by the configured step id.
 * Finally the halted traversers are loaded and indexed, and the {@link #SHORTEST_PATHS} memory key is
 * registered.
 *
 * @param graph         the graph handed to {@code PureTraversal.loadState} when restoring traversals
 * @param configuration the configuration to read the settings from
 */
@Override
public void loadState(final Graph graph, final Configuration configuration) {
    // only replace the defaults for settings that were explicitly configured
    if (configuration.containsKey(SOURCE_VERTEX_FILTER)) {
        this.sourceVertexFilterTraversal = PureTraversal.loadState(configuration, SOURCE_VERTEX_FILTER, graph);
    }
    if (configuration.containsKey(TARGET_VERTEX_FILTER)) {
        this.targetVertexFilterTraversal = PureTraversal.loadState(configuration, TARGET_VERTEX_FILTER, graph);
    }
    if (configuration.containsKey(EDGE_TRAVERSAL)) {
        this.edgeTraversal = PureTraversal.loadState(configuration, EDGE_TRAVERSAL, graph);
    }
    if (configuration.containsKey(DISTANCE_TRAVERSAL)) {
        this.distanceTraversal = PureTraversal.loadState(configuration, DISTANCE_TRAVERSAL, graph);
    }
    if (configuration.containsKey(MAX_DISTANCE)) {
        this.maxDistance = (Number) configuration.getProperty(MAX_DISTANCE);
    }

    this.distanceEqualsNumberOfHops = this.distanceTraversal.equals(DEFAULT_DISTANCE_TRAVERSAL);
    this.includeEdges = configuration.getBoolean(INCLUDE_EDGES, false);

    // without a root traversal in the configuration the program runs standalone
    final boolean hasRootTraversal = configuration.containsKey(VertexProgramStep.ROOT_TRAVERSAL);
    this.standalone = !hasRootTraversal;
    if (hasRootTraversal) {
        this.traversal = PureTraversal.loadState(configuration, VertexProgramStep.ROOT_TRAVERSAL, graph);
        final String wantedStepId = configuration.getString(ProgramVertexProgramStep.STEP_ID);
        for (final Step candidate : this.traversal.get().getSteps()) {
            if (!candidate.getId().equals(wantedStepId)) {
                continue;
            }
            //noinspection unchecked
            this.programStep = candidate;
            break;
        }
    }

    // load the halted traversers and index split copies of them for direct lookup
    this.haltedTraversers = TraversalVertexProgram.loadHaltedTraversers(configuration);
    this.haltedTraversersIndex = new IndexedTraverserSet<>(v -> v);
    for (final Traverser.Admin<Vertex> halted : this.haltedTraversers) {
        this.haltedTraversersIndex.add(halted.split());
    }

    this.memoryComputeKeys.add(MemoryComputeKey.of(SHORTEST_PATHS, Operator.addAll, true, !this.standalone));
}
/**
 * Writes this program's settings into the given configuration.
 * <p>
 * The {@code VertexProgram} default implementation runs first; afterwards the four traversals, the
 * include-edges flag, the optional maximum distance, the optional root traversal (together with the
 * program step id) and the halted traversers are stored.
 *
 * @param configuration the configuration to write the settings to
 */
@Override
public void storeState(final Configuration configuration) {
    VertexProgram.super.storeState(configuration);

    this.sourceVertexFilterTraversal.storeState(configuration, SOURCE_VERTEX_FILTER);
    this.targetVertexFilterTraversal.storeState(configuration, TARGET_VERTEX_FILTER);
    this.edgeTraversal.storeState(configuration, EDGE_TRAVERSAL);
    this.distanceTraversal.storeState(configuration, DISTANCE_TRAVERSAL);
    configuration.setProperty(INCLUDE_EDGES, this.includeEdges);

    // the maximum distance is optional and only stored when it was set
    if (null != this.maxDistance) {
        configuration.setProperty(MAX_DISTANCE, this.maxDistance);
    }

    // root traversal and step id are only stored when a root traversal is set
    if (null != this.traversal) {
        this.traversal.storeState(configuration, ProgramVertexProgramStep.ROOT_TRAVERSAL);
        configuration.setProperty(ProgramVertexProgramStep.STEP_ID, this.programStep.getId());
    }

    TraversalVertexProgram.storeHaltedTraversers(configuration, this.haltedTraversers);
}
/**
 * @return the shared, statically defined set of vertex compute keys
 *         ({@code PATHS} and {@code TraversalVertexProgram.HALTED_TRAVERSERS})
 */
@Override
public Set<VertexComputeKey> getVertexComputeKeys() {
    return ShortestPathVertexProgram.VERTEX_COMPUTE_KEYS;
}
/**
 * @return this instance's set of memory compute keys; the set itself is returned, not a copy
 */
@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
    return this.memoryComputeKeys;
}
/**
 * @param memory the program's memory (not consulted)
 * @return always an empty set
 */
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
    return Collections.emptySet();
}
/**
 * Creates a copy of this program in which every non-null traversal is cloned as well.
 * <p>
 * If a root traversal exists, the copy's program step is re-resolved inside the cloned root traversal by
 * matching the id of this instance's program step.
 *
 * @return the cloned vertex program
 * @throws IllegalStateException if the underlying {@code super.clone()} call is not supported
 */
@Override
public VertexProgram<Triplet<Path, Edge, Number>> clone() {
    final ShortestPathVertexProgram copy;
    try {
        copy = (ShortestPathVertexProgram) super.clone();
    } catch (final CloneNotSupportedException e) {
        throw new IllegalStateException(e.getMessage(), e);
    }

    if (this.edgeTraversal != null) {
        copy.edgeTraversal = this.edgeTraversal.clone();
    }
    if (this.sourceVertexFilterTraversal != null) {
        copy.sourceVertexFilterTraversal = this.sourceVertexFilterTraversal.clone();
    }
    if (this.targetVertexFilterTraversal != null) {
        copy.targetVertexFilterTraversal = this.targetVertexFilterTraversal.clone();
    }
    if (this.distanceTraversal != null) {
        copy.distanceTraversal = this.distanceTraversal.clone();
    }

    if (this.traversal != null) {
        copy.traversal = this.traversal.clone();
        // point the copy at the matching step of its own (cloned) root traversal
        for (final Step candidate : copy.traversal.get().getSteps()) {
            if (candidate.getId().equals(this.programStep.getId())) {
                //noinspection unchecked
                copy.programStep = candidate;
                break;
            }
        }
    }
    return copy;
}
/**
 * @return always {@code GraphComputer.ResultGraph.ORIGINAL}
 */
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
    final GraphComputer.ResultGraph preferred = GraphComputer.ResultGraph.ORIGINAL;
    return preferred;
}
/**
 * @return always {@code GraphComputer.Persist.NOTHING}
 */
@Override
public GraphComputer.Persist getPreferredPersist() {
    final GraphComputer.Persist preferred = GraphComputer.Persist.NOTHING;
    return preferred;
}
/**
 * Initializes the memory: the halt vote starts out as {@code true} and the state as {@code SEARCH}.
 *
 * @param memory the program's memory
 */
@Override
public void setup(final Memory memory) {
    final boolean initialVote = true;
    memory.set(VOTE_TO_HALT, initialVote);
    memory.set(STATE, SEARCH);
}
/**
 * Processes a single vertex for the current iteration.
 * <p>
 * The STATE value in memory selects the work: COLLECT_PATHS delegates to collectShortestPaths,
 * UPDATE_HALTED_TRAVERSERS delegates to updateHaltedTraversers, and any other value runs the search logic.
 * During the search the initial iteration seeds every start vertex with a path containing only itself and
 * sends messages along its edges; later iterations extend the received paths, keep the shortest ones per
 * source vertex in the PATHS property and propagate every newly accepted path. At the end of a search
 * iteration the vertex adds its vote to VOTE_TO_HALT (false if it accepted at least one new path). Vertices
 * rejected by the start-vertex check in the initial iteration return without voting.
 *
 * @param vertex    the vertex to process
 * @param messenger used to receive messages sent to this vertex and to send messages to adjacent vertices;
 *                  each message carries the path so far, the traversed edge (or null) and the distance
 * @param memory    the program's memory
 */
@Override
public void execute(final Vertex vertex, final Messenger<Triplet<Path, Edge, Number>> messenger, final Memory memory) {
// the two post-search phases are handled by dedicated methods; any other state continues with the search
switch (memory.<Integer>get(STATE)) {
case COLLECT_PATHS:
collectShortestPaths(vertex, memory);
return;
case UPDATE_HALTED_TRAVERSERS:
updateHaltedTraversers(vertex, memory);
return;
}
boolean voteToHalt = true;
if (memory.isInitialIteration()) {
// Use the first iteration to copy halted traversers from the halted traverser index to the respective
// vertices. This way the rest of the code can be simplified and always expect the HALTED_TRAVERSERS
// property to be available (if halted traversers exist for this vertex).
copyHaltedTraversersFromMemory(vertex);
// ignore vertices that don't pass the start-vertex filter
if (!isStartVertex(vertex)) return;
// start to track paths for all valid start-vertices
final Map<Vertex, Pair<Number, Set<Path>>> paths = new HashMap<>();
final Path path;
final Set<Path> pathSet = new HashSet<>();
// the initial path consists of the start vertex only and has distance 0
pathSet.add(path = makePath(vertex));
paths.put(vertex, Pair.with(0, pathSet));
vertex.property(VertexProperty.Cardinality.single, PATHS, paths);
// send messages to valid adjacent vertices
processEdges(vertex, path, 0, messenger);
voteToHalt = false;
} else {
// load existing paths to this vertex and extend them based on messages received from adjacent vertices
final Map<Vertex, Pair<Number, Set<Path>>> paths =
vertex.<Map<Vertex, Pair<Number, Set<Path>>>>property(PATHS).orElseGet(HashMap::new);
final Iterator<Triplet<Path, Edge, Number>> iterator = messenger.receiveMessages();
while (iterator.hasNext()) {
final Triplet<Path, Edge, Number> triplet = iterator.next();
final Path sourcePath = triplet.getValue0();
final Number distance = triplet.getValue2();
// the first element of a path is the vertex the path started from; it is the key of the paths map
final Vertex sourceVertex = sourcePath.get(0);
Path newPath = null;
// already know a path coming from this source vertex?
if (paths.containsKey(sourceVertex)) {
final Number currentShortestDistance = paths.get(sourceVertex).getValue0();
final int cmp = NumberHelper.compare(distance, currentShortestDistance);
if (cmp <= 0) {
newPath = extendPath(sourcePath, triplet.getValue1(), vertex);
if (cmp < 0) {
// if the path length is smaller than the current shortest path's length, replace the
// current set of shortest paths
final Set<Path> pathSet = new HashSet<>();
pathSet.add(newPath);
paths.put(sourceVertex, Pair.with(distance, pathSet));
} else {
// if the path length is equal to the current shortest path's length, add the new path
// to the set of shortest paths
paths.get(sourceVertex).getValue1().add(newPath);
}
}
} else if (!exceedsMaxDistance(distance)) {
// store the new path as the shortest path from the source vertex to the current vertex
final Set<Path> pathSet = new HashSet<>();
pathSet.add(newPath = extendPath(sourcePath, triplet.getValue1(), vertex));
paths.put(sourceVertex, Pair.with(distance, pathSet));
}
// if a new path was found, send messages to adjacent vertices, otherwise do nothing as there's no
// chance to find any new paths going forward
if (newPath != null) {
vertex.property(VertexProperty.Cardinality.single, PATHS, paths);
processEdges(vertex, newPath, distance, messenger);
voteToHalt = false;
}
}
}
// VOTE_TO_HALT will be set to true if an iteration hasn't found any new paths
memory.add(VOTE_TO_HALT, voteToHalt);
}
/**
 * Decides after each iteration whether the program is finished and advances the state when needed.
 * <p>
 * As long as the halt vote is {@code false}, the vote is reset to {@code true} and the program keeps
 * running in its current state. With a positive vote the state moves from searching to
 * {@code COLLECT_PATHS}; after collecting, a standalone program terminates while an embedded one moves on
 * to {@code UPDATE_HALTED_TRAVERSERS}, after which it terminates.
 *
 * @param memory the program's memory
 * @return {@code true} if the program should terminate, {@code false} to run another iteration
 */
@Override
public boolean terminate(final Memory memory) {
    // the index is only consulted in the first iteration and can be emptied afterwards
    if (memory.isInitialIteration() && this.haltedTraversersIndex != null) {
        this.haltedTraversersIndex.clear();
    }

    if (!memory.<Boolean>get(VOTE_TO_HALT)) {
        // new paths were found; reset the vote for the next iteration
        memory.set(VOTE_TO_HALT, true);
        return false;
    }

    switch (memory.<Integer>get(STATE)) {
        case COLLECT_PATHS:
            // After paths were collected,
            // a) the VP is done in standalone mode (paths will be in memory) or
            // b) the halted traversers will be updated in order to have the paths available in the traversal
            if (this.standalone) {
                return true;
            }
            memory.set(STATE, UPDATE_HALTED_TRAVERSERS);
            return false;
        case UPDATE_HALTED_TRAVERSERS:
            return true;
        default:
            // no new paths were found during the search, start collecting them
            memory.set(STATE, COLLECT_PATHS);
            return false;
    }
}
/**
 * Builds the string representation of this program via {@code StringFactory.vertexProgramString}.
 * <p>
 * Only traversals that differ from their defaults are listed; the include-edges flag is always listed.
 * Each option is labelled with the part of its configuration key that follows the last dot.
 *
 * @return the string representation
 */
@Override
public String toString() {
    final Function<String, String> lastSegment = key -> key.substring(key.lastIndexOf(".") + 1);
    final List<String> settings = new ArrayList<>();

    if (!this.sourceVertexFilterTraversal.equals(DEFAULT_VERTEX_FILTER_TRAVERSAL)) {
        settings.add(lastSegment.apply(SOURCE_VERTEX_FILTER) + "=" + this.sourceVertexFilterTraversal.get());
    }
    if (!this.targetVertexFilterTraversal.equals(DEFAULT_VERTEX_FILTER_TRAVERSAL)) {
        settings.add(lastSegment.apply(TARGET_VERTEX_FILTER) + "=" + this.targetVertexFilterTraversal.get());
    }
    if (!this.edgeTraversal.equals(DEFAULT_EDGE_TRAVERSAL)) {
        settings.add(lastSegment.apply(EDGE_TRAVERSAL) + "=" + this.edgeTraversal.get());
    }
    if (!this.distanceTraversal.equals(DEFAULT_DISTANCE_TRAVERSAL)) {
        settings.add(lastSegment.apply(DISTANCE_TRAVERSAL) + "=" + this.distanceTraversal.get());
    }
    settings.add(lastSegment.apply(INCLUDE_EDGES) + "=" + this.includeEdges);

    final String joined = String.join(", ", settings);
    return StringFactory.vertexProgramString(this, joined);
}
//////////////////////////////
/**
 * Copies the halted traversers indexed for the given vertex onto the vertex itself, stored as a new
 * {@code TraverserSet} under {@code TraversalVertexProgram.HALTED_TRAVERSERS}. Does nothing when the index
 * returns {@code null} for the vertex.
 *
 * @param vertex the vertex to attach its halted traversers to
 */
private void copyHaltedTraversersFromMemory(final Vertex vertex) {
    final Collection<Traverser.Admin<Vertex>> indexed = this.haltedTraversersIndex.get(vertex);
    if (null == indexed) {
        return;
    }
    final TraverserSet<Vertex> copy = new TraverserSet<>();
    copy.addAll(indexed);
    vertex.property(VertexProperty.Cardinality.single, TraversalVertexProgram.HALTED_TRAVERSERS, copy);
}
/**
 * Creates a path that contains only the given vertex (in detached form, see {@code extendPath}).
 *
 * @param newVertex the vertex the path starts with
 * @return the new single-element path
 */
private static Path makePath(final Vertex newVertex) {
    final Path noExistingPath = null;
    return extendPath(noExistingPath, newVertex);
}
/**
 * Builds a new path from the objects of an existing path followed by the given elements.
 * <p>
 * The objects of {@code currentPath} are re-added one by one with empty label sets; the given elements are
 * appended as detached references, skipping {@code null} entries.
 *
 * @param currentPath the path to start from, or {@code null} to start with an empty path
 * @param elements    the elements to append; individual entries may be {@code null}
 * @return the newly built path
 */
private static Path extendPath(final Path currentPath, final Element... elements) {
    Path extended = ImmutablePath.make();
    if (null != currentPath) {
        for (final Object existing : currentPath.objects()) {
            extended = extended.extend(existing, Collections.emptySet());
        }
    }
    for (final Element element : elements) {
        if (null == element) {
            continue;
        }
        extended = extended.extend(ReferenceFactory.detach(element), Collections.emptySet());
    }
    return extended;
}
/**
 * Determines whether paths should start at the given vertex.
 *
 * @param vertex the vertex to check
 * @return when part of a traversal: whether the vertex carries halted traversers; in standalone mode:
 *         whether the source vertex filter traversal yields a result for the vertex
 */
private boolean isStartVertex(final Vertex vertex) {
    if (!this.standalone) {
        // as part of a traversal, the halted traversers determine the start vertices
        return vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS).isPresent();
    }
    // standalone mode (not part of a traversal): apply the sourceVertexFilterTraversal
    final Traversal.Admin<Vertex, ?> sourceFilter = this.sourceVertexFilterTraversal.getPure();
    sourceFilter.addStart(sourceFilter.getTraverserGenerator().generate(vertex, sourceFilter.getStartStep(), 1));
    return sourceFilter.hasNext();
}
/**
 * Determines whether the given vertex passes the target vertex filter.
 *
 * @param vertex the vertex to check
 * @return {@code true} if the target vertex filter traversal yields a result for the vertex
 */
private boolean isEndVertex(final Vertex vertex) {
    final Traversal.Admin<Vertex, ?> targetFilter = this.targetVertexFilterTraversal.getPure();
    //noinspection unchecked
    final Step<Vertex, Vertex> firstStep = (Step<Vertex, Vertex>) targetFilter.getStartStep();
    targetFilter.addStart(targetFilter.getTraverserGenerator().generate(vertex, firstStep, 1));
    final boolean accepted = targetFilter.hasNext();
    return accepted;
}
/**
 * Sends the current path to the vertices on the other end of the edges produced by the edge traversal.
 * <p>
 * For every edge the distance is determined and added to the current distance. A message is only sent if
 * the vertex on the other end is not yet contained in the current path. The edge itself is part of the
 * message only when edges are to be included; otherwise {@code null} is sent in its place.
 *
 * @param vertex          the vertex whose edges are processed
 * @param currentPath     the path that led to {@code vertex}
 * @param currentDistance the distance of {@code currentPath}
 * @param messenger       used to send the messages
 */
private void processEdges(final Vertex vertex, final Path currentPath, final Number currentDistance,
                          final Messenger<Triplet<Path, Edge, Number>> messenger) {
    final Traversal.Admin<Vertex, Edge> incidentEdges = this.edgeTraversal.getPure();
    incidentEdges.addStart(incidentEdges.getTraverserGenerator().generate(vertex, incidentEdges.getStartStep(), 1));
    while (incidentEdges.hasNext()) {
        final Edge edge = incidentEdges.next();
        final Number edgeDistance = getDistance(edge);
        // pick the endpoint of the edge that is not the current vertex
        final Vertex inVertex = edge.inVertex();
        final Vertex neighbor = inVertex.equals(vertex) ? edge.outVertex() : inVertex;
        // vertices that are already part of the current path don't get a message
        if (currentPath.objects().contains(neighbor)) {
            continue;
        }
        final Edge edgeToSend = this.includeEdges ? edge : null;
        final Number totalDistance = NumberHelper.add(currentDistance, edgeDistance);
        messenger.sendMessage(MessageScope.Global.of(neighbor), Triplet.with(currentPath, edgeToSend, totalDistance));
    }
}
/**
 * Replaces the halted traversers of a start vertex with traversers that carry the shortest paths.
 * <p>
 * For each halted traverser of the vertex, every path in memory whose first element equals the
 * traverser's vertex yields one new traverser, created by splitting the original one with that path and
 * the program step. The resulting set overwrites the vertex's halted traversers, even if it is empty.
 *
 * @param vertex the vertex to update
 * @param memory the program's memory holding the collected shortest paths
 */
private void updateHaltedTraversers(final Vertex vertex, final Memory memory) {
    if (!isStartVertex(vertex)) {
        return;
    }
    final List<Path> shortestPaths = memory.get(SHORTEST_PATHS);
    if (!vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS).isPresent()) {
        return;
    }
    final TraverserSet<Vertex> current = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
    final TraverserSet<Path> replacement = new TraverserSet<>();
    for (final Traverser.Admin<Vertex> traverser : current) {
        final Vertex start = traverser.get();
        for (final Path path : shortestPaths) {
            // only paths that start at the traverser's vertex belong to this traverser
            if (path.get(0).equals(start)) {
                replacement.add(traverser.split(path, this.programStep));
            }
        }
    }
    vertex.property(VertexProperty.Cardinality.single, TraversalVertexProgram.HALTED_TRAVERSERS, replacement);
}
/**
 * Determines the distance contributed by a single edge.
 *
 * @param edge the edge to measure
 * @return {@code 1} if the distance equals the number of hops; otherwise the first result of the distance
 *         traversal applied to the edge, or {@code 0} if the traversal yields no result
 */
private Number getDistance(final Edge edge) {
    if (!this.distanceEqualsNumberOfHops) {
        final Traversal.Admin<Edge, Number> distances = this.distanceTraversal.getPure();
        distances.addStart(distances.getTraverserGenerator().generate(edge, distances.getStartStep(), 1));
        return distances.tryNext().orElse(0);
    }
    return 1;
}
/**
 * Checks whether message sending can stop because a path exceeds the maximum distance.
 * <p>
 * Custom distances can be negative, so this only ever returns {@code true} when the distance is the
 * number of hops.
 *
 * @param distance the distance of the path in question
 * @return {@code true} if distances are hop counts, a maximum distance is set and {@code distance} is
 *         greater than it
 */
private boolean exceedsMaxDistance(final Number distance) {
    if (!this.distanceEqualsNumberOfHops || this.maxDistance == null) {
        return false;
    }
    return NumberHelper.compare(distance, this.maxDistance) > 0;
}
/**
 * Move any valid path into the VP's memory.
 * <p>
 * Nothing happens if the vertex has no {@code PATHS} property. Otherwise the property is removed and a
 * (possibly empty) list of paths is added to {@code SHORTEST_PATHS}. Paths are only included if the vertex
 * passes the target vertex filter. For custom distances, paths whose distance is greater than the maximum
 * distance are left out; for hop-count distances no check is needed here because the search already stops
 * at the maximum distance.
 * <p>
 * Whether the vertex is a valid target does not depend on the individual path, and the distance check only
 * depends on the (distance, paths) pair, so both are evaluated outside of the innermost loop instead of
 * once per path.
 *
 * @param vertex The current vertex.
 * @param memory The VertexProgram's memory.
 */
private void collectShortestPaths(final Vertex vertex, final Memory memory) {
    final VertexProperty<Map<Vertex, Pair<Number, Set<Path>>>> pathProperty = vertex.property(PATHS);
    if (!pathProperty.isPresent()) {
        return;
    }
    final Map<Vertex, Pair<Number, Set<Path>>> paths = pathProperty.value();
    final List<Path> result = new ArrayList<>();
    // evaluate the target filter once per vertex rather than once per path
    if (isEndVertex(vertex)) {
        for (final Pair<Number, Set<Path>> pair : paths.values()) {
            // all paths of a pair share the same distance, hence a single check per pair
            if (this.distanceEqualsNumberOfHops ||
                    this.maxDistance == null ||
                    NumberHelper.compare(pair.getValue0(), this.maxDistance) <= 0) {
                result.addAll(pair.getValue1());
            }
        }
    }
    pathProperty.remove();
    memory.add(SHORTEST_PATHS, result);
}
//////////////////////////////

/**
 * @return a new {@link Builder} for configuring a {@code ShortestPathVertexProgram}
 */
public static Builder build() {
    final Builder builder = new Builder();
    return builder;
}
/**
 * Fluent builder that records the program's settings in the builder's configuration.
 */
@SuppressWarnings("WeakerAccess")
public static final class Builder extends AbstractVertexProgramBuilder<Builder> {

    private Builder() {
        super(ShortestPathVertexProgram.class);
    }

    /**
     * Sets the filter traversal for source vertices.
     *
     * @param sourceVertexFilter the filter traversal, must not be {@code null}
     * @return this builder
     */
    public Builder source(final Traversal<Vertex, ?> sourceVertexFilter) {
        if (sourceVertexFilter == null) {
            throw Graph.Exceptions.argumentCanNotBeNull("sourceVertexFilter");
        }
        PureTraversal.storeState(this.configuration, SOURCE_VERTEX_FILTER, sourceVertexFilter.asAdmin());
        return this;
    }

    /**
     * Sets the filter traversal for target vertices.
     *
     * @param targetVertexFilter the filter traversal, must not be {@code null}
     * @return this builder
     */
    public Builder target(final Traversal<Vertex, ?> targetVertexFilter) {
        if (targetVertexFilter == null) {
            throw Graph.Exceptions.argumentCanNotBeNull("targetVertexFilter");
        }
        PureTraversal.storeState(this.configuration, TARGET_VERTEX_FILTER, targetVertexFilter.asAdmin());
        return this;
    }

    /**
     * Sets the edge traversal to {@code toE(direction)}.
     *
     * @param direction the edge direction, must not be {@code null}
     * @return this builder
     */
    public Builder edgeDirection(final Direction direction) {
        if (direction == null) {
            throw Graph.Exceptions.argumentCanNotBeNull("direction");
        }
        return edgeTraversal(__.toE(direction));
    }

    /**
     * Sets the traversal that produces the edges to follow from a vertex.
     *
     * @param edgeTraversal the edge traversal, must not be {@code null}
     * @return this builder
     */
    public Builder edgeTraversal(final Traversal<Vertex, Edge> edgeTraversal) {
        if (edgeTraversal == null) {
            throw Graph.Exceptions.argumentCanNotBeNull("edgeTraversal");
        }
        PureTraversal.storeState(this.configuration, EDGE_TRAVERSAL, edgeTraversal.asAdmin());
        return this;
    }

    /**
     * Uses the values of the given edge property as distance.
     *
     * @param distance the property key, or {@code null} to use the default distance traversal
     * @return this builder
     */
    public Builder distanceProperty(final String distance) {
        if (distance == null) {
            return distanceTraversal(DEFAULT_DISTANCE_TRAVERSAL.getPure());
        }
        //noinspection unchecked
        return distanceTraversal(__.values(distance)); // todo: (Traversal) new ElementValueTraversal<>(distance)
    }

    /**
     * Sets the traversal that computes the distance of an edge.
     *
     * @param distanceTraversal the distance traversal, must not be {@code null}
     * @return this builder
     */
    public Builder distanceTraversal(final Traversal<Edge, Number> distanceTraversal) {
        if (distanceTraversal == null) {
            throw Graph.Exceptions.argumentCanNotBeNull("distanceTraversal");
        }
        PureTraversal.storeState(this.configuration, DISTANCE_TRAVERSAL, distanceTraversal.asAdmin());
        return this;
    }

    /**
     * Sets or removes the maximum distance.
     *
     * @param distance the maximum distance, or {@code null} to clear a previously set value
     * @return this builder
     */
    public Builder maxDistance(final Number distance) {
        if (distance == null) {
            this.configuration.clearProperty(MAX_DISTANCE);
        } else {
            this.configuration.setProperty(MAX_DISTANCE, distance);
        }
        return this;
    }

    /**
     * Sets the include-edges flag.
     *
     * @param include the value stored for the include-edges setting
     * @return this builder
     */
    public Builder includeEdges(final boolean include) {
        this.configuration.setProperty(INCLUDE_EDGES, include);
        return this;
    }
}
////////////////////////////
/**
 * @return features reporting that this program requires global message scopes and the ability to add
 *         vertex properties
 */
@Override
public Features getFeatures() {
    final Features features = new Features() {
        @Override
        public boolean requiresGlobalMessageScopes() {
            return true;
        }

        @Override
        public boolean requiresVertexPropertyAddition() {
            return true;
        }
    };
    return features;
}
}