| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.nemo.common.ir; |
| |
| import com.fasterxml.jackson.databind.node.ObjectNode; |
| import com.google.common.collect.Sets; |
| import org.apache.nemo.common.PairKeyExtractor; |
| import org.apache.nemo.common.Util; |
| import org.apache.nemo.common.coder.BytesDecoderFactory; |
| import org.apache.nemo.common.coder.BytesEncoderFactory; |
| import org.apache.nemo.common.dag.DAG; |
| import org.apache.nemo.common.dag.DAGBuilder; |
| import org.apache.nemo.common.dag.DAGInterface; |
| import org.apache.nemo.common.exception.CompileTimeOptimizationException; |
| import org.apache.nemo.common.exception.IllegalEdgeOperationException; |
| import org.apache.nemo.common.ir.edge.IREdge; |
| import org.apache.nemo.common.ir.edge.executionproperty.*; |
| import org.apache.nemo.common.ir.vertex.IRVertex; |
| import org.apache.nemo.common.ir.vertex.LoopVertex; |
| import org.apache.nemo.common.ir.vertex.executionproperty.MessageIdVertexProperty; |
| import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty; |
| import org.apache.nemo.common.ir.vertex.utility.MessageAggregatorVertex; |
| import org.apache.nemo.common.ir.vertex.utility.TriggerVertex; |
| import org.apache.nemo.common.ir.vertex.utility.RelayVertex; |
| import org.apache.nemo.common.ir.vertex.utility.SamplingVertex; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.annotation.concurrent.NotThreadSafe; |
| import java.util.*; |
| import java.util.function.BiFunction; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| import java.util.function.Predicate; |
| import java.util.stream.Collectors; |
| |
| /** |
| * An IRDAG object captures a high-level data processing application (e.g., Spark/Beam application). |
| * - IRVertex: A data-parallel operation. (e.g., map) |
| * - IREdge: A data dependency between two operations. (e.g., shuffle) |
| * <p> |
| * Largely two types of IRDAG optimization(modification) methods are provided. |
| * All of these methods preserve application semantics. |
| * - Annotation: setProperty(), getPropertyValue() on each IRVertex/IREdge |
| * - Reshaping: insert(), delete() on the IRDAG |
| * <p> |
| * TODO #341: Rethink IRDAG insert() signatures |
| */ |
| @NotThreadSafe |
| public final class IRDAG implements DAGInterface<IRVertex, IREdge> { |
  private static final Logger LOG = LoggerFactory.getLogger(IRDAG.class.getName());

  private DAG<IRVertex, IREdge> dagSnapshot; // the DAG that was saved most recently.
  private DAG<IRVertex, IREdge> modifiedDAG; // the DAG that is being updated.

  // Maps an inserted RelayVertex to the original edge it replaced (to remember the
  // original encoders/decoders, etc.), so that delete() can restore that edge later.
  private final Map<RelayVertex, IREdge> streamVertexToOriginalEdge;

  // Maps each SamplingVertex to the entire group it was inserted with,
  // so that delete() on any member removes the whole group together.
  private final Map<SamplingVertex, Set<SamplingVertex>> samplingVertexToGroup;

  // Maps each message barrier/aggregator vertex to the entire group it was inserted with,
  // so that delete() on any member removes the whole group together.
  private final Map<IRVertex, Set<IRVertex>> messageVertexToGroup;

  /**
   * Wraps the given DAG; both the snapshot and the modifiable DAG start as the original.
   *
   * @param originalUserApplicationDAG the initial DAG.
   */
  public IRDAG(final DAG<IRVertex, IREdge> originalUserApplicationDAG) {
    this.modifiedDAG = originalUserApplicationDAG;
    this.dagSnapshot = originalUserApplicationDAG;
    this.streamVertexToOriginalEdge = new HashMap<>();
    this.samplingVertexToGroup = new HashMap<>();
    this.messageVertexToGroup = new HashMap<>();
  }
| |
  /**
   * Runs {@link IRDAGChecker} against the DAG currently being modified.
   *
   * @return the integrity check result for the modified DAG.
   */
  public IRDAGChecker.CheckerResult checkIntegrity() {
    return IRDAGChecker.get().doCheck(modifiedDAG);
  }
| |
| ////////////////////////////////////////////////// |
| |
| /** |
| * Used internally by Nemo to advance the DAG snapshot after applying each pass. |
| * |
| * @param checker that compares the dagSnapshot and the modifiedDAG |
| * to determine if the snapshot can be set the current modifiedDAG. |
| * @return true if the checker passes, false otherwise. |
| */ |
| public boolean advanceDAGSnapshot(final BiFunction<IRDAG, IRDAG, Boolean> checker) { |
| final boolean canAdvance = checker.apply(new IRDAG(dagSnapshot), new IRDAG(modifiedDAG)); |
| if (canAdvance) { |
| dagSnapshot = modifiedDAG; |
| } |
| return canAdvance; |
| } |
| |
| /** |
| * @return a IR DAG summary string, consisting of only the vertices generated from the frontend. |
| */ |
| public String irDAGSummary() { |
| return "rv" + getRootVertices().size() |
| + "_v" + getVertices().stream() |
| .filter(v -> !v.isUtilityVertex()) // Exclude utility vertices |
| .count() |
| + "_e" + getVertices().stream() |
| .filter(v -> !v.isUtilityVertex()) // Exclude utility vertices |
| .mapToInt(v -> getIncomingEdgesOf(v).size()) |
| .sum(); |
| } |
| |
| ////////////////////////////////////////////////// Methods for reshaping the DAG topology. |
| |
  /**
   * Deletes a previously inserted utility vertex.
   * (e.g., TriggerVertex, RelayVertex, SamplingVertex)
   * <p>
   * Notice that the actual number of vertices that will be deleted after this call returns can be more than one.
   * We roll back the changes made with the previous insert(), while preserving application semantics.
   *
   * @param vertexToDelete to delete.
   * @throws IllegalArgumentException if the vertex does not exist in this DAG,
   *         or is not a deletable utility vertex (checked inside deleteRecursively).
   */
  public void delete(final IRVertex vertexToDelete) {
    assertExistence(vertexToDelete);
    // Recursively removes the vertex group and neighboring utility vertices;
    // intermediate DAGs skip source/sink validation (see deleteRecursively).
    deleteRecursively(vertexToDelete, new HashSet<>());

    // Build again, with source/sink checks on
    modifiedDAG = rebuildExcluding(modifiedDAG, Collections.emptySet()).build();
  }
| |
| private Set<IRVertex> getVertexGroupToDelete(final IRVertex vertexToDelete) { |
| if (vertexToDelete instanceof RelayVertex) { |
| return Sets.newHashSet(vertexToDelete); |
| } else if (vertexToDelete instanceof SamplingVertex) { |
| final Set<SamplingVertex> samplingVertexGroup = samplingVertexToGroup.get(vertexToDelete); |
| final Set<IRVertex> converted = new HashSet<>(samplingVertexGroup.size()); |
| for (final IRVertex sv : samplingVertexGroup) { |
| converted.add(sv); // explicit conversion to IRVertex is needed.. otherwise the compiler complains :( |
| } |
| return converted; |
| } else if (vertexToDelete instanceof MessageAggregatorVertex || vertexToDelete instanceof TriggerVertex) { |
| return messageVertexToGroup.get(vertexToDelete); |
| } else { |
| throw new IllegalArgumentException(vertexToDelete.getId()); |
| } |
| } |
| |
  /**
   * Delete a group of vertex that corresponds to the specified vertex.
   * And then recursively delete neighboring utility vertices.
   * <p>
   * (WARNING) Only call this method inside delete(), or inside this method itself.
   * This method uses buildWithoutSourceSinkCheck() for intermediate DAGs,
   * which will be finally checked in delete().
   *
   * @param vertexToDelete to delete
   * @param visited vertex groups (because cyclic dependencies between vertex groups are possible)
   * @throws IllegalArgumentException if the vertex is not a utility vertex.
   */
  private void deleteRecursively(final IRVertex vertexToDelete, final Set<IRVertex> visited) {
    // Only utility vertices (previously inserted by the optimizer) may be deleted.
    if (!Util.isUtilityVertex(vertexToDelete)) {
      throw new IllegalArgumentException(vertexToDelete.getId());
    }
    if (visited.contains(vertexToDelete)) {
      return;
    }

    // Three data structures:
    // (1) the group deleted along with vertexToDelete,
    // (2) utility vertices feeding into the group, and (3) utility vertices consuming from it.
    final Set<IRVertex> vertexGroupToDelete = getVertexGroupToDelete(vertexToDelete);
    final Set<IRVertex> utilityParents = vertexGroupToDelete.stream()
      .map(modifiedDAG::getIncomingEdgesOf)
      .flatMap(inEdgeList -> inEdgeList.stream().map(IREdge::getSrc))
      .filter(Util::isUtilityVertex)
      .collect(Collectors.toSet());
    final Set<IRVertex> utilityChildren = vertexGroupToDelete.stream()
      .map(modifiedDAG::getOutgoingEdgesOf)
      .flatMap(outEdgeList -> outEdgeList.stream().map(IREdge::getDst))
      .filter(Util::isUtilityVertex)
      .collect(Collectors.toSet());

    // We have 'visited' this group (marked before recursing, to break cycles between groups)
    visited.addAll(vertexGroupToDelete);

    // STEP 1: Delete parent utility vertices
    // Vertices that are 'in between' the group are also deleted here
    Sets.difference(utilityParents, vertexGroupToDelete).forEach(ptd -> deleteRecursively(ptd, visited));

    // STEP 2: Delete the specified vertex(vertices)
    if (vertexToDelete instanceof RelayVertex) {
      final DAGBuilder<IRVertex, IREdge> builder = rebuildExcluding(modifiedDAG, vertexGroupToDelete);

      // Add a new edge that directly connects the src of the stream vertex to its dst,
      // cloning the original edge remembered in streamVertexToOriginalEdge at insert() time.
      modifiedDAG.getOutgoingEdgesOf(vertexToDelete).stream()
        .filter(e -> !Util.isControlEdge(e))
        .map(IREdge::getDst)
        .forEach(dstVertex ->
          modifiedDAG.getIncomingEdgesOf(vertexToDelete).stream()
            .filter(e -> !Util.isControlEdge(e))
            .map(IREdge::getSrc)
            .forEach(srcVertex -> builder.connectVertices(
              Util.cloneEdge(streamVertexToOriginalEdge.get(vertexToDelete), srcVertex, dstVertex))));
      modifiedDAG = builder.buildWithoutSourceSinkCheck();
    } else if (vertexToDelete instanceof MessageAggregatorVertex || vertexToDelete instanceof TriggerVertex) {
      modifiedDAG = rebuildExcluding(modifiedDAG, vertexGroupToDelete).buildWithoutSourceSinkCheck();
      // Scrub the deleted message id from every edge annotated with it,
      // so no edge keeps referring to the now-removed aggregator.
      final int deletedMessageId = vertexGroupToDelete.stream()
        .filter(vtd -> vtd instanceof MessageAggregatorVertex)
        .map(vtd -> ((MessageAggregatorVertex) vtd).getPropertyValue(MessageIdVertexProperty.class).get())
        .findAny().get();
      modifiedDAG.getEdges().stream()
        .filter(e -> e.getPropertyValue(MessageIdEdgeProperty.class).isPresent())
        .forEach(e -> e.getPropertyValue(MessageIdEdgeProperty.class).get().remove(deletedMessageId));
    } else if (vertexToDelete instanceof SamplingVertex) {
      modifiedDAG = rebuildExcluding(modifiedDAG, vertexGroupToDelete).buildWithoutSourceSinkCheck();
    } else {
      throw new IllegalArgumentException(vertexToDelete.getId());
    }

    // STEP 3: Delete children utility vertices
    Sets.difference(utilityChildren, vertexGroupToDelete).forEach(ctd -> deleteRecursively(ctd, visited));
  }
| |
| private DAGBuilder<IRVertex, IREdge> rebuildExcluding(final DAG<IRVertex, IREdge> dag, final Set<IRVertex> excluded) { |
| final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(); |
| dag.getVertices().stream().filter(v -> !excluded.contains(v)).forEach(builder::addVertex); |
| dag.getEdges().stream().filter(e -> !excluded.contains(e.getSrc()) && !excluded.contains(e.getDst())) |
| .forEach(builder::connectVertices); |
| return builder; |
| } |
| |
  /**
   * Inserts a new vertex that streams data.
   * <p>
   * Before: src - edgeToStreamize - dst
   * After: src - edgeToStreamizeWithNewDestination - relayVertex - oneToOneEdge - dst
   * (replaces the "Before" relationships)
   * <p>
   * This preserves semantics as the relayVertex simply forwards data elements from the input edge to the output edge.
   *
   * @param relayVertex to insert.
   * @param edgeToStreamize to modify.
   * @throws CompileTimeOptimizationException if edgeToStreamize carries a non-empty MessageId annotation
   *         (such an edge is needed by a dynamic optimization and must not be replaced).
   */
  public void insert(final RelayVertex relayVertex, final IREdge edgeToStreamize) {
    assertNonExistence(relayVertex);
    assertNonControlEdge(edgeToStreamize);

    // Create a completely new DAG with the vertex inserted.
    final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();

    // Integrity check
    if (edgeToStreamize.getPropertyValue(MessageIdEdgeProperty.class).isPresent()
      && !edgeToStreamize.getPropertyValue(MessageIdEdgeProperty.class).get().isEmpty()) {
      throw new CompileTimeOptimizationException(edgeToStreamize.getId() + " has a MessageId, and cannot be removed");
    }

    // Insert the vertex.
    // If the source is a sampling vertex, the relay is wrapped in a sampling vertex as well.
    final IRVertex vertexToInsert = wrapSamplingVertexIfNeeded(relayVertex, edgeToStreamize.getSrc());
    builder.addVertex(vertexToInsert);
    // The relay runs with the same parallelism as the source, when known.
    edgeToStreamize.getSrc().getPropertyValue(ParallelismProperty.class)
      .ifPresent(p -> vertexToInsert.setProperty(ParallelismProperty.of(p)));

    // Build the new DAG to reflect the new topology.
    modifiedDAG.topologicalDo(v -> {
      builder.addVertex(v); // None of the existing vertices are deleted.

      for (final IREdge edge : modifiedDAG.getIncomingEdgesOf(v)) {
        if (edge.equals(edgeToStreamize)) {
          // MATCH!

          // Edge to the relayVertex
          final IREdge toSV = new IREdge(
            edgeToStreamize.getPropertyValue(CommunicationPatternProperty.class).get(),
            edgeToStreamize.getSrc(),
            vertexToInsert);
          edgeToStreamize.copyExecutionPropertiesTo(toSV);

          // Edge from the relayVertex.
          final IREdge fromSV = new IREdge(CommunicationPatternProperty.Value.OneToOne, vertexToInsert, v);
          fromSV.setProperty(EncoderProperty.of(edgeToStreamize.getPropertyValue(EncoderProperty.class).get()));
          fromSV.setProperty(DecoderProperty.of(edgeToStreamize.getPropertyValue(DecoderProperty.class).get()));

          // Annotations for efficient data transfers - toSV
          // The relay receives LZ4-compressed raw bytes (no decode/decompress at the relay itself).
          toSV.setPropertyPermanently(DecoderProperty.of(BytesDecoderFactory.of()));
          toSV.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.LZ4));
          toSV.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.None));

          // Annotations for efficient data transfers - fromSV
          // The relay forwards the bytes untouched; the downstream side decompresses (LZ4) and decodes.
          fromSV.setPropertyPermanently(EncoderProperty.of(BytesEncoderFactory.of()));
          fromSV.setPropertyPermanently(CompressionProperty.of(CompressionProperty.Value.None));
          fromSV.setPropertyPermanently(DecompressionProperty.of(CompressionProperty.Value.LZ4));
          fromSV.setPropertyPermanently(PartitionerProperty.of(PartitionerProperty.Type.DedicatedKeyPerElement));

          // Track the new edges.
          builder.connectVertices(toSV);
          builder.connectVertices(fromSV);
        } else {
          // NO MATCH, so simply connect vertices as before.
          builder.connectVertices(edge);
        }
      }
    });

    // Remember the original (pre-relay) edge, chaining through already-inserted relays when
    // either endpoint is itself a relay, so that a later delete() can restore the edge.
    if (edgeToStreamize.getSrc() instanceof RelayVertex) {
      streamVertexToOriginalEdge.put(relayVertex, streamVertexToOriginalEdge.get(edgeToStreamize.getSrc()));
    } else if (edgeToStreamize.getDst() instanceof RelayVertex) {
      streamVertexToOriginalEdge.put(relayVertex, streamVertexToOriginalEdge.get(edgeToStreamize.getDst()));
    } else {
      streamVertexToOriginalEdge.put(relayVertex, edgeToStreamize);
    }
    modifiedDAG = builder.build(); // update the DAG.
  }
