| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.nemo.runtime.common.plan; |
| |
| import org.apache.nemo.common.dag.DAG; |
| import org.apache.nemo.common.dag.DAGBuilder; |
| import org.apache.nemo.common.exception.IllegalVertexOperationException; |
| import org.apache.nemo.common.exception.PhysicalPlanGenerationException; |
| import org.apache.nemo.common.ir.IRDAG; |
| import org.apache.nemo.common.ir.Readable; |
| import org.apache.nemo.common.ir.edge.IREdge; |
| import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty; |
| import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupPropertyValue; |
| import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap; |
| import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty; |
| import org.apache.nemo.common.ir.vertex.IRVertex; |
| import org.apache.nemo.common.ir.vertex.OperatorVertex; |
| import org.apache.nemo.common.ir.vertex.SourceVertex; |
| import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty; |
| import org.apache.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty; |
| import org.apache.nemo.common.ir.vertex.utility.SamplingVertex; |
| import org.apache.nemo.conf.JobConf; |
| import org.apache.nemo.runtime.common.RuntimeIdManager; |
| import org.apache.reef.tang.annotations.Parameter; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import javax.inject.Inject; |
| import java.util.*; |
| import java.util.function.BinaryOperator; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| |
| /** |
| * A function that converts an IR DAG to physical DAG. |
| */ |
| public final class PhysicalPlanGenerator implements Function<IRDAG, DAG<Stage, StageEdge>> { |
| private static final Logger LOG = LoggerFactory.getLogger(PhysicalPlanGenerator.class.getName()); |
| |
| private final String dagDirectory; |
| |
| /** |
| * Private constructor. |
| * |
| * @param dagDirectory the directory in which to store DAG data. |
| */ |
| @Inject |
| private PhysicalPlanGenerator(@Parameter(JobConf.DAGDirectory.class) final String dagDirectory) { |
| this.dagDirectory = dagDirectory; |
| } |
| |
| /** |
| * Generates the {@link PhysicalPlan} to be executed. |
| * |
| * @param irDAG that should be converted to a physical execution plan |
| * @return {@link PhysicalPlan} to execute. |
| */ |
| @Override |
| public DAG<Stage, StageEdge> apply(final IRDAG irDAG) { |
| // first, stage-partition the IR DAG. |
| final DAG<Stage, StageEdge> dagOfStages = stagePartitionIrDAG(irDAG); |
| |
| // Sanity check |
| dagOfStages.getVertices().forEach(this::integrityCheck); |
| |
| // this is needed because of DuplicateEdgeGroupProperty. |
| handleDuplicateEdgeGroupProperty(dagOfStages); |
| |
| // for debugging purposes. |
| dagOfStages.storeJSON(dagDirectory, "plan-logical", "logical execution plan"); |
| |
| return dagOfStages; |
| } |
| |
| /** |
| * Convert the edge id of DuplicateEdgeGroupProperty to physical edge id. |
| * |
| * @param dagOfStages dag to manipulate |
| */ |
| private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfStages) { |
| final Map<String, List<StageEdge>> edgeGroupToIrEdge = new HashMap<>(); |
| |
| dagOfStages.topologicalDo(irVertex -> dagOfStages.getIncomingEdgesOf(irVertex).forEach(e -> { |
| final Optional<DuplicateEdgeGroupPropertyValue> duplicateEdgeGroupProperty = |
| e.getPropertyValue(DuplicateEdgeGroupProperty.class); |
| if (duplicateEdgeGroupProperty.isPresent()) { |
| final String duplicateGroupId = duplicateEdgeGroupProperty.get().getGroupId(); |
| edgeGroupToIrEdge.computeIfAbsent(duplicateGroupId, k -> new ArrayList<>()).add(e); |
| } |
| })); |
| |
| edgeGroupToIrEdge.forEach((id, edges) -> { |
| final StageEdge firstEdge = edges.get(0); |
| final DuplicateEdgeGroupPropertyValue firstDuplicateEdgeValue = |
| firstEdge.getPropertyValue(DuplicateEdgeGroupProperty.class).get(); |
| |
| edges.forEach(e -> { |
| final DuplicateEdgeGroupPropertyValue duplicateEdgeGroupProperty = |
| e.getPropertyValue(DuplicateEdgeGroupProperty.class).get(); |
| if (firstDuplicateEdgeValue.isRepresentativeEdgeDecided()) { |
| duplicateEdgeGroupProperty.setRepresentativeEdgeId(firstDuplicateEdgeValue.getRepresentativeEdgeId()); |
| } else { |
| duplicateEdgeGroupProperty.setRepresentativeEdgeId(firstEdge.getId()); |
| } |
| duplicateEdgeGroupProperty.setGroupSize(edges.size()); |
| }); |
| }); |
| } |
| |
| /** |
| * We take the stage-partitioned DAG and create actual stage and stage edge objects to create a DAG of stages. |
| * |
| * @param irDAG stage-partitioned IR DAG. |
| * @return the DAG composed of stages and stage edges. |
| */ |
| public DAG<Stage, StageEdge> stagePartitionIrDAG(final IRDAG irDAG) { |
| final StagePartitioner stagePartitioner = new StagePartitioner(); |
| final DAGBuilder<Stage, StageEdge> dagOfStagesBuilder = new DAGBuilder<>(); |
| final Set<IREdge> interStageEdges = new HashSet<>(); |
| final Map<Integer, Stage> stageIdToStageMap = new HashMap<>(); |
| final Map<IRVertex, Integer> vertexToStageIdMap = stagePartitioner.apply(irDAG); |
| final HashSet<IRVertex> isStagePartitioned = new HashSet<>(); |
| final Random random = new Random(hashCode()); // to produce same results for same input IRDAGs |
| |
| final Map<Integer, Set<IRVertex>> vertexSetForEachStage = new LinkedHashMap<>(); |
| irDAG.topologicalDo(irVertex -> { |
| final int stageId = vertexToStageIdMap.get(irVertex); |
| if (!vertexSetForEachStage.containsKey(stageId)) { |
| vertexSetForEachStage.put(stageId, new HashSet<>()); |
| } |
| vertexSetForEachStage.get(stageId).add(irVertex); |
| }); |
| |
| for (final int stageId : vertexSetForEachStage.keySet()) { |
| final Set<IRVertex> stageVertices = vertexSetForEachStage.get(stageId); |
| final String stageIdentifier = RuntimeIdManager.generateStageId(stageId); |
| final ExecutionPropertyMap<VertexExecutionProperty> stageProperties = new ExecutionPropertyMap<>(stageIdentifier); |
| stagePartitioner.getStageProperties(stageVertices.iterator().next()).forEach(stageProperties::put); |
| final int stageParallelism = stageProperties.get(ParallelismProperty.class) |
| .orElseThrow(() -> new RuntimeException("Parallelism property must be set for Stage")); |
| final List<Integer> taskIndices = getTaskIndicesToExecute(stageVertices, stageParallelism, random); |
| final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder = new DAGBuilder<>(); |
| |
| // Prepare vertexIdToReadables |
| final List<Map<String, Readable>> vertexIdToReadables = new ArrayList<>(stageParallelism); |
| for (int i = 0; i < stageParallelism; i++) { |
| vertexIdToReadables.add(new HashMap<>()); |
| } |
| |
| // For each IRVertex, |
| for (final IRVertex v : stageVertices) { |
| final IRVertex vertexToPutIntoStage = getActualVertexToPutIntoStage(v); |
| |
| // Take care of the readables of a source vertex. |
| if (vertexToPutIntoStage instanceof SourceVertex && !isStagePartitioned.contains(vertexToPutIntoStage)) { |
| final SourceVertex sourceVertex = (SourceVertex) vertexToPutIntoStage; |
| try { |
| final List<Readable> readables = sourceVertex.getReadables(stageParallelism); |
| for (int i = 0; i < stageParallelism; i++) { |
| vertexIdToReadables.get(i).put(vertexToPutIntoStage.getId(), readables.get(i)); |
| } |
| } catch (final Exception e) { |
| throw new PhysicalPlanGenerationException(e); |
| } |
| // Clear internal metadata. |
| sourceVertex.clearInternalStates(); |
| } |
| |
| // Add vertex to the stage. |
| stageInternalDAGBuilder.addVertex(vertexToPutIntoStage); |
| } |
| |
| for (final IRVertex dstVertex : stageVertices) { |
| // Connect all the incoming edges for the vertex. |
| irDAG.getIncomingEdgesOf(dstVertex).forEach(irEdge -> { |
| final IRVertex srcVertex = irEdge.getSrc(); |
| |
| // both vertices are in the same stage. |
| if (vertexToStageIdMap.get(srcVertex).equals(vertexToStageIdMap.get(dstVertex))) { |
| stageInternalDAGBuilder.connectVertices(new RuntimeEdge<>( |
| irEdge.getId(), |
| irEdge.getExecutionProperties(), |
| getActualVertexToPutIntoStage(irEdge.getSrc()), |
| getActualVertexToPutIntoStage(irEdge.getDst()))); |
| } else { // edge comes from another stage |
| interStageEdges.add(irEdge); |
| } |
| }); |
| } |
| // If this runtime stage contains at least one vertex, build it! |
| if (!stageInternalDAGBuilder.isEmpty()) { |
| final DAG<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAG |
| = stageInternalDAGBuilder.buildWithoutSourceSinkCheck(); |
| final Stage stage = new Stage( |
| stageIdentifier, |
| taskIndices, |
| stageInternalDAG, |
| stageProperties, |
| vertexIdToReadables); |
| dagOfStagesBuilder.addVertex(stage); |
| stageIdToStageMap.put(stageId, stage); |
| } |
| |
| // To prevent re-fetching readables in source vertex |
| // during re-generation of physical plan for dynamic optimization. |
| isStagePartitioned.addAll(stageVertices); |
| } |
| |
| // Add StageEdges |
| for (final IREdge interStageEdge : interStageEdges) { |
| final Stage srcStage = stageIdToStageMap.get(vertexToStageIdMap.get(interStageEdge.getSrc())); |
| final Stage dstStage = stageIdToStageMap.get(vertexToStageIdMap.get(interStageEdge.getDst())); |
| if (srcStage == null || dstStage == null) { |
| throw new IllegalVertexOperationException(String.format("Stage not added to the builder:%s%s", |
| srcStage == null ? String.format(" source stage for %s", interStageEdge.getSrc()) : "", |
| dstStage == null ? String.format(" destination stage for %s", interStageEdge.getDst()) : "")); |
| } |
| dagOfStagesBuilder.connectVertices(new StageEdge(interStageEdge.getId(), interStageEdge.getExecutionProperties(), |
| getActualVertexToPutIntoStage(interStageEdge.getSrc()), getActualVertexToPutIntoStage(interStageEdge.getDst()), |
| srcStage, dstStage)); |
| } |
| |
| return dagOfStagesBuilder.build(); |
| } |
| |
| /** |
| * This method is needed, because we do not want to put Sampling vertices into a stage. |
| * The underlying runtime only understands Source and Operator vertices. |
| */ |
| private IRVertex getActualVertexToPutIntoStage(final IRVertex irVertex) { |
| return irVertex instanceof SamplingVertex |
| ? ((SamplingVertex) irVertex).getCloneOfOriginalVertex() |
| : irVertex; |
| } |
| |
| /** |
| * Randomly select task indices for Sampling vertices. |
| * Select all task indices for non-Sampling vertices. |
| */ |
| private List<Integer> getTaskIndicesToExecute(final Set<IRVertex> vertices, |
| final int stageParallelism, |
| final Random random) { |
| if (vertices.stream().map(v -> v instanceof SamplingVertex).collect(Collectors.toSet()).size() != 1) { |
| throw new IllegalArgumentException("Must be either all sampling vertices, or none: " + vertices.toString()); |
| } |
| |
| if (vertices.iterator().next() instanceof SamplingVertex) { |
| // Use min of the desired sample rates |
| final float minSampleRate = vertices.stream() |
| .map(v -> ((SamplingVertex) v).getDesiredSampleRate()) |
| .reduce(BinaryOperator.minBy(Float::compareTo)) |
| .orElseThrow(() -> new IllegalArgumentException(vertices.toString())); |
| |
| // Compute and return indices |
| final int numOfTaskIndices = (int) Math.ceil(stageParallelism * minSampleRate); |
| final List<Integer> randomIndices = IntStream.range(0, stageParallelism).boxed().collect(Collectors.toList()); |
| Collections.shuffle(randomIndices, random); |
| return new ArrayList<>(randomIndices.subList(0, numOfTaskIndices)); // subList is not serializable. |
| } else { |
| return IntStream.range(0, stageParallelism).boxed().collect(Collectors.toList()); |
| } |
| } |
| |
| /** |
| * Integrity check for Stage. |
| * |
| * @param stage to check for |
| */ |
| private void integrityCheck(final Stage stage) { |
| stage.getPropertyValue(ParallelismProperty.class) |
| .orElseThrow(() -> new RuntimeException("Parallelism property must be set for Stage")); |
| stage.getPropertyValue(ScheduleGroupProperty.class) |
| .orElseThrow(() -> new RuntimeException("ScheduleGroup property must be set for Stage")); |
| |
| stage.getIRDAG().getVertices().forEach(irVertex -> { |
| // Check vertex type. |
| if (!(irVertex instanceof SourceVertex |
| || irVertex instanceof OperatorVertex)) { |
| throw new UnsupportedOperationException(irVertex.toString()); |
| } |
| }); |
| } |
| } |