blob: c619b563b0068939ef739fef69bdccf3d81eec69 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.common.ir;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Sets;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.PairKeyExtractor;
import org.apache.nemo.common.Util;
import org.apache.nemo.common.coder.BytesDecoderFactory;
import org.apache.nemo.common.coder.BytesEncoderFactory;
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.dag.DAGBuilder;
import org.apache.nemo.common.dag.DAGInterface;
import org.apache.nemo.common.exception.CompileTimeOptimizationException;
import org.apache.nemo.common.exception.MetricException;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.*;
import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.LoopVertex;
import org.apache.nemo.common.ir.vertex.SourceVertex;
import org.apache.nemo.common.ir.vertex.executionproperty.MessageIdVertexProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.apache.nemo.common.ir.vertex.utility.TaskSizeSplitterVertex;
import org.apache.nemo.common.ir.vertex.utility.runtimepass.MessageAggregatorVertex;
import org.apache.nemo.common.ir.vertex.utility.runtimepass.MessageGeneratorVertex;
import org.apache.nemo.common.ir.vertex.utility.RelayVertex;
import org.apache.nemo.common.ir.vertex.utility.SamplingVertex;
import org.apache.nemo.common.ir.vertex.utility.runtimepass.SignalVertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* An IRDAG object captures a high-level data processing application (e.g., Spark/Beam application).
* - IRVertex: A data-parallel operation. (e.g., map)
* - IREdge: A data dependency between two operations. (e.g., shuffle)
* <p>
* Largely two types of IRDAG optimization(modification) methods are provided.
* All of these methods preserve application semantics.
* - Annotation: setProperty(), getPropertyValue() on each IRVertex/IREdge
* - Reshaping: insert(), delete() on the IRDAG
* <p>
* TODO #341: Rethink IRDAG insert() signatures
*/
@NotThreadSafe
public final class IRDAG implements DAGInterface<IRVertex, IREdge> {
private static final Logger LOG = LoggerFactory.getLogger(IRDAG.class.getName());
private DAG<IRVertex, IREdge> dagSnapshot; // the DAG that was saved most recently.
private DAG<IRVertex, IREdge> modifiedDAG; // the DAG that is being updated.
// To remember original encoders/decoders, and etc
private final Map<RelayVertex, IREdge> streamVertexToOriginalEdge;
// To remember sampling vertex groups
private final Map<SamplingVertex, Set<SamplingVertex>> samplingVertexToGroup;
// To remember message barrier/aggregator vertex groups
private final Map<IRVertex, Set<IRVertex>> messageVertexToGroup;
/**
* To remember the specifications of the executors used to run the IR DAG with.
*/
private final List<Pair<Integer, ResourceSpecification>> executorInfo;
/**
* @param originalUserApplicationDAG the initial DAG.
*/
public IRDAG(final DAG<IRVertex, IREdge> originalUserApplicationDAG) {
this.modifiedDAG = originalUserApplicationDAG;
this.dagSnapshot = originalUserApplicationDAG;
this.streamVertexToOriginalEdge = new HashMap<>();
this.samplingVertexToGroup = new HashMap<>();
this.messageVertexToGroup = new HashMap<>();
this.executorInfo = new ArrayList<>();
}
public IRDAGChecker.CheckerResult checkIntegrity() {
return IRDAGChecker.get().doCheck(modifiedDAG);
}
//////////////////////////////////////////////////
/**
* Used internally by Nemo to advance the DAG snapshot after applying each pass.
*
* @param checker that compares the dagSnapshot and the modifiedDAG
* to determine if the snapshot can be set the current modifiedDAG.
* @return true if the checker passes, false otherwise.
*/
public boolean advanceDAGSnapshot(final BiFunction<IRDAG, IRDAG, Boolean> checker) {
final boolean canAdvance = checker.apply(new IRDAG(dagSnapshot), new IRDAG(modifiedDAG));
if (canAdvance) {
dagSnapshot = modifiedDAG;
}
return canAdvance;
}
/**
* @return a IR DAG summary string, consisting of only the vertices generated from the frontend.
*/
public String irDAGSummary() {
final Long inputBytes = this.getInputSize();
final String inputSizeString = inputBytes < 1024 ? inputBytes + "B"
: (inputBytes / 1024 < 1024 ? inputBytes / 1024 + "KB"
: (inputBytes / 1048576 < 1024 ? inputBytes / 1048576 + "MB"
: (inputBytes / 1073741824L < 1024 ? inputBytes / 1073741824L + "GB"
: inputBytes / 1099511627776L + "TB")));
return "rv" + getRootVertices().size()
+ "_v" + getVertices().stream()
.filter(v -> !v.isUtilityVertex()) // Exclude utility vertices
.count()
+ "_e" + getVertices().stream()
.filter(v -> !v.isUtilityVertex()) // Exclude utility vertices
.mapToInt(v -> getIncomingEdgesOf(v).size())
.sum() + "_" + inputSizeString;
}
/**
* @return the total sum of the input size for the IR DAG.
*/
public Long getInputSize() {
return this.getRootVertices().stream()
.filter(irVertex -> irVertex instanceof SourceVertex)
.mapToLong(srcVertex -> {
try {
return ((SourceVertex) srcVertex).getEstimatedSizeBytes();
} catch (Exception e) {
throw new MetricException(e);
}
})
.sum();
}
/**
* Setter for the executor specifications information.
* @param parsedExecutorInfo executor information parsed for processing.
*/
public void recordExecutorInfo(final List<Pair<Integer, ResourceSpecification>> parsedExecutorInfo) {
executorInfo.addAll(parsedExecutorInfo);
}
/**
* Getter for the executor specifications information.
* @return the executor specifications information.
*/
public List<Pair<Integer, ResourceSpecification>> getExecutorInfo() {
return executorInfo;
}
////////////////////////////////////////////////// Methods for reshaping the DAG topology.
/**
* Deletes a previously inserted utility vertex.
* (e.g., TriggerVertex, RelayVertex, SamplingVertex)
* <p>
* Notice that the actual number of vertices that will be deleted after this call returns can be more than one.
* We roll back the changes made with the previous insert(), while preserving application semantics.
*
* @param vertexToDelete to delete.
*/
public void delete(final IRVertex vertexToDelete) {
assertExistence(vertexToDelete);
deleteRecursively(vertexToDelete, new HashSet<>());
// Build again, with source/sink checks on
modifiedDAG = rebuildExcluding(modifiedDAG, Collections.emptySet()).build();
}
private Set<IRVertex> getVertexGroupToDelete(final IRVertex vertexToDelete) {
if (vertexToDelete instanceof RelayVertex) {
return Sets.newHashSet(vertexToDelete);
} else if (vertexToDelete instanceof SamplingVertex) {
final Set<SamplingVertex> samplingVertexGroup = samplingVertexToGroup.get(vertexToDelete);
final Set<IRVertex> converted = new HashSet<>(samplingVertexGroup.size());
for (final IRVertex sv : samplingVertexGroup) {
converted.add(sv); // explicit conversion to IRVertex is needed.. otherwise the compiler complains :(
}
return converted;
} else if (vertexToDelete instanceof MessageAggregatorVertex || vertexToDelete instanceof MessageGeneratorVertex) {
return messageVertexToGroup.get(vertexToDelete);
} else if (vertexToDelete instanceof SignalVertex) {
return Sets.newHashSet(vertexToDelete);
} else if (vertexToDelete instanceof TaskSizeSplitterVertex) {
return Sets.newHashSet(vertexToDelete);
} else {
throw new IllegalArgumentException(vertexToDelete.getId());
}
}
/**
* Delete a group of vertex that corresponds to the specified vertex.
* And then recursively delete neighboring utility vertices.
* <p>
* (WARNING) Only call this method inside delete(), or inside this method itself.
* This method uses buildWithoutSourceSinkCheck() for intermediate DAGs,
* which will be finally checked in delete().
*
* @param vertexToDelete to delete
* @param visited vertex groups (because cyclic dependencies between vertex groups are possible)
*/
private void deleteRecursively(final IRVertex vertexToDelete, final Set<IRVertex> visited) {
if (!Util.isUtilityVertex(vertexToDelete)) {
throw new IllegalArgumentException(vertexToDelete.getId());
}
if (visited.contains(vertexToDelete)) {
return;
}
// Three data structures
final Set<IRVertex> vertexGroupToDelete = getVertexGroupToDelete(vertexToDelete);
final Set<IRVertex> utilityParents = vertexGroupToDelete.stream()
.map(modifiedDAG::getIncomingEdgesOf)
.flatMap(inEdgeList -> inEdgeList.stream().map(IREdge::getSrc))
.filter(Util::isUtilityVertex)
.collect(Collectors.toSet());
final Set<IRVertex> utilityChildren = vertexGroupToDelete.stream()
.map(modifiedDAG::getOutgoingEdgesOf)
.flatMap(outEdgeList -> outEdgeList.stream().map(IREdge::getDst))
.filter(Util::isUtilityVertex)
.collect(Collectors.toSet());
// We have 'visited' this group
visited.addAll(vertexGroupToDelete);
// STEP 1: Delete parent utility vertices
// Vertices that are 'in between' the group are also deleted here
Sets.difference(utilityParents, vertexGroupToDelete).forEach(ptd -> deleteRecursively(ptd, visited));
// STEP 2: Delete the specified vertex(vertices)
if (vertexToDelete instanceof RelayVertex) {
final DAGBuilder<IRVertex, IREdge> builder = rebuildExcluding(modifiedDAG, vertexGroupToDelete);
// Add a new edge that directly connects the src of the stream vertex to its dst
modifiedDAG.getOutgoingEdgesOf(vertexToDelete).stream()
.filter(e -> !Util.isControlEdge(e))
.map(IREdge::getDst)
.forEach(dstVertex ->
modifiedDAG.getIncomingEdgesOf(vertexToDelete).stream()
.filter(e -> !Util.isControlEdge(e))
.map(IREdge::getSrc)
.forEach(srcVertex -> builder.connectVertices(
Util.cloneEdge(streamVertexToOriginalEdge.get(vertexToDelete), srcVertex, dstVertex))));
modifiedDAG = builder.buildWithoutSourceSinkCheck();
} else if (vertexToDelete instanceof MessageAggregatorVertex || vertexToDelete instanceof MessageGeneratorVertex) {
modifiedDAG = rebuildExcluding(modifiedDAG, vertexGroupToDelete).buildWithoutSourceSinkCheck();
final Optional<Integer> deletedMessageIdOptional = vertexGroupToDelete.stream()
.filter(vtd -> vtd instanceof MessageAggregatorVertex)
.map(vtd -> vtd.getPropertyValue(MessageIdVertexProperty.class).<IllegalArgumentException>orElseThrow(
() -> new IllegalArgumentException(
"MessageAggregatorVertex " + vtd.getId() + " does not have MessageIdVertexProperty.")))
.findAny();
deletedMessageIdOptional.ifPresent(deletedMessageId ->
modifiedDAG.getEdges().forEach(e ->
e.getPropertyValue(MessageIdEdgeProperty.class).ifPresent(
hashSet -> hashSet.remove(deletedMessageId))));
} else if (vertexToDelete instanceof SamplingVertex) {
modifiedDAG = rebuildExcluding(modifiedDAG, vertexGroupToDelete).buildWithoutSourceSinkCheck();
} else if (vertexToDelete instanceof SignalVertex) {
modifiedDAG = rebuildExcluding(modifiedDAG, vertexGroupToDelete).buildWithoutSourceSinkCheck();
final Optional<Integer> deletedMessageIdOptional = vertexGroupToDelete.stream()
.map(vtd -> vtd.getPropertyValue(MessageIdVertexProperty.class).<IllegalArgumentException>orElseThrow(
() -> new IllegalArgumentException(
"SignalVertex " + vtd.getId() + " does not have MessageIdVertexProperty.")))
.findAny();
deletedMessageIdOptional.ifPresent(deletedMessageId ->
modifiedDAG.getEdges().forEach(e ->
e.getPropertyValue(MessageIdEdgeProperty.class).ifPresent(
hashSet -> hashSet.remove(deletedMessageId))));
} else if (vertexToDelete instanceof TaskSizeSplitterVertex) {
modifiedDAG = rebuildExcludingSplitter(modifiedDAG, vertexGroupToDelete).buildWithoutSourceSinkCheck();
} else {
throw new IllegalArgumentException(vertexToDelete.getId());
}
// STEP 3: Delete children utility vertices
Sets.difference(utilityChildren, vertexGroupToDelete).forEach(ctd -> deleteRecursively(ctd, visited));
}
private DAGBuilder<IRVertex, IREdge> rebuildExcluding(final DAG<IRVertex, IREdge> dag, final Set<IRVertex> excluded) {
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
dag.getVertices().stream().filter(v -> !excluded.contains(v)).forEach(builder::addVertex);
dag.getEdges().stream().filter(e -> !excluded.contains(e.getSrc()) && !excluded.contains(e.getDst()))
.forEach(builder::connectVertices);
return builder;
}
/**
* helper method in deleting splitter vertex.
* @param dag dag to get information.
* @param excluded Set of Splitter vertex to delete. Always a singleton set.
* @return the DAG builder after the deletion.
*/
private DAGBuilder<IRVertex, IREdge> rebuildExcludingSplitter(final DAG<IRVertex, IREdge> dag,
final Set<IRVertex> excluded) {
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
dag.getVertices().stream().filter(v -> !excluded.contains(v)).forEach(builder::addVertex);
dag.getEdges().stream()
.filter(e -> !(excluded.contains(e.getSrc()) || excluded.contains(e.getDst())))
.forEach(builder::connectVertices);
for (IRVertex vertex : excluded) {
if (!(vertex instanceof TaskSizeSplitterVertex)) {
break;
}
final TaskSizeSplitterVertex splitter = (TaskSizeSplitterVertex) vertex;
//first, restore original vertices
DAG<IRVertex, IREdge> internalDag = splitter.getDAG();
internalDag.getVertices().stream().filter(v -> !(v instanceof SignalVertex)).forEach(builder::addVertex);
internalDag.getEdges().stream()
.filter(e -> !(e.getSrc() instanceof SignalVertex || e.getDst() instanceof SignalVertex))
.forEach(builder::connectVertices);
//second, take care of edges connected to splitter vertex
for (IREdge edgeToSplitter : dag.getIncomingEdgesOf(splitter)) {
if (edgeToSplitter.getSrc() instanceof TaskSizeSplitterVertex) {
final TaskSizeSplitterVertex prevSp = (TaskSizeSplitterVertex) edgeToSplitter.getSrc();
final IREdge internalEdge = prevSp.getEdgeWithInternalVertex(edgeToSplitter);
final IREdge newEdgeToPrevSp = Util.cloneEdge(internalEdge, prevSp, internalEdge.getDst());
prevSp.mapEdgeWithLoop(newEdgeToPrevSp, internalEdge);
builder.connectVertices(newEdgeToPrevSp);
} else {
final IREdge internalEdge = splitter.getEdgeWithInternalVertex(edgeToSplitter);
builder.connectVertices(internalEdge);
}
}
for (IREdge edgeFromSplitter : dag.getOutgoingEdgesOf(splitter)) {
if (edgeFromSplitter.getDst() instanceof TaskSizeSplitterVertex) {
final TaskSizeSplitterVertex nextSp = (TaskSizeSplitterVertex) edgeFromSplitter.getDst();
final IREdge internalEdge = nextSp.getEdgeWithInternalVertex(edgeFromSplitter);
final IREdge newEdgeToNextSp = Util.cloneEdge(internalEdge, internalEdge.getSrc(), nextSp);
nextSp.mapEdgeWithLoop(newEdgeToNextSp, internalEdge);
builder.connectVertices(newEdgeToNextSp);
} else {
final IREdge internalEdge = splitter.getEdgeWithInternalVertex(edgeFromSplitter);
builder.connectVertices(internalEdge);
}
}
}
return builder;
}
/**
* Inserts a new vertex that streams data.
* <p>
* Before: src - edgeToStreamize - dst
* After: src - edgeToStreamizeWithNewDestination - relayVertex - oneToOneEdge - dst
* (replaces the "Before" relationships)
* <p>
* This preserves semantics as the relayVertex simply forwards data elements from the input edge to the output edge.
*
* @param relayVertex to insert.
* @param edgeToStreamize to modify.
*/
public void insert(final RelayVertex relayVertex, final IREdge edgeToStreamize) {
assertNonExistence(relayVertex);
assertNonControlEdge(edgeToStreamize);
// Create a completely new DAG with the vertex inserted.
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
// Integrity check
if (edgeToStreamize.getPropertyValue(MessageIdEdgeProperty.class).isPresent()
&& !edgeToStreamize.getPropertyValue(MessageIdEdgeProperty.class).get().isEmpty()) {
throw new CompileTimeOptimizationException(edgeToStreamize.getId() + " has a MessageId, and cannot be removed");
}
// RelayVertex should not be inserted before SplitterVertex.
if (edgeToStreamize.getDst() instanceof TaskSizeSplitterVertex) {
return;
}
// Insert the vertex.
final IRVertex vertexToInsert = wrapSamplingVertexIfNeeded(relayVertex, edgeToStreamize.getSrc());
builder.addVertex(vertexToInsert);
edgeToStreamize.getSrc().getPropertyValue(ParallelismProperty.class)
.ifPresent(p -> vertexToInsert.setProperty(ParallelismProperty.of(p)));
// Build the new DAG to reflect the new topology.
modifiedDAG.topologicalDo(v -> {
builder.addVertex(v); // None of the existing vertices are deleted.
for (final IREdge edge : modifiedDAG.getIncomingEdgesOf(v)) {
if (edge.equals(edgeToStreamize)) {
// MATCH!
// Edge to the relayVertex
final IREdge toSV = new IREdge(
edgeToStreamize.getPropertyValue(CommunicationPatternProperty.class).get(),
edgeToStreamize.getSrc(),
vertexToInsert);
edgeToStreamize.copyExecutionPropertiesTo(toSV);
// Edge from the relayVertex.
final IREdge fromSV = new IREdge(CommunicationPatternProperty.Value.ONE_TO_ONE, vertexToInsert, v);
fromSV.setProperty(EncoderProperty.of(edgeToStreamize.getPropertyValue(EncoderProperty.class).get()));
fromSV.setProperty(DecoderProperty.of(edgeToStreamize.getPropertyValue(DecoderProperty.class).get()));
// Annotations for efficient data transfers - toSV
toSV.setPropertyPermanently(DecoderProperty.of(BytesDecoderFactory.of()));
toSV.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.LZ4));
toSV.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.NONE));
// Annotations for efficient data transfers - fromSV
fromSV.setPropertyPermanently(EncoderProperty.of(BytesEncoderFactory.of()));
fromSV.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.NONE));
fromSV.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.LZ4));
fromSV.setPropertyPermanently(PartitionerProperty.of(PartitionerProperty.Type.DEDICATED_KEY_PER_ELEMENT));
// Track the new edges.
builder.connectVertices(toSV);
builder.connectVertices(fromSV);
} else {
// NO MATCH, so simply connect vertices as before.
builder.connectVertices(edge);
}
}
});
if (edgeToStreamize.getSrc() instanceof RelayVertex) {
streamVertexToOriginalEdge.put(relayVertex, streamVertexToOriginalEdge.get(edgeToStreamize.getSrc()));
} else if (edgeToStreamize.getDst() instanceof RelayVertex) {
streamVertexToOriginalEdge.put(relayVertex, streamVertexToOriginalEdge.get(edgeToStreamize.getDst()));
} else {
streamVertexToOriginalEdge.put(relayVertex, edgeToStreamize);
}
modifiedDAG = builder.build(); // update the DAG.
}
/**
* Inserts a new vertex that analyzes intermediate data, and triggers a dynamic optimization.
* <p>
* For each edge in edgesToGetStatisticsOf...
* <p>
* Before: src - edge - dst
* After: src - oneToOneEdge(a clone of edge) - triggerVertex -
* shuffleEdge - messageAggregatorVertex - broadcastEdge - dst
* (the "Before" relationships are unmodified)
* <p>
* This preserves semantics as the results of the inserted message vertices are never consumed by the original IRDAG.
* <p>
* TODO #345: Simplify insert(TriggerVertex)
*
* @param messageGeneratorVertex to insert.
* @param messageAggregatorVertex to insert.
* @param triggerOutputEncoder to use.
* @param triggerOutputDecoder to use.
* @param edgesToGetStatisticsOf to examine.
* @param edgesToOptimize to optimize.
*/
public void insert(final MessageGeneratorVertex messageGeneratorVertex,
final MessageAggregatorVertex messageAggregatorVertex,
final EncoderProperty triggerOutputEncoder,
final DecoderProperty triggerOutputDecoder,
final Set<IREdge> edgesToGetStatisticsOf,
final Set<IREdge> edgesToOptimize) {
//edge case: when the destination of mav is splitter, do not insert!
assertNonExistence(messageGeneratorVertex);
assertNonExistence(messageAggregatorVertex);
edgesToGetStatisticsOf.forEach(this::assertNonControlEdge);
edgesToOptimize.forEach(this::assertNonControlEdge);
if (edgesToGetStatisticsOf.stream().map(edge -> edge.getDst().getId()).collect(Collectors.toSet()).size() != 1) {
throw new IllegalArgumentException("Not destined to the same vertex: " + edgesToOptimize.toString());
}
if (edgesToOptimize.stream().map(edge -> edge.getDst().getId()).collect(Collectors.toSet()).size() != 1) {
throw new IllegalArgumentException("Not destined to the same vertex: " + edgesToOptimize.toString());
}
// Create a completely new DAG with the vertex inserted.
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
// All of the existing vertices and edges remain intact
modifiedDAG.topologicalDo(v -> {
builder.addVertex(v);
modifiedDAG.getIncomingEdgesOf(v).forEach(builder::connectVertices);
});
////////////////////////////////// STEP 1: Insert new vertices and edges (src - trigger - agg - dst)
// From src to trigger
final List<IRVertex> triggerList = new ArrayList<>();
for (final IREdge edge : edgesToGetStatisticsOf) {
final IRVertex triggerToAdd = wrapSamplingVertexIfNeeded(
new MessageGeneratorVertex<>(messageGeneratorVertex.getMessageFunction()), edge.getSrc());
builder.addVertex(triggerToAdd);
triggerList.add(triggerToAdd);
edge.getSrc().getPropertyValue(ParallelismProperty.class)
.ifPresent(p -> triggerToAdd.setProperty(ParallelismProperty.of(p)));
final IREdge edgeToClone;
if (edge.getSrc() instanceof RelayVertex) {
edgeToClone = streamVertexToOriginalEdge.get(edge.getSrc());
} else if (edge.getDst() instanceof RelayVertex) {
edgeToClone = streamVertexToOriginalEdge.get(edge.getDst());
} else {
edgeToClone = edge;
}
final IREdge clone = Util.cloneEdge(
CommunicationPatternProperty.Value.ONE_TO_ONE, edgeToClone, edge.getSrc(), triggerToAdd);
if (edge.getSrc() instanceof TaskSizeSplitterVertex) {
builder.connectSplitterVertexWithoutReplacing(edgeToClone, clone);
} else {
builder.connectVertices(clone);
}
}
// Add agg (no need to wrap inside sampling vertices)
builder.addVertex(messageAggregatorVertex);
// From trigger to agg
for (final IRVertex trigger : triggerList) {
final IREdge edgeToMav = edgeToMessageAggregator(
trigger, messageAggregatorVertex, triggerOutputEncoder, triggerOutputDecoder);
builder.connectVertices(edgeToMav);
}
// From agg to dst
// Add a control dependency (no output) from the messageAggregatorVertex to the destination.
IREdge aggToDst = Util.createControlEdge(
messageAggregatorVertex, edgesToGetStatisticsOf.iterator().next().getDst());
if (edgesToGetStatisticsOf.iterator().next().getDst() instanceof TaskSizeSplitterVertex) {
builder.connectSplitterVertexWithoutReplacing(edgesToGetStatisticsOf.iterator().next(), aggToDst);
} else {
builder.connectVertices(aggToDst);
}
////////////////////////////////// STEP 2: Annotate the MessageId on optimization target edges
modifiedDAG.topologicalDo(v ->
modifiedDAG.getIncomingEdgesOf(v).forEach(inEdge -> {
if (edgesToOptimize.contains(inEdge)) {
final HashSet<Integer> msgEdgeIds =
inEdge.getPropertyValue(MessageIdEdgeProperty.class).orElse(new HashSet<>(0));
msgEdgeIds.add(messageAggregatorVertex.getPropertyValue(MessageIdVertexProperty.class).get());
inEdge.setProperty(MessageIdEdgeProperty.of(msgEdgeIds));
}
})
);
final Set<IRVertex> insertedVertices = new HashSet<>();
insertedVertices.addAll(triggerList);
insertedVertices.add(messageAggregatorVertex);
triggerList.forEach(trigger -> messageVertexToGroup.put(trigger, insertedVertices));
messageVertexToGroup.put(messageAggregatorVertex, insertedVertices);
modifiedDAG = builder.build(); // update the DAG.
}
/**
* Inserts new vertex which calls for runtime pass.
*
* e.g) suppose that we want to change vertex 2's property by using runtime pass, but the related data is not gained
* directly from the incoming edge of vertex 2 (for example, the data is gained from using simulation).
* In this case, it is unnecessary to insert message generator vertex and message aggregator vertex to launch runtime
* pass.
*
* Original case: (vertex1) -- shuffle edge -- (vertex 2)
*
* After inserting signal Vertex:
* (vertex 1) -------------------- shuffle edge ------------------- (vertex 2)
* -- control edge -- (signal vertex) -- control edge --
*
* Therefore, the shuffle edge to vertex 2 is executed after signal vertex is executed.
* Since signal vertex only 'signals' the launch of runtime pass, its parallelism is sufficient to be only 1.
* @param toInsert Signal vertex to optimize.
* @param edgeToOptimize Original edge to optimize(in the above example, shuffle edge).
*/
public void insert(final SignalVertex toInsert,
final IREdge edgeToOptimize) {
// Create a completely new DAG with the vertex inserted.
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
// All of the existing vertices and edges remain intact
modifiedDAG.topologicalDo(v -> {
builder.addVertex(v);
modifiedDAG.getIncomingEdgesOf(v).forEach(builder::connectVertices);
});
// insert Signal Vertex in DAG.
builder.addVertex(toInsert);
final IREdge controlEdgeToSV = Util.createControlEdge(edgeToOptimize.getSrc(), toInsert);
final IREdge controlEdgeFromSV = Util.createControlEdge(toInsert, edgeToOptimize.getDst());
builder.connectVertices(controlEdgeToSV);
builder.connectVertices(controlEdgeFromSV);
modifiedDAG.topologicalDo(v ->
modifiedDAG.getIncomingEdgesOf(v).forEach(inEdge -> {
if (edgeToOptimize.equals(inEdge)) {
final HashSet<Integer> msgEdgeIds =
inEdge.getPropertyValue(MessageIdEdgeProperty.class).orElse(new HashSet<>(0));
msgEdgeIds.add(toInsert.getPropertyValue(MessageIdVertexProperty.class).get());
inEdge.setProperty(MessageIdEdgeProperty.of(msgEdgeIds));
}
})
);
// update the DAG.
modifiedDAG = builder.build();
}
/**
* Inserts a set of samplingVertices that process sampled data.
* <p>
* This method automatically inserts the following three types of edges.
* (1) Edges between samplingVertices to reflect the original relationship
* (2) Edges from the original IRDAG to samplingVertices that clone the inEdges of the original vertices
* (3) Edges from the samplingVertices to the original IRDAG to respect executeAfterSamplingVertices
* <p>
* Suppose the caller supplies the following arguments to perform a "sampled run" of vertices {V1, V2},
* prior to executing them.
* - samplingVertices: {V1', V2'}
* - childrenOfSamplingVertices: {V1}
* <p>
* Before: V1 - oneToOneEdge - V2 - shuffleEdge - V3
* After: V1' - oneToOneEdge - V2' - controlEdge - V1 - oneToOneEdge - V2 - shuffleEdge - V3
* <p>
* This preserves semantics as the original IRDAG remains unchanged and unaffected.
* <p>
* (Future calls to insert() can add new vertices that connect to sampling vertices. Such new vertices will also be
* wrapped with sampling vertices, as new vertices that consume outputs from sampling vertices will process
* a subset of data anyways, and no such new vertex will reach the original DAG except via control edges)
* <p>
* TODO #343: Extend SamplingVertex control edges
*
* @param toInsert sampling vertices.
* @param executeAfter that must be executed after toInsert.
*/
public void insert(final Set<SamplingVertex> toInsert,
final Set<IRVertex> executeAfter) {
toInsert.forEach(this::assertNonExistence);
executeAfter.forEach(this::assertExistence);
// Create a completely new DAG with the vertex inserted.
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
// All of the existing vertices and edges remain intact
modifiedDAG.topologicalDo(v -> {
builder.addVertex(v);
modifiedDAG.getIncomingEdgesOf(v).forEach(builder::connectVertices);
});
// Add the sampling vertices
toInsert.forEach(builder::addVertex);
// Get the original vertices
final Map<IRVertex, IRVertex> originalToSampling = toInsert.stream()
.collect(Collectors.toMap(sv -> modifiedDAG.getVertexById(sv.getOriginalVertexId()), Function.identity()));
final Set<IREdge> inEdgesOfOriginals = originalToSampling.keySet()
.stream()
.flatMap(ov -> modifiedDAG.getIncomingEdgesOf(ov).stream())
.collect(Collectors.toSet());
// [EDGE TYPE 1] Between sampling vertices
final Set<IREdge> betweenOriginals = inEdgesOfOriginals
.stream()
.filter(ovInEdge -> originalToSampling.containsKey(ovInEdge.getSrc()))
.collect(Collectors.toSet());
betweenOriginals.stream().map(boEdge -> Util.cloneEdge(
boEdge,
originalToSampling.get(boEdge.getSrc()),
originalToSampling.get(boEdge.getDst()))).forEach(builder::connectVertices);
// [EDGE TYPE 2] From original IRDAG to sampling vertices
final Set<IREdge> notBetweenOriginals = inEdgesOfOriginals
.stream()
.filter(ovInEdge -> !originalToSampling.containsKey(ovInEdge.getSrc()))
.collect(Collectors.toSet());
notBetweenOriginals.stream().map(nboEdge -> {
final IREdge cloneEdge = Util.cloneEdge(
nboEdge,
nboEdge.getSrc(), // sampling vertices consume a subset of original data partitions here
originalToSampling.get(nboEdge.getDst()));
nboEdge.copyExecutionPropertiesTo(cloneEdge); // exec properties must be exactly the same
return cloneEdge;
}).forEach(builder::connectVertices);
// [EDGE TYPE 3] From sampling vertices to vertices that should be executed after
final Set<IRVertex> sinks = getSinksWithinVertexSet(modifiedDAG, originalToSampling.keySet())
.stream()
.map(originalToSampling::get)
.collect(Collectors.toSet());
for (final IRVertex ea : executeAfter) {
for (final IRVertex sink : sinks) {
// Control edge that enforces execution ordering
builder.connectVertices(Util.createControlEdge(sink, ea));
}
}
toInsert.forEach(tiv -> samplingVertexToGroup.put(tiv, toInsert));
modifiedDAG = builder.build(); // update the DAG.
}
/**
* Insert TaskSizeSplitterVertex in dag.
* @param toInsert TaskSizeSplitterVertex to insert.
*/
public void insert(final TaskSizeSplitterVertex toInsert) {
final Set<IRVertex> originalVertices = toInsert.getOriginalVertices();
final Set<IREdge> incomingEdgesOfOriginalVertices = originalVertices
.stream()
.flatMap(ov -> modifiedDAG.getIncomingEdgesOf(ov).stream())
.collect(Collectors.toSet());
final Set<IREdge> outgoingEdgesOfOriginalVertices = originalVertices
.stream()
.flatMap(ov -> modifiedDAG.getOutgoingEdgesOf(ov).stream())
.collect(Collectors.toSet());
final Set<IREdge> fromOutsideToOriginal = toInsert.getEdgesFromOutsideToOriginal(modifiedDAG);
final Set<IREdge> fromOriginalToOutside = toInsert.getEdgesFromOriginalToOutside(modifiedDAG);
// make edges connected to splitter vertex
final Set<IREdge> fromOutsideToSplitter = toInsert.getEdgesFromOutsideToSplitter(modifiedDAG);
final Set<IREdge> fromSplitterToOutside = toInsert.getEdgesFromSplitterToOutside(modifiedDAG);
//map splitter vertex connection to corresponding internal vertex connection
for (IREdge splitterEdge : fromSplitterToOutside) {
for (IREdge internalEdge : fromOriginalToOutside) {
if (splitterEdge.getDst() instanceof TaskSizeSplitterVertex) {
TaskSizeSplitterVertex nextSplitter = (TaskSizeSplitterVertex) splitterEdge.getDst();
if (nextSplitter.getOriginalVertices().contains(internalEdge.getDst())) {
toInsert.mapEdgeWithLoop(splitterEdge, internalEdge);
}
} else {
if (splitterEdge.getDst().equals(internalEdge.getDst())) {
toInsert.mapEdgeWithLoop(splitterEdge, internalEdge);
}
}
}
}
for (IREdge splitterEdge : fromOutsideToSplitter) {
for (IREdge internalEdge : fromOutsideToOriginal) {
if (splitterEdge.getSrc().equals(internalEdge.getSrc())) {
toInsert.mapEdgeWithLoop(splitterEdge, internalEdge);
}
}
}
fromOutsideToOriginal.forEach(toInsert::addDagIncomingEdge);
fromOutsideToOriginal.forEach(toInsert::addNonIterativeIncomingEdge);
fromOriginalToOutside.forEach(toInsert::addDagOutgoingEdge);
// All preparation done. Insert splitter vertex.
final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
//insert vertex and edges irrelevant to splitter vertex
modifiedDAG.topologicalDo(v -> {
if (!originalVertices.contains(v)) {
builder.addVertex(v);
for (IREdge edge : modifiedDAG.getIncomingEdgesOf(v)) {
if (!incomingEdgesOfOriginalVertices.contains(edge) && !outgoingEdgesOfOriginalVertices.contains(edge)) {
builder.connectVertices(edge);
}
}
}
});
//insert splitter vertices
builder.addVertex(toInsert);
//connect splitter to outside world
fromOutsideToSplitter.forEach(builder::connectVertices);
fromSplitterToOutside.forEach(builder::connectVertices);
modifiedDAG = builder.build();
}
/**
* Reshape unsafely, without guarantees on preserving application semantics.
* TODO #330: Refactor Unsafe Reshaping Passes
*
* @param unsafeReshapingFunction takes as input the underlying DAG, and outputs a reshaped DAG.
*/
public void reshapeUnsafely(final Function<DAG<IRVertex, IREdge>, DAG<IRVertex, IREdge>> unsafeReshapingFunction) {
modifiedDAG = unsafeReshapingFunction.apply(modifiedDAG);
}
////////////////////////////////////////////////// Private helper methods.
private Set<IRVertex> getSinksWithinVertexSet(final DAG<IRVertex, IREdge> dag,
final Set<IRVertex> vertexSet) {
final Set<IRVertex> parentsOfAnotherVertex = vertexSet.stream()
.flatMap(v -> dag.getOutgoingEdgesOf(v).stream())
.filter(e -> vertexSet.contains(e.getDst()))
.map(IREdge::getSrc) // makes the result a subset of the input vertexSet
.collect(Collectors.toSet());
return Sets.difference(vertexSet, parentsOfAnotherVertex);
}
private IRVertex wrapSamplingVertexIfNeeded(final IRVertex newVertex, final IRVertex existingVertexToConnectWith) {
// If the connecting vertex is a sampling vertex, the new vertex must be wrapped inside a sampling vertex too.
return existingVertexToConnectWith instanceof SamplingVertex
? new SamplingVertex(newVertex, ((SamplingVertex) existingVertexToConnectWith).getDesiredSampleRate())
: newVertex;
}
private void assertNonControlEdge(final IREdge e) {
if (Util.isControlEdge(e)) {
throw new IllegalArgumentException(e.getId());
}
}
private void assertExistence(final IRVertex v) {
if (!getVertices().contains(v)) {
throw new IllegalArgumentException(v.getId());
}
}
private void assertNonExistence(final IRVertex v) {
if (getVertices().contains(v)) {
throw new IllegalArgumentException(v.getId());
}
}
/**
* @param trigger src.
* @param agg dst.
* @param encoder src-dst encoder.
* @param decoder src-dst decoder.
* @return the edge.
*/
private IREdge edgeToMessageAggregator(final IRVertex trigger,
final IRVertex agg,
final EncoderProperty encoder,
final DecoderProperty decoder) {
final IREdge newEdge = new IREdge(CommunicationPatternProperty.Value.SHUFFLE, trigger, agg);
newEdge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LOCAL_FILE_STORE));
newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.KEEP));
newEdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.PUSH));
newEdge.setPropertyPermanently(encoder);
newEdge.setPropertyPermanently(decoder);
newEdge.setPropertyPermanently(KeyExtractorProperty.of(new PairKeyExtractor()));
// TODO #345: Simplify insert(TriggerVertex)
// these are obviously wrong, but hacks for now...
newEdge.setPropertyPermanently(KeyEncoderProperty.of(encoder.getValue()));
newEdge.setPropertyPermanently(KeyDecoderProperty.of(decoder.getValue()));
return newEdge;
}
////////////////////////////////////////////////// DAGInterface methods - forward calls to the underlying DAG.
@Override
public void topologicalDo(final Consumer<IRVertex> function) {
modifiedDAG.topologicalDo(function);
}
@Override
public void dfsTraverse(final Consumer<IRVertex> function, final TraversalOrder traversalOrder) {
modifiedDAG.dfsTraverse(function, traversalOrder);
}
@Override
public void dfsDo(final IRVertex vertex,
final Consumer<IRVertex> vertexConsumer,
final TraversalOrder traversalOrder,
final Set<IRVertex> visited) {
modifiedDAG.dfsDo(vertex, vertexConsumer, traversalOrder, visited);
}
@Override
public Boolean pathExistsBetween(final IRVertex v1, final IRVertex v2) {
return modifiedDAG.pathExistsBetween(v1, v2);
}
@Override
public Boolean isCompositeVertex(final IRVertex irVertex) {
return modifiedDAG.isCompositeVertex(irVertex);
}
@Override
public Integer getLoopStackDepthOf(final IRVertex irVertex) {
return modifiedDAG.getLoopStackDepthOf(irVertex);
}
@Override
public LoopVertex getAssignedLoopVertexOf(final IRVertex irVertex) {
return modifiedDAG.getAssignedLoopVertexOf(irVertex);
}
@Override
public ObjectNode asJsonNode() {
return modifiedDAG.asJsonNode();
}
@Override
public void storeJSON(final String directory, final String name, final String description) {
modifiedDAG.storeJSON(directory, name, description);
}
@Override
public IRVertex getVertexById(final String id) {
return modifiedDAG.getVertexById(id);
}
@Override
public IREdge getEdgeById(final String id) {
return modifiedDAG.getEdgeById(id);
}
@Override
public List<IRVertex> getVertices() {
return modifiedDAG.getVertices();
}
@Override
public List<IREdge> getEdges() {
return modifiedDAG.getEdges();
}
@Override
public List<IRVertex> getRootVertices() {
return modifiedDAG.getRootVertices();
}
@Override
public List<IREdge> getIncomingEdgesOf(final IRVertex v) {
return modifiedDAG.getIncomingEdgesOf(v);
}
@Override
public List<IREdge> getIncomingEdgesOf(final String vertexId) {
return modifiedDAG.getIncomingEdgesOf(vertexId);
}
@Override
public List<IREdge> getOutgoingEdgesOf(final IRVertex v) {
return modifiedDAG.getOutgoingEdgesOf(v);
}
@Override
public List<IREdge> getOutgoingEdgesOf(final String vertexId) {
return modifiedDAG.getOutgoingEdgesOf(vertexId);
}
@Override
public List<IRVertex> getParents(final String vertexId) {
return modifiedDAG.getParents(vertexId);
}
@Override
public List<IRVertex> getChildren(final String vertexId) {
return modifiedDAG.getChildren(vertexId);
}
@Override
public IREdge getEdgeBetween(final String srcVertexId,
final String dstVertexId) {
return modifiedDAG.getEdgeBetween(srcVertexId, dstVertexId);
}
@Override
public List<IRVertex> getTopologicalSort() {
return modifiedDAG.getTopologicalSort();
}
@Override
public List<IRVertex> getAncestors(final String vertexId) {
return modifiedDAG.getAncestors(vertexId);
}
@Override
public List<IRVertex> getDescendants(final String vertexId) {
return modifiedDAG.getDescendants(vertexId);
}
@Override
public List<IRVertex> filterVertices(final Predicate<IRVertex> condition) {
return modifiedDAG.filterVertices(condition);
}
@Override
public String toString() {
return asJsonNode().toString();
}
}