| |
| /** |
| * Inserts a new vertex that analyzes intermediate data, and triggers a dynamic optimization. |
| * <p> |
| * For each edge in edgesToGetStatisticsOf... |
| * <p> |
| * Before: src - edge - dst |
| * After: src - oneToOneEdge(a clone of edge) - triggerVertex - |
| * shuffleEdge - messageAggregatorVertex - broadcastEdge - dst |
| * (the "Before" relationships are unmodified) |
| * <p> |
| * This preserves semantics as the results of the inserted message vertices are never consumed by the original IRDAG. |
| * <p> |
| * TODO #345: Simplify insert(TriggerVertex) |
| * |
| * @param triggerVertex to insert. |
| * @param messageAggregatorVertex to insert. |
| * @param triggerOutputEncoder to use. |
| * @param triggerOutputDecoder to use. |
| * @param edgesToGetStatisticsOf to examine. |
| * @param edgesToOptimize to optimize. |
| */ |
| public void insert(final TriggerVertex triggerVertex, |
| final MessageAggregatorVertex messageAggregatorVertex, |
| final EncoderProperty triggerOutputEncoder, |
| final DecoderProperty triggerOutputDecoder, |
| final Set<IREdge> edgesToGetStatisticsOf, |
| final Set<IREdge> edgesToOptimize) { |
| assertNonExistence(triggerVertex); |
| assertNonExistence(messageAggregatorVertex); |
| edgesToGetStatisticsOf.forEach(this::assertNonControlEdge); |
| edgesToOptimize.forEach(this::assertNonControlEdge); |
| |
| if (edgesToGetStatisticsOf.stream().map(edge -> edge.getDst().getId()).collect(Collectors.toSet()).size() != 1) { |
| throw new IllegalArgumentException("Not destined to the same vertex: " + edgesToOptimize.toString()); |
| } |
| if (edgesToOptimize.stream().map(edge -> edge.getDst().getId()).collect(Collectors.toSet()).size() != 1) { |
| throw new IllegalArgumentException("Not destined to the same vertex: " + edgesToOptimize.toString()); |
| } |
| |
| // Create a completely new DAG with the vertex inserted. |
| final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>(); |
| |
| // All of the existing vertices and edges remain intact |
| modifiedDAG.topologicalDo(v -> { |
| builder.addVertex(v); |
| modifiedDAG.getIncomingEdgesOf(v).forEach(builder::connectVertices); |
| }); |
| |
| ////////////////////////////////// STEP 1: Insert new vertices and edges (src - trigger - agg - dst) |
| |
| // From src to trigger |
| final List<IRVertex> triggerList = new ArrayList<>(); |
| for (final IREdge edge : edgesToGetStatisticsOf) { |
| final IRVertex triggerToAdd = wrapSamplingVertexIfNeeded( |
| new TriggerVertex<>(triggerVertex.getMessageFunction()), edge.getSrc()); |
| builder.addVertex(triggerToAdd); |
| triggerList.add(triggerToAdd); |
| edge.getSrc().getPropertyValue(ParallelismProperty.class) |
| .ifPresent(p -> triggerToAdd.setProperty(ParallelismProperty.of(p))); |
| |
| final IREdge edgeToClone; |
| if (edge.getSrc() instanceof RelayVertex) { |
| edgeToClone = streamVertexToOriginalEdge.get(edge.getSrc()); |
| } else if (edge.getDst() instanceof RelayVertex) { |
| edgeToClone = streamVertexToOriginalEdge.get(edge.getDst()); |
| } else { |
| edgeToClone = edge; |
| } |
| |
| final IREdge clone = Util.cloneEdge( |
| CommunicationPatternProperty.Value.OneToOne, edgeToClone, edge.getSrc(), triggerToAdd); |
| builder.connectVertices(clone); |
| } |
| |
| // Add agg (no need to wrap inside sampling vertices) |
| builder.addVertex(messageAggregatorVertex); |
| |
| // From trigger to agg |
| for (final IRVertex trigger : triggerList) { |
| final IREdge edgeToMav = edgeToMessageAggregator( |
| trigger, messageAggregatorVertex, triggerOutputEncoder, triggerOutputDecoder); |
| builder.connectVertices(edgeToMav); |
| } |
| |
| // From agg to dst |
| // Add a control dependency (no output) from the messageAggregatorVertex to the destination. |
| builder.connectVertices( |
| Util.createControlEdge(messageAggregatorVertex, edgesToGetStatisticsOf.iterator().next().getDst())); |
| |
| ////////////////////////////////// STEP 2: Annotate the MessageId on optimization target edges |
| |
| modifiedDAG.topologicalDo(v -> { |
| modifiedDAG.getIncomingEdgesOf(v).forEach(inEdge -> { |
| if (edgesToOptimize.contains(inEdge)) { |
| final HashSet<Integer> msgEdgeIds = |
| inEdge.getPropertyValue(MessageIdEdgeProperty.class).orElse(new HashSet<>(0)); |
| msgEdgeIds.add(messageAggregatorVertex.getPropertyValue(MessageIdVertexProperty.class).get()); |
| inEdge.setProperty(MessageIdEdgeProperty.of(msgEdgeIds)); |
| } |
| }); |
| }); |
| |
| final Set<IRVertex> insertedVertices = new HashSet<>(); |
| insertedVertices.addAll(triggerList); |
| insertedVertices.add(messageAggregatorVertex); |
| triggerList.forEach(trigger -> messageVertexToGroup.put(trigger, insertedVertices)); |
| messageVertexToGroup.put(messageAggregatorVertex, insertedVertices); |
| |
| modifiedDAG = builder.build(); // update the DAG. |
| } |
| |
  /**
   * Inserts a set of samplingVertices that process sampled data.
   * <p>
   * This method automatically inserts the following three types of edges.
   * (1) Edges between samplingVertices to reflect the original relationship
   * (2) Edges from the original IRDAG to samplingVertices that clone the inEdges of the original vertices
   * (3) Edges from the samplingVertices to the original IRDAG to respect executeAfterSamplingVertices
   * <p>
   * Suppose the caller supplies the following arguments to perform a "sampled run" of vertices {V1, V2},
   * prior to executing them.
   * - samplingVertices: {V1', V2'}
   * - childrenOfSamplingVertices: {V1}
   * <p>
   * Before: V1 - oneToOneEdge - V2 - shuffleEdge - V3
   * After: V1' - oneToOneEdge - V2' - controlEdge - V1 - oneToOneEdge - V2 - shuffleEdge - V3
   * <p>
   * This preserves semantics as the original IRDAG remains unchanged and unaffected.
   * <p>
   * (Future calls to insert() can add new vertices that connect to sampling vertices. Such new vertices will also be
   * wrapped with sampling vertices, as new vertices that consume outputs from sampling vertices will process
   * a subset of data anyways, and no such new vertex will reach the original DAG except via control edges)
   * <p>
   * TODO #343: Extend SamplingVertex control edges
   *
   * @param toInsert sampling vertices.
   * @param executeAfter that must be executed after toInsert.
   */
  public void insert(final Set<SamplingVertex> toInsert,
                     final Set<IRVertex> executeAfter) {
    toInsert.forEach(this::assertNonExistence);
    executeAfter.forEach(this::assertExistence);

    // Create a completely new DAG with the vertex inserted.
    final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();

    // All of the existing vertices and edges remain intact
    modifiedDAG.topologicalDo(v -> {
      builder.addVertex(v);
      modifiedDAG.getIncomingEdgesOf(v).forEach(builder::connectVertices);
    });

    // Add the sampling vertices
    toInsert.forEach(builder::addVertex);

    // Get the original vertices (maps each original vertex to its sampling counterpart)
    final Map<IRVertex, IRVertex> originalToSampling = toInsert.stream()
      .collect(Collectors.toMap(sv -> modifiedDAG.getVertexById(sv.getOriginalVertexId()), Function.identity()));
    final Set<IREdge> inEdgesOfOriginals = originalToSampling.keySet()
      .stream()
      .flatMap(ov -> modifiedDAG.getIncomingEdgesOf(ov).stream())
      .collect(Collectors.toSet());

    // [EDGE TYPE 1] Between sampling vertices
    // (both endpoints of the original edge have sampling counterparts)
    final Set<IREdge> betweenOriginals = inEdgesOfOriginals
      .stream()
      .filter(ovInEdge -> originalToSampling.containsKey(ovInEdge.getSrc()))
      .collect(Collectors.toSet());
    betweenOriginals.stream().map(boEdge -> Util.cloneEdge(
      boEdge,
      originalToSampling.get(boEdge.getSrc()),
      originalToSampling.get(boEdge.getDst()))).forEach(builder::connectVertices);

    // [EDGE TYPE 2] From original IRDAG to sampling vertices
    // (only the destination has a sampling counterpart)
    final Set<IREdge> notBetweenOriginals = inEdgesOfOriginals
      .stream()
      .filter(ovInEdge -> !originalToSampling.containsKey(ovInEdge.getSrc()))
      .collect(Collectors.toSet());
    notBetweenOriginals.stream().map(nboEdge -> {
      final IREdge cloneEdge = Util.cloneEdge(
        nboEdge,
        nboEdge.getSrc(), // sampling vertices consume a subset of original data partitions here
        originalToSampling.get(nboEdge.getDst()));
      nboEdge.copyExecutionPropertiesTo(cloneEdge); // exec properties must be exactly the same
      return cloneEdge;
    }).forEach(builder::connectVertices);

    // [EDGE TYPE 3] From sampling vertices to vertices that should be executed after
    // Only the sinks within the sampling group receive outgoing control edges.
    final Set<IRVertex> sinks = getSinksWithinVertexSet(modifiedDAG, originalToSampling.keySet())
      .stream()
      .map(originalToSampling::get)
      .collect(Collectors.toSet());
    for (final IRVertex ea : executeAfter) {
      for (final IRVertex sink : sinks) {
        // Control edge that enforces execution ordering
        builder.connectVertices(Util.createControlEdge(sink, ea));
      }
    }

    // Remember the inserted group so a later delete() of any member removes them all together.
    toInsert.forEach(tiv -> samplingVertexToGroup.put(tiv, toInsert));
    modifiedDAG = builder.build(); // update the DAG.
  }
