blob: b437a9dbb0fdccc69365708dd74e646c6578e432 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.process.computer.traversal;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.HaltedTraverserStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.IndexedTraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Property;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
import org.apache.tinkerpop.gremlin.structure.util.Host;
import org.apache.tinkerpop.gremlin.structure.util.reference.ReferenceFactory;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
final class WorkerExecutor {
private WorkerExecutor() {
}
protected static boolean execute(final Vertex vertex,
final Messenger<TraverserSet<Object>> messenger,
final TraversalMatrix<?, ?> traversalMatrix,
final Memory memory,
final boolean returnHaltedTraversers,
final TraverserSet<Object> haltedTraversers,
final HaltedTraverserStrategy haltedTraverserStrategy) {
final TraversalSideEffects traversalSideEffects = traversalMatrix.getTraversal().getSideEffects();
final AtomicBoolean voteToHalt = new AtomicBoolean(true);
final TraverserSet<Object> activeTraversers = new TraverserSet<>();
final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
////////////////////////////////
// GENERATE LOCAL TRAVERSERS //
///////////////////////////////
// MASTER ACTIVE
// these are traversers that are going from OLTP (master) to OLAP (workers)
// these traversers were broadcasted from the master traversal to the workers for attachment
final IndexedTraverserSet<Object,Vertex> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
// some memory systems are interacted with by multiple threads and thus, concurrent modification can happen at iterator.remove().
// its better to reduce the memory footprint and shorten the active traverser list so synchronization is worth it.
// most distributed OLAP systems have the memory partitioned and thus, this synchronization does nothing.
synchronized (maybeActiveTraversers) {
if (!maybeActiveTraversers.isEmpty()) {
final Collection<Traverser.Admin<Object>> traversers = maybeActiveTraversers.get(vertex);
if (traversers != null) {
final Iterator<Traverser.Admin<Object>> iterator = traversers.iterator();
while (iterator.hasNext()) {
final Traverser.Admin<Object> traverser = iterator.next();
iterator.remove();
maybeActiveTraversers.remove(traverser);
traverser.attach(Attachable.Method.get(vertex));
traverser.setSideEffects(traversalSideEffects);
toProcessTraversers.add(traverser);
}
}
}
}
// WORKER ACTIVE
// these are traversers that exist from from a local barrier
// these traversers will simply saved at the local vertex while the master traversal synchronized the barrier
vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).ifPresent(previousActiveTraversers -> {
IteratorUtils.removeOnNext(previousActiveTraversers.iterator()).forEachRemaining(traverser -> {
traverser.attach(Attachable.Method.get(vertex));
traverser.setSideEffects(traversalSideEffects);
toProcessTraversers.add(traverser);
});
assert previousActiveTraversers.isEmpty();
// remove the property to save space
vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS).remove();
});
// TRAVERSER MESSAGES (WORKER -> WORKER)
// these are traversers that have been messaged to the vertex from another vertex
final Iterator<TraverserSet<Object>> messages = messenger.receiveMessages();
while (messages.hasNext()) {
IteratorUtils.removeOnNext(messages.next().iterator()).forEachRemaining(traverser -> {
if (traverser.isHalted()) {
if (returnHaltedTraversers)
memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(haltedTraverserStrategy.halt(traverser)));
else
haltedTraversers.add(traverser); // the traverser has already been detached so no need to detach it again
} else {
// traverser is not halted and thus, should be processed locally
// attach it and process
traverser.attach(Attachable.Method.get(vertex));
traverser.setSideEffects(traversalSideEffects);
toProcessTraversers.add(traverser);
}
});
}
///////////////////////////////
// PROCESS LOCAL TRAVERSERS //
//////////////////////////////
// while there are still local traversers, process them until they leave the vertex (message pass) or halt (store).
while (!toProcessTraversers.isEmpty()) {
Step<Object, Object> previousStep = EmptyStep.instance();
Iterator<Traverser.Admin<Object>> traversers = toProcessTraversers.iterator();
while (traversers.hasNext()) {
final Traverser.Admin<Object> traverser = traversers.next();
traversers.remove();
final Step<Object, Object> currentStep = traversalMatrix.getStepById(traverser.getStepId());
// try and fill up the current step as much as possible with traversers to get a bulking optimization
if (!currentStep.getId().equals(previousStep.getId()) && !(previousStep instanceof EmptyStep))
WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers, haltedTraverserStrategy);
currentStep.addStart(traverser);
previousStep = currentStep;
}
WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers, haltedTraverserStrategy);
// all processed traversers should be either halted or active
assert toProcessTraversers.isEmpty();
// process all the local objects and send messages or store locally again
if (!activeTraversers.isEmpty()) {
traversers = activeTraversers.iterator();
while (traversers.hasNext()) {
final Traverser.Admin<Object> traverser = traversers.next();
traversers.remove();
// decide whether to message the traverser or to process it locally
if (traverser.get() instanceof Element || traverser.get() instanceof Property) { // GRAPH OBJECT
// if the element is remote, then message, else store it locally for re-processing
final Vertex hostingVertex = Host.getHostingVertex(traverser.get());
if (!vertex.equals(hostingVertex)) { // if its host is not the current vertex, then send the traverser to the hosting vertex
voteToHalt.set(false); // if message is passed, then don't vote to halt
messenger.sendMessage(MessageScope.Global.of(hostingVertex), new TraverserSet<>(traverser.detach()));
} else {
traverser.attach(Attachable.Method.get(vertex)); // necessary for select() steps that reference the current object
toProcessTraversers.add(traverser);
}
} else // STANDARD OBJECT
toProcessTraversers.add(traverser);
}
assert activeTraversers.isEmpty();
}
}
return voteToHalt.get();
}
private static void drainStep(final Vertex vertex,
final Step<Object, Object> step,
final TraverserSet<Object> activeTraversers,
final TraverserSet<Object> haltedTraversers,
final Memory memory,
final boolean returnHaltedTraversers,
final HaltedTraverserStrategy haltedTraverserStrategy) {
GraphComputing.atMaster(step, false);
if (step instanceof Barrier) {
if (step instanceof Bypassing)
((Bypassing) step).setBypass(true);
if (step instanceof LocalBarrier) {
// local barrier traversers are stored on the vertex until the master traversal synchronizes the system
final LocalBarrier<Object> barrier = (LocalBarrier<Object>) step;
final TraverserSet<Object> localBarrierTraversers = vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).orElse(new TraverserSet<>());
vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS, localBarrierTraversers);
while (barrier.hasNextBarrier()) {
final TraverserSet<Object> barrierSet = barrier.nextBarrier();
IteratorUtils.removeOnNext(barrierSet.iterator()).forEachRemaining(traverser -> {
traverser.addLabels(step.getLabels()); // this might need to be generalized for working with global barriers too
if (traverser.isHalted() &&
(returnHaltedTraversers ||
(!(traverser.get() instanceof Element) && !(traverser.get() instanceof Property)) ||
Host.getHostingVertex(traverser.get()).equals(vertex))) {
if (returnHaltedTraversers)
memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(haltedTraverserStrategy.halt(traverser)));
else
haltedTraversers.add(traverser.detach());
} else
localBarrierTraversers.add(traverser.detach());
});
}
memory.add(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new HashSet<>(Collections.singleton(step.getId())));
} else {
final Barrier barrier = (Barrier) step;
while (barrier.hasNextBarrier()) {
memory.add(step.getId(), barrier.nextBarrier());
}
memory.add(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new HashSet<>(Collections.singleton(step.getId())));
}
} else { // LOCAL PROCESSING
step.forEachRemaining(traverser -> {
if (traverser.isHalted() &&
// if its a ReferenceFactory (one less iteration required)
((returnHaltedTraversers || ReferenceFactory.class == haltedTraverserStrategy.getHaltedTraverserFactory()) &&
(!(traverser.get() instanceof Element) && !(traverser.get() instanceof Property)) ||
Host.getHostingVertex(traverser.get()).equals(vertex))) {
if (returnHaltedTraversers)
memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(haltedTraverserStrategy.halt(traverser)));
else
haltedTraversers.add(traverser.detach());
} else {
activeTraversers.add(traverser);
}
});
}
}
}