| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.nemo.runtime.common.plan; |
| |
| import org.apache.nemo.common.ir.Readable; |
| import org.apache.nemo.common.ir.edge.executionproperty.DataFlowProperty; |
| import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty; |
| import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue; |
| import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap; |
| import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty; |
| import org.apache.nemo.common.ir.vertex.*; |
| import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty; |
| import org.apache.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty; |
| import org.apache.nemo.conf.JobConf; |
| import org.apache.nemo.common.dag.DAG; |
| import org.apache.nemo.common.dag.DAGBuilder; |
| import org.apache.nemo.common.ir.edge.IREdge; |
| import org.apache.nemo.common.exception.IllegalVertexOperationException; |
| import org.apache.nemo.common.exception.PhysicalPlanGenerationException; |
| import org.apache.nemo.runtime.common.RuntimeIdManager; |
| import org.apache.commons.lang3.mutable.MutableInt; |
| import org.apache.reef.tang.annotations.Parameter; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.inject.Inject; |
| import java.util.*; |
| import java.util.function.Function; |
| |
| /** |
| * A function that converts an IR DAG to physical DAG. |
| */ |
| public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdge>, DAG<Stage, StageEdge>> { |
| private final String dagDirectory; |
| private final StagePartitioner stagePartitioner; |
| private static final Logger LOG = LoggerFactory.getLogger(PhysicalPlanGenerator.class.getName()); |
| |
| /** |
| * Private constructor. |
| * |
| * @param stagePartitioner provides stage partitioning |
| * @param dagDirectory the directory in which to store DAG data. |
| */ |
| @Inject |
| private PhysicalPlanGenerator(final StagePartitioner stagePartitioner, |
| @Parameter(JobConf.DAGDirectory.class) final String dagDirectory) { |
| this.dagDirectory = dagDirectory; |
| this.stagePartitioner = stagePartitioner; |
| } |
| |
| /** |
| * Generates the {@link PhysicalPlan} to be executed. |
| * |
| * @param irDAG that should be converted to a physical execution plan |
| * @return {@link PhysicalPlan} to execute. |
| */ |
| @Override |
| public DAG<Stage, StageEdge> apply(final DAG<IRVertex, IREdge> irDAG) { |
| // first, stage-partition the IR DAG. |
| final DAG<Stage, StageEdge> dagOfStages = stagePartitionIrDAG(irDAG); |
| |
| // Sanity check |
| dagOfStages.getVertices().forEach(this::integrityCheck); |
| |
| // this is needed because of DuplicateEdgeGroupProperty. |
| handleDuplicateEdgeGroupProperty(dagOfStages); |
| |
| // Split StageGroup by Pull StageEdges |
| splitScheduleGroupByPullStageEdges(dagOfStages); |
| |
| // for debugging purposes. |
| dagOfStages.storeJSON(dagDirectory, "plan-logical", "logical execution plan"); |
| |
| return dagOfStages; |
| } |
| |
| /** |
| * Convert the edge id of DuplicateEdgeGroupProperty to physical edge id. |
| * |
| * @param dagOfStages dag to manipulate |
| */ |
| private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfStages) { |
| final Map<String, List<StageEdge>> edgeGroupToIrEdge = new HashMap<>(); |
| |
| dagOfStages.topologicalDo(irVertex -> dagOfStages.getIncomingEdgesOf(irVertex).forEach(e -> { |
| final Optional<DuplicateEdgeGroupPropertyValue> duplicateEdgeGroupProperty = |
| e.getPropertyValue(DuplicateEdgeGroupProperty.class); |
| if (duplicateEdgeGroupProperty.isPresent()) { |
| final String duplicateGroupId = duplicateEdgeGroupProperty.get().getGroupId(); |
| edgeGroupToIrEdge.computeIfAbsent(duplicateGroupId, k -> new ArrayList<>()).add(e); |
| } |
| })); |
| |
| edgeGroupToIrEdge.forEach((id, edges) -> { |
| final StageEdge firstEdge = edges.get(0); |
| final DuplicateEdgeGroupPropertyValue firstDuplicateEdgeValue = |
| firstEdge.getPropertyValue(DuplicateEdgeGroupProperty.class).get(); |
| |
| edges.forEach(e -> { |
| final DuplicateEdgeGroupPropertyValue duplicateEdgeGroupProperty = |
| e.getPropertyValue(DuplicateEdgeGroupProperty.class).get(); |
| if (firstDuplicateEdgeValue.isRepresentativeEdgeDecided()) { |
| duplicateEdgeGroupProperty.setRepresentativeEdgeId(firstDuplicateEdgeValue.getRepresentativeEdgeId()); |
| } else { |
| duplicateEdgeGroupProperty.setRepresentativeEdgeId(firstEdge.getId()); |
| } |
| duplicateEdgeGroupProperty.setGroupSize(edges.size()); |
| }); |
| }); |
| } |
| |
| /** |
| * We take the stage-partitioned DAG and create actual stage and stage edge objects to create a DAG of stages. |
| * |
| * @param irDAG stage-partitioned IR DAG. |
| * @return the DAG composed of stages and stage edges. |
| */ |
| public DAG<Stage, StageEdge> stagePartitionIrDAG(final DAG<IRVertex, IREdge> irDAG) { |
| final DAGBuilder<Stage, StageEdge> dagOfStagesBuilder = new DAGBuilder<>(); |
| final Set<IREdge> interStageEdges = new HashSet<>(); |
| final Map<Integer, Stage> stageIdToStageMap = new HashMap<>(); |
| final Map<IRVertex, Integer> vertexToStageIdMap = stagePartitioner.apply(irDAG); |
| |
| final Map<Integer, Set<IRVertex>> vertexSetForEachStage = new LinkedHashMap<>(); |
| irDAG.topologicalDo(irVertex -> { |
| final int stageId = vertexToStageIdMap.get(irVertex); |
| if (!vertexSetForEachStage.containsKey(stageId)) { |
| vertexSetForEachStage.put(stageId, new HashSet<>()); |
| } |
| vertexSetForEachStage.get(stageId).add(irVertex); |
| }); |
| |
| for (final int stageId : vertexSetForEachStage.keySet()) { |
| final Set<IRVertex> stageVertices = vertexSetForEachStage.get(stageId); |
| final String stageIdentifier = RuntimeIdManager.generateStageId(stageId); |
| final ExecutionPropertyMap<VertexExecutionProperty> stageProperties = new ExecutionPropertyMap<>(stageIdentifier); |
| stagePartitioner.getStageProperties(stageVertices.iterator().next()).forEach(stageProperties::put); |
| final int stageParallelism = stageProperties.get(ParallelismProperty.class) |
| .orElseThrow(() -> new RuntimeException("Parallelism property must be set for Stage")); |
| |
| final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder = new DAGBuilder<>(); |
| |
| // Prepare vertexIdToReadables |
| final List<Map<String, Readable>> vertexIdToReadables = new ArrayList<>(stageParallelism); |
| for (int i = 0; i < stageParallelism; i++) { |
| vertexIdToReadables.add(new HashMap<>()); |
| } |
| |
| // For each IRVertex, |
| for (final IRVertex irVertex : stageVertices) { |
| // Take care of the readables of a source vertex. |
| if (irVertex instanceof SourceVertex && !irVertex.getStagePartitioned()) { |
| final SourceVertex sourceVertex = (SourceVertex) irVertex; |
| try { |
| final List<Readable> readables = sourceVertex.getReadables(stageParallelism); |
| for (int i = 0; i < stageParallelism; i++) { |
| vertexIdToReadables.get(i).put(irVertex.getId(), readables.get(i)); |
| } |
| } catch (final Exception e) { |
| throw new PhysicalPlanGenerationException(e); |
| } |
| // Clear internal metadata. |
| sourceVertex.clearInternalStates(); |
| } |
| |
| // Add vertex to the stage. |
| stageInternalDAGBuilder.addVertex(irVertex); |
| } |
| |
| for (final IRVertex dstVertex : stageVertices) { |
| // Connect all the incoming edges for the vertex. |
| irDAG.getIncomingEdgesOf(dstVertex).forEach(irEdge -> { |
| final IRVertex srcVertex = irEdge.getSrc(); |
| |
| // both vertices are in the same stage. |
| if (vertexToStageIdMap.get(srcVertex).equals(vertexToStageIdMap.get(dstVertex))) { |
| stageInternalDAGBuilder.connectVertices(new RuntimeEdge<>( |
| irEdge.getId(), |
| irEdge.getExecutionProperties(), |
| irEdge.getSrc(), |
| irEdge.getDst())); |
| } else { // edge comes from another stage |
| interStageEdges.add(irEdge); |
| } |
| }); |
| } |
| // If this runtime stage contains at least one vertex, build it! |
| if (!stageInternalDAGBuilder.isEmpty()) { |
| final DAG<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAG |
| = stageInternalDAGBuilder.buildWithoutSourceSinkCheck(); |
| final Stage stage = new Stage(stageIdentifier, stageInternalDAG, stageProperties, vertexIdToReadables); |
| dagOfStagesBuilder.addVertex(stage); |
| stageIdToStageMap.put(stageId, stage); |
| } |
| |
| // To prevent re-fetching readables in source vertex |
| // during re-generation of physical plan for dynamic optimization. |
| for (IRVertex irVertex : stageVertices) { |
| irVertex.setStagePartitioned(); |
| } |
| } |
| |
| // Add StageEdges |
| for (final IREdge interStageEdge : interStageEdges) { |
| final Stage srcStage = stageIdToStageMap.get(vertexToStageIdMap.get(interStageEdge.getSrc())); |
| final Stage dstStage = stageIdToStageMap.get(vertexToStageIdMap.get(interStageEdge.getDst())); |
| if (srcStage == null || dstStage == null) { |
| throw new IllegalVertexOperationException(String.format("Stage not added to the builder:%s%s", |
| srcStage == null ? String.format(" source stage for %s", interStageEdge.getSrc()) : "", |
| dstStage == null ? String.format(" destination stage for %s", interStageEdge.getDst()) : "")); |
| } |
| dagOfStagesBuilder.connectVertices(new StageEdge(interStageEdge.getId(), interStageEdge.getExecutionProperties(), |
| interStageEdge.getSrc(), interStageEdge.getDst(), srcStage, dstStage)); |
| } |
| |
| return dagOfStagesBuilder.build(); |
| } |
| |
| /** |
| * Integrity check for Stage. |
| * @param stage to check for |
| */ |
| private void integrityCheck(final Stage stage) { |
| stage.getPropertyValue(ParallelismProperty.class) |
| .orElseThrow(() -> new RuntimeException("Parallelism property must be set for Stage")); |
| stage.getPropertyValue(ScheduleGroupProperty.class) |
| .orElseThrow(() -> new RuntimeException("ScheduleGroup property must be set for Stage")); |
| |
| stage.getIRDAG().getVertices().forEach(irVertex -> { |
| // Check vertex type. |
| if (!(irVertex instanceof SourceVertex |
| || irVertex instanceof OperatorVertex)) { |
| throw new UnsupportedOperationException(irVertex.toString()); |
| } |
| }); |
| } |
| |
| /** |
| * Split ScheduleGroups by Pull {@link StageEdge}s, and ensure topological ordering of |
| * {@link ScheduleGroupProperty}. |
| * |
| * @param dag {@link DAG} of {@link Stage}s to manipulate |
| */ |
| private void splitScheduleGroupByPullStageEdges(final DAG<Stage, StageEdge> dag) { |
| final MutableInt nextScheduleGroup = new MutableInt(0); |
| final Map<Stage, Integer> stageToScheduleGroupMap = new HashMap<>(); |
| |
| dag.topologicalDo(currentStage -> { |
| // Base case: assign New ScheduleGroup of the Stage |
| stageToScheduleGroupMap.computeIfAbsent(currentStage, s -> getAndIncrement(nextScheduleGroup)); |
| |
| for (final StageEdge stageEdgeFromCurrentStage : dag.getOutgoingEdgesOf(currentStage)) { |
| final Stage destination = stageEdgeFromCurrentStage.getDst(); |
| // Skip if some Stages that destination depends on do not have assigned new ScheduleGroup |
| boolean skip = false; |
| for (final StageEdge stageEdgeToDestination : dag.getIncomingEdgesOf(destination)) { |
| if (!stageToScheduleGroupMap.containsKey(stageEdgeToDestination.getSrc())) { |
| skip = true; |
| break; |
| } |
| } |
| if (skip) { |
| continue; |
| } |
| if (stageToScheduleGroupMap.containsKey(destination)) { |
| continue; |
| } |
| |
| // Find any non-pull inEdge |
| Integer scheduleGroup = null; |
| Integer newScheduleGroup = null; |
| for (final StageEdge stageEdge : dag.getIncomingEdgesOf(destination)) { |
| final Stage source = stageEdge.getSrc(); |
| if (stageEdge.getDataFlowModel() != DataFlowProperty.Value.Pull) { |
| if (scheduleGroup != null && source.getScheduleGroup() != scheduleGroup) { |
| throw new RuntimeException(String.format("Multiple Push inEdges from different ScheduleGroup: %d, %d", |
| scheduleGroup, source.getScheduleGroup())); |
| } |
| if (source.getScheduleGroup() != destination.getScheduleGroup()) { |
| throw new RuntimeException(String.format("Split ScheduleGroup by push StageEdge: %d, %d", |
| source.getScheduleGroup(), destination.getScheduleGroup())); |
| } |
| scheduleGroup = source.getScheduleGroup(); |
| newScheduleGroup = stageToScheduleGroupMap.get(source); |
| } |
| } |
| |
| if (newScheduleGroup == null) { |
| stageToScheduleGroupMap.put(destination, getAndIncrement(nextScheduleGroup)); |
| } else { |
| stageToScheduleGroupMap.put(destination, newScheduleGroup); |
| } |
| } |
| }); |
| |
| dag.topologicalDo(stage -> { |
| final int scheduleGroup = stageToScheduleGroupMap.get(stage); |
| stage.getExecutionProperties().put(ScheduleGroupProperty.of(scheduleGroup)); |
| stage.getIRDAG().topologicalDo(vertex -> vertex.setProperty(ScheduleGroupProperty.of(scheduleGroup))); |
| }); |
| } |
| |
| private static int getAndIncrement(final MutableInt mutableInt) { |
| final int toReturn = mutableInt.getValue(); |
| mutableInt.increment(); |
| return toReturn; |
| } |
| } |