| |
  /**
   * Reshape unsafely, without guarantees on preserving application semantics.
   * TODO #330: Refactor Unsafe Reshaping Passes
   *
   * @param unsafeReshapingFunction takes as input the underlying DAG, and outputs a reshaped DAG.
   */
  public void reshapeUnsafely(final Function<DAG<IRVertex, IREdge>, DAG<IRVertex, IREdge>> unsafeReshapingFunction) {
    // The returned DAG is adopted as-is; no integrity check is performed here.
    modifiedDAG = unsafeReshapingFunction.apply(modifiedDAG);
  }
| |
| ////////////////////////////////////////////////// Private helper methods. |
| |
| private Set<IRVertex> getSinksWithinVertexSet(final DAG<IRVertex, IREdge> dag, |
| final Set<IRVertex> vertexSet) { |
| final Set<IRVertex> parentsOfAnotherVertex = vertexSet.stream() |
| .flatMap(v -> dag.getOutgoingEdgesOf(v).stream()) |
| .filter(e -> vertexSet.contains(e.getDst())) |
| .map(IREdge::getSrc) // makes the result a subset of the input vertexSet |
| .collect(Collectors.toSet()); |
| return Sets.difference(vertexSet, parentsOfAnotherVertex); |
| } |
| |
| private IRVertex wrapSamplingVertexIfNeeded(final IRVertex newVertex, final IRVertex existingVertexToConnectWith) { |
| // If the connecting vertex is a sampling vertex, the new vertex must be wrapped inside a sampling vertex too. |
| return existingVertexToConnectWith instanceof SamplingVertex |
| ? new SamplingVertex(newVertex, ((SamplingVertex) existingVertexToConnectWith).getDesiredSampleRate()) |
| : newVertex; |
| } |
| |
  /**
   * @param e the edge that must not be a control edge.
   * @throws IllegalArgumentException if e is a control edge.
   */
  private void assertNonControlEdge(final IREdge e) {
    if (Util.isControlEdge(e)) {
      throw new IllegalArgumentException(e.getId());
    }
  }

  /**
   * @param v the vertex that must already exist in this DAG.
   * @throws IllegalArgumentException if v is not in this DAG.
   */
  private void assertExistence(final IRVertex v) {
    if (!getVertices().contains(v)) {
      throw new IllegalArgumentException(v.getId());
    }
  }

  /**
   * @param v the vertex that must not already exist in this DAG.
   * @throws IllegalArgumentException if v is already in this DAG.
   */
  private void assertNonExistence(final IRVertex v) {
    if (getVertices().contains(v)) {
      throw new IllegalArgumentException(v.getId());
    }
  }
