blob: b89f2417ba5235620bf5aa2e5af9643506fff2db [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.runtime.master;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.dag.DAG;
import org.apache.nemo.common.dag.DAGBuilder;
import org.apache.nemo.common.ir.IdManager;
import org.apache.nemo.common.ir.edge.executionproperty.CacheIDProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue;
import org.apache.nemo.common.ir.vertex.CachedSourceVertex;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.common.ir.vertex.executionproperty.IgnoreSchedulingTempDataReceiverProperty;
import org.apache.nemo.runtime.common.exception.PlanAppenderException;
import org.apache.nemo.runtime.common.plan.PhysicalPlan;
import org.apache.nemo.runtime.common.plan.RuntimeEdge;
import org.apache.nemo.runtime.common.plan.Stage;
import org.apache.nemo.runtime.common.plan.StageEdge;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/**
 * Appends one physical plan to another, linking data cached by the first plan
 * to the stages of the second plan that read it.
 */
public final class PlanAppender {
  /**
   * Private constructor: this is a static utility class.
   */
  private PlanAppender() {
    // Not instantiable.
  }

  /**
   * Appends {@code planToAppend} to {@code originalPlan}, connecting data cached by the original plan
   * to the stages of the appended plan that read it.
   * For more information about caching part, check {@link IgnoreSchedulingTempDataReceiverProperty}.
   *
   * <p>Every stage and stage edge of both plans is added to the result. For each cache id read through a
   * {@link CachedSourceVertex} in {@code planToAppend}, a new {@link StageEdge} is added from the source of the
   * original plan's cached edge (an edge with that cache id whose destination IR vertex carries
   * {@link IgnoreSchedulingTempDataReceiverProperty}) to the cached source vertex and its stage. The group size
   * of the cached edge's {@link DuplicateEdgeGroupProperty} is also incremented by one.
   *
   * <p>NOTE(review): the new edge is built from the cached edge's execution-property map, and the group value is
   * updated in place. The cached edge in {@code originalPlan} therefore presumably observes the new group size
   * too. Confirm against {@code StageEdge} / {@code ExecutionPropertyMap}.
   *
   * @param originalPlan the plan whose cached data is reused; its plan id is kept for the result.
   * @param planToAppend the plan to append.
   * @return the appended plan.
   * @throws PlanAppenderException if an edge leaving a cached source vertex in {@code planToAppend} has no
   *                               cache id, if a cache id has no matching cached edge in {@code originalPlan},
   *                               or if that cached edge lacks a duplicate edge group property.
   */
  public static PhysicalPlan appendPlan(final PhysicalPlan originalPlan,
                                        final PhysicalPlan planToAppend) {
    // Start from the complete stage DAG of the original plan.
    final DAGBuilder<Stage, StageEdge> physicalDAGBuilder = new DAGBuilder<>(originalPlan.getStageDAG());
    final Map<UUID, StageEdge> cachedEdges = findCachedEdges(originalPlan.getStageDAG());

    final DAG<Stage, StageEdge> dagToAppend = planToAppend.getStageDAG();
    final Map<UUID, Pair<IRVertex, Stage>> cacheCandidates = new HashMap<>();
    // Topological order guarantees that both ends of every incoming edge are already in the builder.
    dagToAppend.topologicalDo(stage -> {
      physicalDAGBuilder.addVertex(stage);
      dagToAppend.getIncomingEdgesOf(stage).forEach(edge -> physicalDAGBuilder.connectVertices(edge));
      collectCacheCandidates(dagToAppend, stage, cacheCandidates);
    });

    linkCachedData(physicalDAGBuilder, cachedEdges, cacheCandidates);
    return new PhysicalPlan(originalPlan.getPlanId(), physicalDAGBuilder.build());
  }

  /**
   * Indexes the representative cached edges of a stage DAG by cache id.
   * An edge is representative when its destination IR vertex carries
   * {@link IgnoreSchedulingTempDataReceiverProperty} and it has a {@link CacheIDProperty}.
   *
   * @param stageDAG the stage DAG of the original plan.
   * @return cache id to representative cached edge; a later edge with the same id replaces an earlier one.
   */
  private static Map<UUID, StageEdge> findCachedEdges(final DAG<Stage, StageEdge> stageDAG) {
    final Map<UUID, StageEdge> cachedEdges = new HashMap<>();
    for (final Stage stage : stageDAG.getVertices()) {
      for (final StageEdge stageEdge : stageDAG.getIncomingEdgesOf(stage)) {
        final boolean towardTempDataReceiver = stageEdge.getDstIRVertex()
            .getPropertyValue(IgnoreSchedulingTempDataReceiverProperty.class).isPresent();
        if (towardTempDataReceiver) {
          stageEdge.getPropertyValue(CacheIDProperty.class)
              .ifPresent(cacheId -> cachedEdges.put(cacheId, stageEdge));
        }
      }
    }
    return cachedEdges;
  }

  /**
   * Records, for one stage of the plan to append, every cached source vertex it reads from.
   * This covers sources in an upstream stage (reached via stage edges) and sources inside the stage's own IR DAG.
   *
   * @param stageDAG        the stage DAG of the plan to append.
   * @param stage           the stage being inspected.
   * @param cacheCandidates cache id to (cached source vertex, owning stage); updated in place.
   * @throws PlanAppenderException if an edge leaving a cached source vertex has no cache id.
   */
  private static void collectCacheCandidates(final DAG<Stage, StageEdge> stageDAG,
                                             final Stage stage,
                                             final Map<UUID, Pair<IRVertex, Stage>> cacheCandidates) {
    // Cached source vertices that live in an upstream stage.
    for (final StageEdge edge : stageDAG.getIncomingEdgesOf(stage)) {
      if (edge.getSrcIRVertex() instanceof CachedSourceVertex) {
        final UUID cacheId = edge.getPropertyValue(CacheIDProperty.class)
            .orElseThrow(() -> new PlanAppenderException("No cache id in the cached edge " + edge.getId()));
        cacheCandidates.put(cacheId, Pair.of(edge.getSrcIRVertex(), edge.getSrc()));
      }
    }

    // Cached source vertices that live inside this stage.
    final DAG<IRVertex, RuntimeEdge<IRVertex>> stageIRDAG = stage.getIRDAG();
    for (final IRVertex irVertex : stageIRDAG.getVertices()) {
      if (irVertex instanceof CachedSourceVertex) {
        for (final RuntimeEdge<IRVertex> runtimeEdge : stageIRDAG.getOutgoingEdgesOf(irVertex)) {
          final UUID cacheId = runtimeEdge.getPropertyValue(CacheIDProperty.class)
              .orElseThrow(
                  () -> new PlanAppenderException("No cache id in the cached edge " + runtimeEdge.getId()));
          cacheCandidates.put(cacheId, Pair.of(runtimeEdge.getSrc(), stage));
        }
      }
    }
  }

  /**
   * Connects each cached source vertex of the appended plan to the cached data of the original plan.
   * Each new edge copies the representative cached edge's execution properties. The shared duplicate-edge-group
   * value is grown by one per new edge.
   *
   * @param physicalDAGBuilder the builder holding both plans' stages.
   * @param cachedEdges        cache id to representative cached edge of the original plan.
   * @param cacheCandidates    cache id to (cached source vertex, owning stage) of the appended plan.
   * @throws PlanAppenderException if a cache id has no cached edge, or the cached edge has no
   *                               {@link DuplicateEdgeGroupProperty}.
   */
  private static void linkCachedData(final DAGBuilder<Stage, StageEdge> physicalDAGBuilder,
                                     final Map<UUID, StageEdge> cachedEdges,
                                     final Map<UUID, Pair<IRVertex, Stage>> cacheCandidates) {
    cacheCandidates.forEach((cacheId, vertexStagePair) -> {
      final StageEdge cachedEdge = cachedEdges.get(cacheId);
      if (cachedEdge == null) {
        throw new PlanAppenderException(
            "Cached edge for cache id " + cacheId + " is not found in the original plan.");
      }

      final StageEdge newEdge = new StageEdge(
          IdManager.newEdgeId(),
          cachedEdge.getExecutionProperties(),
          cachedEdge.getSrcIRVertex(),
          vertexStagePair.left(),
          cachedEdge.getSrc(),
          vertexStagePair.right());
      physicalDAGBuilder.connectVertices(newEdge);

      // The group value is mutated in place so every edge holding it sees the enlarged group.
      final DuplicateEdgeGroupPropertyValue duplicateEdgeGroupPropertyValue =
          cachedEdge.getPropertyValue(DuplicateEdgeGroupProperty.class)
              .orElseThrow(() -> new PlanAppenderException(
                  "Cached edge " + cachedEdge.getId() + " does not have duplicated edge group property."));
      duplicateEdgeGroupPropertyValue.setGroupSize(duplicateEdgeGroupPropertyValue.getGroupSize() + 1);
      newEdge.getExecutionProperties().put(DuplicateEdgeGroupProperty.of(duplicateEdgeGroupPropertyValue));
    });
  }
}