blob: fe01ea2dfa88dab39e1e9c7d99a358128d52cf8c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.process.computer.traversal;
import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.ComputerResultStep;
import org.apache.tinkerpop.gremlin.process.computer.traversal.step.sideEffect.mapreduce.TraverserMapReduce;
import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
import org.apache.tinkerpop.gremlin.process.computer.util.ConfigurationTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.TraverserGenerator;
import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectCapStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalClassFunction;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalObjectFunction;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalScriptFunction;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.VertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
/**
* TraversalVertexProgram enables the evaluation of a {@link Traversal} on a {@link org.apache.tinkerpop.gremlin.process.computer.GraphComputer}.
* At the start of the computation, each {@link Vertex} (or {@link org.apache.tinkerpop.gremlin.structure.Edge}) is assigned a single {@link Traverser}.
* For each traverser that is local to the vertex, the vertex looks up its current location in the traversal and processes that step.
* If the outputted traverser of the step references a local structure on the vertex (e.g. the vertex, an incident edge, its properties, or an arbitrary object),
* then the vertex continues to compute the next traverser. If the traverser references another location in the graph,
* then the traverser is sent to that location in the graph via a message. The messages of TraversalVertexProgram are traversers.
* This continues until all traversers in the computation have halted.
*
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public final class TraversalVertexProgram implements VertexProgram<TraverserSet<?>> {
public static final String HALTED_TRAVERSERS = "gremlin.traversalVertexProgram.haltedTraversers";
private static final String VOTE_TO_HALT = "gremlin.traversalVertexProgram.voteToHalt";
public static final String TRAVERSAL_SUPPLIER = "gremlin.traversalVertexProgram.traversalSupplier";
// TODO: if not an adjacent traversal, use Local message scope -- a dual messaging system.
private static final Set<MessageScope> MESSAGE_SCOPES = new HashSet<>(Collections.singletonList(MessageScope.Global.instance()));
private static final Set<String> ELEMENT_COMPUTE_KEYS = new HashSet<>(Arrays.asList(HALTED_TRAVERSERS, TraversalSideEffects.SIDE_EFFECTS));
private static final Set<String> MEMORY_COMPUTE_KEYS = new HashSet<>(Collections.singletonList(VOTE_TO_HALT));
private ConfigurationTraversal<?, ?> configurationTraversal;
private Traversal.Admin<?, ?> traversal;
private TraversalMatrix<?, ?> traversalMatrix;
private final Set<MapReduce> mapReducers = new HashSet<>();
private TraversalVertexProgram() {
}
/**
* A helper method to yield a {@link Traversal} from the {@link Graph} and provided {@link Configuration}.
*
* @param graph the graph that the traversal will run against
* @param configuration The configuration containing the TRAVERSAL_SUPPLIER key.
* @return the traversal supplied by the configuration
*/
public static Traversal.Admin<?, ?> getTraversal(final Graph graph, final Configuration configuration) {
return VertexProgram.<TraversalVertexProgram>createVertexProgram(graph, configuration).getTraversal();
}
public Traversal.Admin<?, ?> getTraversal() {
return this.traversal;
}
@Override
public void loadState(final Graph graph, final Configuration configuration) {
this.configurationTraversal = ConfigurationTraversal.loadState(graph, configuration, TRAVERSAL_SUPPLIER);
if (null == this.configurationTraversal) {
throw new IllegalArgumentException("The configuration does not have a traversal supplier:" + TRAVERSAL_SUPPLIER);
}
this.traversal = this.configurationTraversal.get();
((ComputerResultStep) this.traversal.getEndStep()).setBypass(true);
this.traversalMatrix = new TraversalMatrix<>(this.traversal);
for (final MapReducer<?, ?, ?, ?, ?> mapReducer : TraversalHelper.getStepsOfAssignableClassRecursively(MapReducer.class, this.traversal)) {
this.mapReducers.add(mapReducer.getMapReduce());
}
if (!(this.traversal.getEndStep().getPreviousStep() instanceof SideEffectCapStep) && !(this.traversal.getEndStep().getPreviousStep() instanceof ReducingBarrierStep))
this.mapReducers.add(new TraverserMapReduce(this.traversal));
}
@Override
public void storeState(final Configuration configuration) {
VertexProgram.super.storeState(configuration);
this.configurationTraversal.storeState(configuration);
}
@Override
public void setup(final Memory memory) {
memory.set(VOTE_TO_HALT, true);
}
@Override
public Set<MessageScope> getMessageScopes(final Memory memory) {
return MESSAGE_SCOPES;
}
@Override
public void execute(final Vertex vertex, final Messenger<TraverserSet<?>> messenger, final Memory memory) {
this.traversal.getSideEffects().setLocalVertex(vertex);
if (memory.isInitialIteration()) { // ITERATION 1
final TraverserSet<Object> haltedTraversers = new TraverserSet<>();
vertex.property(VertexProperty.Cardinality.single, HALTED_TRAVERSERS, haltedTraversers);
if (!(this.traversal.getStartStep() instanceof GraphStep))
throw new UnsupportedOperationException("TraversalVertexProgram currently only supports GraphStep starts on vertices or edges");
final GraphStep<Element, Element> graphStep = (GraphStep<Element, Element>) this.traversal.getStartStep();
final String future = graphStep.getNextStep().getId();
final TraverserGenerator traverserGenerator = this.traversal.getTraverserGenerator();
if (graphStep.returnsVertex()) { // VERTICES (process the first step locally)
if (ElementHelper.idExists(vertex.id(), graphStep.getIds())) {
final Traverser.Admin<Element> traverser = traverserGenerator.generate(vertex, graphStep, 1l);
traverser.setStepId(future);
traverser.detach();
if (traverser.isHalted())
haltedTraversers.add((Traverser.Admin) traverser);
else
memory.and(VOTE_TO_HALT, TraverserExecutor.execute(vertex, new SingleMessenger<>(messenger, new TraverserSet<>(traverser)), this.traversalMatrix));
}
} else { // EDGES (process the first step via a message pass)
boolean voteToHalt = true;
final Iterator<Edge> starts = vertex.edges(Direction.OUT);
while (starts.hasNext()) {
final Edge start = starts.next();
if (ElementHelper.idExists(start.id(), graphStep.getIds())) {
final Traverser.Admin<Element> traverser = traverserGenerator.generate(start, graphStep, 1l);
traverser.setStepId(future);
traverser.detach();
if (traverser.isHalted())
haltedTraversers.add((Traverser.Admin) traverser);
else {
voteToHalt = false;
messenger.sendMessage(MessageScope.Global.of(vertex), new TraverserSet<>(traverser));
}
}
}
memory.and(VOTE_TO_HALT, voteToHalt);
}
} else { // ITERATION 1+
memory.and(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix));
}
}
@Override
public boolean terminate(final Memory memory) {
final boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT);
if (voteToHalt) {
return true;
} else {
memory.set(VOTE_TO_HALT, true);
return false;
}
}
@Override
public Set<String> getElementComputeKeys() {
return ELEMENT_COMPUTE_KEYS;
}
@Override
public Set<String> getMemoryComputeKeys() {
return MEMORY_COMPUTE_KEYS;
}
@Override
public Set<MapReduce> getMapReducers() {
return this.mapReducers;
}
@Override
public Optional<MessageCombiner<TraverserSet<?>>> getMessageCombiner() {
return (Optional) TraversalVertexProgramMessageCombiner.instance();
}
@Override
public TraversalVertexProgram clone() {
try {
final TraversalVertexProgram clone = (TraversalVertexProgram) super.clone();
clone.traversal = this.traversal.clone();
clone.traversalMatrix = new TraversalMatrix<>(clone.traversal);
return clone;
} catch (final CloneNotSupportedException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
@Override
public GraphComputer.ResultGraph getPreferredResultGraph() {
return GraphComputer.ResultGraph.ORIGINAL;
}
@Override
public GraphComputer.Persist getPreferredPersist() {
return GraphComputer.Persist.NOTHING;
}
@Override
public String toString() {
final String traversalString = this.traversal.toString().substring(1);
return StringFactory.vertexProgramString(this, traversalString.substring(0, traversalString.length() - 1));
}
@Override
public Features getFeatures() {
return new Features() {
@Override
public boolean requiresGlobalMessageScopes() {
return true;
}
@Override
public boolean requiresVertexPropertyAddition() {
return true;
}
};
}
public <S, E> Traversal.Admin<S, E> computerResultTraversal(final ComputerResult result) {
final Traversal.Admin<S, E> traversal = (Traversal.Admin<S, E>) this.getTraversal();
((ComputerResultStep) traversal.getEndStep()).populateTraversers(result);
return traversal;
}
//////////////
public static Builder build() {
return new Builder();
}
public final static class Builder extends AbstractVertexProgramBuilder<Builder> {
private Builder() {
super(TraversalVertexProgram.class);
}
public Builder traversal(final TraversalSource.Builder builder, final String scriptEngine, final String traversalScript, final Object... bindings) {
ConfigurationTraversal.storeState(new TraversalScriptFunction<>(builder, scriptEngine, traversalScript, bindings), this.configuration, TRAVERSAL_SUPPLIER);
return this;
}
public Builder traversal(final Traversal.Admin<?, ?> traversal) {
ConfigurationTraversal.storeState(new TraversalObjectFunction<>(traversal), this.configuration, TRAVERSAL_SUPPLIER);
return this;
}
public Builder traversal(final Class<? extends Supplier<Traversal.Admin<?, ?>>> traversalClass) {
ConfigurationTraversal.storeState(new TraversalClassFunction(traversalClass), this.configuration, TRAVERSAL_SUPPLIER);
return this;
}
// TODO Builder resolveElements(boolean) to be fed to ComputerResultStep
}
}