| |
  /**
   * Creates the shuffle edge that carries a trigger's output to the message aggregator.
   *
   * @param trigger src.
   * @param agg dst.
   * @param encoder src-dst encoder.
   * @param decoder src-dst decoder.
   * @return the edge.
   */
  private IREdge edgeToMessageAggregator(final IRVertex trigger,
                                         final IRVertex agg,
                                         final EncoderProperty encoder,
                                         final DecoderProperty decoder) {
    final IREdge newEdge = new IREdge(CommunicationPatternProperty.Value.Shuffle, trigger, agg);
    newEdge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
    newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
    newEdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Push));
    newEdge.setPropertyPermanently(encoder);
    newEdge.setPropertyPermanently(decoder);
    // Shuffle by pair key — assumes trigger outputs are key-value pairs (see PairKeyExtractor).
    newEdge.setPropertyPermanently(KeyExtractorProperty.of(new PairKeyExtractor()));

    // TODO #345: Simplify insert(TriggerVertex)
    // these are obviously wrong, but hacks for now...
    // (the element encoder/decoder are reused as key encoder/decoder)
    newEdge.setPropertyPermanently(KeyEncoderProperty.of(encoder.getValue()));
    newEdge.setPropertyPermanently(KeyDecoderProperty.of(decoder.getValue()));

    return newEdge;
  }
