/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.spark.process.computer;
import com.google.common.base.Optional;
import org.apache.commons.configuration.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.Payload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
import org.apache.tinkerpop.gremlin.structure.util.Attachable;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import scala.Tuple2;
import java.util.Collections;
import java.util.List;
import java.util.Set;
/**
 * SparkExecutor houses the stateless, static functions that run TinkerPop's BSP
 * {@link VertexProgram} iterations and {@link MapReduce} stages over Spark
 * {@code JavaPairRDD}s.
 *
 * @author Marko A. Rodriguez (http://markorodriguez.com)
 */
public final class SparkExecutor {
private static final String[] EMPTY_ARRAY = new String[0];
private SparkExecutor() {
}
////////////////////
// VERTEX PROGRAM //
////////////////////
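/**
 * Executes a single BSP iteration of the configured {@link VertexProgram} over the graph.
 * On the first iteration {@code viewIncomingRDD} is null; on every subsequent iteration it
 * holds each vertex's computed view (its compute-key properties) and the messages sent to
 * it in the previous iteration. The returned RDD is the incoming view/messages for the
 * next iteration and shares the graphRDD's partitioner.
 */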
public static <M> JavaPairRDD<Object, ViewIncomingPayload<M>> executeVertexProgramIteration(
final JavaPairRDD<Object, VertexWritable> graphRDD,
final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
final SparkMemory memory,
final Configuration apacheConfiguration) {
if (null != viewIncomingRDD) // the graphRDD and the viewRDD must have the same partitioner
assert graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get());
final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = (((null == viewIncomingRDD) ?
graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
graphRDD.leftOuterJoin(viewIncomingRDD)) // every other iteration may have views and messages
// for each partition of vertices emit a view and their outgoing messages
.mapPartitionsToPair(partitionIterator -> {
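// this closure runs on the Spark executors, not the driver; HadoopPools (the per-JVM serializer pool) must therefore be initialized in every executor JVM -- the same pattern recurs in every partition-level closure below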
HadoopPools.initialize(apacheConfiguration);
final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each Spark partition (a TP3 worker) gets its own local copy of the vertex program
final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? EMPTY_ARRAY : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
final SparkMessenger<M> messenger = new SparkMessenger<>();
workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
final StarGraph.StarVertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
// drop any computed properties that are cached in memory
if (elementComputeKeysArray.length > 0)
vertex.dropVertexProperties(elementComputeKeysArray);
final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : Collections.emptyList();
final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
previousView.forEach(property -> property.attach(Attachable.Method.create(vertex))); // attach the view to the vertex
// previousView.clear(); // no longer needed so kill it from memory
///
messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
workerVertexProgram.execute(ComputerGraph.vertexProgram(vertex, workerVertexProgram), messenger, memory); // execute the vertex program on this vertex for this iteration
// incomingMessages.clear(); // no longer needed so kill it from memory
///
final List<DetachedVertexProperty<Object>> nextView = elementComputeKeysArray.length == 0 ? // not all vertex programs have compute keys
Collections.emptyList() :
IteratorUtils.list(IteratorUtils.map(vertex.properties(elementComputeKeysArray), property -> DetachedFactory.detach(property, true)));
final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
if (!partitionIterator.hasNext())
workerVertexProgram.workerIterationEnd(memory.asImmutable()); // if no more vertices in the partition, end the worker's iteration
return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
});
}, true)); // preservesPartitioning=true: the vertex ids (keys) are unchanged, so the graphRDD's partitioner carries over
// the graphRDD and the viewRDD must have the same partitioner
assert graphRDD.partitioner().get().equals(viewOutgoingRDD.partitioner().get());
// "message pass" by reducing on the vertex object id of the view and message payloads
final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().orElse(null);
final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD = viewOutgoingRDD
.flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())), // emit the view payload
IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2()))))) // emit the outgoing message payloads one by one
.reduceByKey(graphRDD.partitioner().get(), (a, b) -> { // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
if (a instanceof ViewIncomingPayload) {
((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
return a;
} else if (b instanceof ViewIncomingPayload) {
((ViewIncomingPayload<M>) b).mergePayload(a, messageCombiner);
return b;
} else {
final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
c.mergePayload(a, messageCombiner);
c.mergePayload(b, messageCombiner);
return c;
}
})
.filter(payload -> !(payload._2() instanceof MessagePayload)) // this happens if there is a message to a vertex that does not exist
.filter(payload -> !((payload._2() instanceof ViewIncomingPayload) && !((ViewIncomingPayload<M>) payload._2()).hasView())) // this happens if there are many messages to a vertex that does not exist
.mapValues(payload -> payload instanceof ViewIncomingPayload ?
(ViewIncomingPayload<M>) payload : // this happens if there is a vertex with incoming messages
new ViewIncomingPayload<>((ViewPayload) payload)); // this happens if there is a vertex with no incoming messages
// the graphRDD and the viewRDD must have the same partitioner
assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
newViewIncomingRDD
.foreachPartition(partitionIterator -> {
HadoopPools.initialize(apacheConfiguration);
}); // an (otherwise empty) action forces evaluation of the lazy RDD chain so the BSP iteration completes and the memory for this iteration is updated
return newViewIncomingRDD;
}
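/**
 * Joins the final computed view back onto the cached graph so the resultant vertices carry
 * their compute-key properties. A rough sketch of how a driver (SparkGraphComputer in
 * TinkerPop) strings the two phases together -- the real driver's ordering of memory
 * updates and termination checks differs in detail:
 *
 * <pre>{@code
 * JavaPairRDD<Object, ViewIncomingPayload<M>> view = null;
 * while (true) {
 *     view = SparkExecutor.executeVertexProgramIteration(graphRDD, view, memory, configuration);
 *     if (vertexProgram.terminate(memory)) break; // the vertex program decides when to halt
 *     memory.incrIteration();
 * }
 * graphRDD = SparkExecutor.prepareFinalGraphRDD(graphRDD, view, elementComputeKeys);
 * }</pre>
 */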
public static <M> JavaPairRDD<Object, VertexWritable> prepareFinalGraphRDD(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final String[] elementComputeKeys) {
// the graphRDD and the viewRDD must have the same partitioner
assert (graphRDD.partitioner().get().equals(viewIncomingRDD.partitioner().get()));
// attach the final computed view to the cached graph
return graphRDD.leftOuterJoin(viewIncomingRDD)
.mapValues(tuple -> {
final StarGraph.StarVertex vertex = tuple._1().get();
vertex.dropVertexProperties(elementComputeKeys);
final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
view.forEach(property -> property.attach(Attachable.Method.create(vertex)));
// view.clear(); // no longer needed so kill it from memory
return tuple._1();
});
}
////////////////
// MAP REDUCE //
////////////////
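/**
 * Executes the map stage of the {@link MapReduce} job over every vertex of the graph,
 * sorting the emitted keys when the job supplies a map key sort.
 */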
public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
HadoopPools.initialize(apacheConfiguration);
return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
});
if (mapReduce.getMapKeySort().isPresent())
mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get(), true, 1);
return mapRDD;
}
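/**
 * Executes the optional combine stage partition-locally over the map-stage output so less
 * data is shuffled to the reducers.
 */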
public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD, final Configuration apacheConfiguration) {
return mapRDD.mapPartitionsToPair(partitionIterator -> {
HadoopPools.initialize(apacheConfiguration);
return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
});
}
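/**
 * Groups the map (or combine) output by key and executes the reduce stage, sorting the
 * emitted keys when the job supplies a reduce key sort.
 */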
public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapOrCombineRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
HadoopPools.initialize(apacheConfiguration);
return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
});
if (mapReduce.getReduceKeySort().isPresent())
reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get(), true, 1);
return reduceRDD;
}
}