| |
| ////////////////////////////////////////////////// DAGInterface methods - forward calls to the underlying DAG. |
| |
  // All of the methods below simply forward the call to the underlying modifiedDAG
  // (the DAG currently being updated), so reads always observe the latest reshaping.

  @Override
  public void topologicalDo(final Consumer<IRVertex> function) {
    modifiedDAG.topologicalDo(function);
  }

  @Override
  public void dfsTraverse(final Consumer<IRVertex> function, final TraversalOrder traversalOrder) {
    modifiedDAG.dfsTraverse(function, traversalOrder);
  }

  @Override
  public void dfsDo(final IRVertex vertex,
                    final Consumer<IRVertex> vertexConsumer,
                    final TraversalOrder traversalOrder,
                    final Set<IRVertex> visited) {
    modifiedDAG.dfsDo(vertex, vertexConsumer, traversalOrder, visited);
  }

  @Override
  public Boolean pathExistsBetween(final IRVertex v1, final IRVertex v2) {
    return modifiedDAG.pathExistsBetween(v1, v2);
  }

  @Override
  public Boolean isCompositeVertex(final IRVertex irVertex) {
    return modifiedDAG.isCompositeVertex(irVertex);
  }

  @Override
  public Integer getLoopStackDepthOf(final IRVertex irVertex) {
    return modifiedDAG.getLoopStackDepthOf(irVertex);
  }

  @Override
  public LoopVertex getAssignedLoopVertexOf(final IRVertex irVertex) {
    return modifiedDAG.getAssignedLoopVertexOf(irVertex);
  }

  @Override
  public ObjectNode asJsonNode() {
    return modifiedDAG.asJsonNode();
  }

  @Override
  public void storeJSON(final String directory, final String name, final String description) {
    modifiedDAG.storeJSON(directory, name, description);
  }

  @Override
  public IRVertex getVertexById(final String id) {
    return modifiedDAG.getVertexById(id);
  }

  @Override
  public IREdge getEdgeById(final String id) {
    return modifiedDAG.getEdgeById(id);
  }

  @Override
  public List<IRVertex> getVertices() {
    return modifiedDAG.getVertices();
  }

  @Override
  public List<IREdge> getEdges() {
    return modifiedDAG.getEdges();
  }

  @Override
  public List<IRVertex> getRootVertices() {
    return modifiedDAG.getRootVertices();
  }

  @Override
  public List<IREdge> getIncomingEdgesOf(final IRVertex v) {
    return modifiedDAG.getIncomingEdgesOf(v);
  }

  @Override
  public List<IREdge> getIncomingEdgesOf(final String vertexId) {
    return modifiedDAG.getIncomingEdgesOf(vertexId);
  }

  @Override
  public List<IREdge> getOutgoingEdgesOf(final IRVertex v) {
    return modifiedDAG.getOutgoingEdgesOf(v);
  }

  @Override
  public List<IREdge> getOutgoingEdgesOf(final String vertexId) {
    return modifiedDAG.getOutgoingEdgesOf(vertexId);
  }

  @Override
  public List<IRVertex> getParents(final String vertexId) {
    return modifiedDAG.getParents(vertexId);
  }

  @Override
  public List<IRVertex> getChildren(final String vertexId) {
    return modifiedDAG.getChildren(vertexId);
  }

  @Override
  public IREdge getEdgeBetween(final String srcVertexId,
                               final String dstVertexId) throws IllegalEdgeOperationException {
    return modifiedDAG.getEdgeBetween(srcVertexId, dstVertexId);
  }

  @Override
  public List<IRVertex> getTopologicalSort() {
    return modifiedDAG.getTopologicalSort();
  }

  @Override
  public List<IRVertex> getAncestors(final String vertexId) {
    return modifiedDAG.getAncestors(vertexId);
  }

  @Override
  public List<IRVertex> getDescendants(final String vertexId) {
    return modifiedDAG.getDescendants(vertexId);
  }

  @Override
  public List<IRVertex> filterVertices(final Predicate<IRVertex> condition) {
    return modifiedDAG.filterVertices(condition);
  }

  @Override
  public String toString() {
    // JSON representation of the current (modified) DAG.
    return asJsonNode().toString();
  }
| } |