/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
/**
* MultiQueryOptimizer for Spark: merges each splitter Spark operator with its splittees so
* that shared work is computed once instead of once per output.
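* <p>
* As an illustrative sketch (the aliases and paths below are hypothetical), a script that
* stores the same input more than once, such as
* <pre>
* a = LOAD 'input' AS (x, y);
* b = FILTER a BY x > 0;
* STORE b INTO 'out1';
* c = GROUP a BY y;
* STORE c INTO 'out2';
* </pre>
* typically yields a splitter Spark operator (the shared LOAD branch) with multiple splittees.
* This visitor merges each splittee back into its splitter, either directly (a single splittee)
* or under a POSplit (multiple splittees), so the shared input is computed only once.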
*/
public class MultiQueryOptimizerSpark extends SparkOpPlanVisitor {
private static final Log LOG = LogFactory.getLog(MultiQueryOptimizerSpark.class);
private String scope;
private NodeIdGenerator nig;
public MultiQueryOptimizerSpark(SparkOperPlan plan) {
super(plan, new ReverseDependencyOrderWalker<SparkOperator, SparkOperPlan>(plan));
nig = NodeIdGenerator.getGenerator();
List<SparkOperator> roots = plan.getRoots();
scope = roots.get(0).getOperatorKey().getScope();
}
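/**
* Applies multi-query optimization to one Spark operator. Operators that are not splitters,
* or whose splittees have more than one predecessor, are left untouched. A single splittee
* is merged directly into the splitter's physical plan; multiple splittees are attached to
* the splitter under a newly created POSplit.
*/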
@Override
public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
try {
if (!sparkOp.isSplitter()) {
return;
}
List<SparkOperator> splittees = getPlan().getSuccessors(sparkOp);
if (splittees == null) {
return;
}
//If any splittee has more than one predecessor, skip multi-query optimization
//@see TestMultiQueryBasic#testMultiQueryWithFJ_2
for (SparkOperator splittee : splittees) {
if (getPlan().getPredecessors(splittee).size() > 1) {
return;
}
}
if (splittees.size() == 1) {
// We don't need a POSplit here; the single splittee can be merged directly into the splitter
SparkOperator spliter = sparkOp;
SparkOperator singleSplitee = splittees.get(0);
List<PhysicalOperator> roots = singleSplitee.physicalPlan.getRoots();
List<PhysicalOperator> rootCopys = new ArrayList<PhysicalOperator>(roots);
//sort the roots by OperatorKey
//for the first root, merge the splitter's physical plan into the splittee as it is
//for every other root, merge a clone of the splitter's physical plan instead
//a clone contains the same kinds of physical operators but with bigger OperatorKeys,
//so cloned operators are executed after those with smaller OperatorKeys (see JobGraphBuilder.sortPredecessorRDDs())
Collections.sort(rootCopys);
List<PhysicalPlan> spliterPhysicalPlan = getPhysicalPlans(spliter.physicalPlan, rootCopys.size());
int i = 0;
for (PhysicalOperator root : rootCopys) {
if (root instanceof POLoad) {
POLoad load = (POLoad) root;
PhysicalPlan plClone = spliterPhysicalPlan.get(i);
POStore store = (POStore) plClone.getLeaves().get(0);
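//the splittee's POLoad reads the temporary file written by the splitter's POStore;
//drop both operators and stitch the two plans together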
if (load.getLFile().getFileName().equals(store.getSFile().getFileName())) {
plClone.remove(store);
PhysicalOperator succOfload = singleSplitee.physicalPlan.getSuccessors(load).get(0);
singleSplitee.physicalPlan.remove(load);
mergePlanAWithPlanB(singleSplitee.physicalPlan, plClone, succOfload);
i++;
}
}
}
addSubPlanPropertiesToParent(singleSplitee, spliter);
removeSpliter(getPlan(), spliter, singleSplitee);
} else {
//If there is more than one splittee, create a POSplit, merge the physical plans
//of all splittees into it, and remove the splittees.
List<PhysicalOperator> firstNodeLeaves = sparkOp.physicalPlan.getLeaves();
PhysicalOperator firstNodeLeaf = firstNodeLeaves.size() > 0 ? firstNodeLeaves.get(0) : null;
POStore poStore = null;
if (firstNodeLeaf != null && firstNodeLeaf instanceof POStore) {
poStore = (POStore) firstNodeLeaf;
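//remember the operator feeding the temporary store so the splittees' operators can be connected to it later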
PhysicalOperator predOfPoStore = sparkOp.physicalPlan.getPredecessors(poStore).get(0);
sparkOp.physicalPlan.remove(poStore); // remove unnecessary store
POSplit poSplit = createSplit();
ArrayList<SparkOperator> spliteesCopy = new ArrayList<SparkOperator>(splittees);
for (SparkOperator splitee : spliteesCopy) {
List<PhysicalOperator> rootsOfSplitee = new ArrayList<PhysicalOperator>(splitee.physicalPlan.getRoots());
for (int i = 0; i < rootsOfSplitee.size(); i++) {
if (rootsOfSplitee.get(i) instanceof POLoad) {
POLoad poLoad = (POLoad) rootsOfSplitee.get(i);
if (poLoad.getLFile().getFileName().equals(poStore.getSFile().getFileName())) {
List<PhysicalOperator> successorsOfPoLoad = splitee.physicalPlan.getSuccessors(poLoad);
List<PhysicalOperator> successorofPoLoadsCopy = new ArrayList<PhysicalOperator>(successorsOfPoLoad);
splitee.physicalPlan.remove(poLoad); // remove unnecessary load
for (PhysicalOperator successorOfPoLoad : successorofPoLoadsCopy) {
//record the from-to relationship in SparkOperator#multiQueryOptimizeConnectionMap
sparkOp.addMultiQueryOptimizeConnectionItem(successorOfPoLoad.getOperatorKey(), predOfPoStore.getOperatorKey());
LOG.debug(String.format("add multiQueryOptimize connection item: to:%s, from:%s for %s",
successorOfPoLoad.toString(), predOfPoStore.getOperatorKey().toString(), splitee.getOperatorKey()));
}
}
}
}
poSplit.addPlan(splitee.physicalPlan);
addSubPlanPropertiesToParent(sparkOp, splitee);
removeSplittee(getPlan(), sparkOp, splitee);
}
sparkOp.physicalPlan.addAsLeaf(poSplit);
}
}
} catch (PlanException e) {
throw new VisitorException(e);
}
}
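/**
* Returns a list holding the given physical plan followed by {@code size - 1} clones of it,
* so that each root of the splittee gets its own copy of the splitter's plan to merge with.
*/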
private List<PhysicalPlan> getPhysicalPlans(PhysicalPlan physicalPlan, int size) throws OptimizerException {
List<PhysicalPlan> ppList = new ArrayList<PhysicalPlan>();
try {
ppList.add(physicalPlan);
for (int i = 1; i < size; i++) {
ppList.add(physicalPlan.clone());
}
} catch (CloneNotSupportedException e) {
int errCode = 2127;
String msg = "Internal Error: Cloning of plan failed for optimization.";
throw new OptimizerException(msg, errCode, PigException.BUG, e);
}
return ppList;
}
//Merge every operator of planB into planA and connect planB's leaf to the operator "to" in planA
private void mergePlanAWithPlanB(PhysicalPlan planA, PhysicalPlan planB, PhysicalOperator to) throws PlanException {
PhysicalOperator predOfStore = planB.getLeaves().get(0);
planA.merge(planB);
planA.connect(predOfStore, to);
}
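/**
* Removes the splitter from the Spark operator plan after its physical plan has been merged
* into the splittee, reconnecting the splitter's predecessors directly to the splittee.
*/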
private void removeSpliter(SparkOperPlan plan, SparkOperator spliter, SparkOperator splittee) throws PlanException {
if (plan.getPredecessors(spliter) != null) {
List<SparkOperator> preds = new ArrayList<SparkOperator>(plan.getPredecessors(spliter));
plan.disconnect(spliter, splittee);
for (SparkOperator pred : preds) {
plan.disconnect(pred, spliter);
plan.connect(pred, splittee);
}
}
plan.remove(spliter);
}
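/**
* Removes a splittee from the Spark operator plan after its physical plan has been merged
* into the splitter, reconnecting the splittee's successors directly to the splitter.
*/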
private void removeSplittee(SparkOperPlan plan, SparkOperator splitter,
SparkOperator splittee) throws PlanException {
if (plan.getSuccessors(splittee) != null) {
List<SparkOperator> succs = new ArrayList<SparkOperator>(plan.getSuccessors(splittee));
plan.disconnect(splitter, splittee);
for (SparkOperator succSparkOperator : succs) {
plan.disconnect(splittee, succSparkOperator);
plan.connect(splitter, succSparkOperator);
}
}
plan.remove(splittee);
}
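/**
* Creates a new POSplit with a fresh OperatorKey in the current scope.
*/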
private POSplit createSplit() {
return new POSplit(new OperatorKey(scope, nig.getNextNodeId(scope)));
}
static public void addSubPlanPropertiesToParent(SparkOperator parentOper, SparkOperator subPlanOper) {
// Copy only map-side properties, e.g. crossKeys.
// Do not copy reduce-side specific properties, e.g. useSecondaryKey, segmentBelow, sortOrder, etc.
if (subPlanOper.getCrossKeys() != null) {
for (String key : subPlanOper.getCrossKeys()) {
parentOper.addCrossKey(key);
}
}
parentOper.copyFeatures(subPlanOper, null);
if (subPlanOper.getRequestedParallelism() > parentOper.getRequestedParallelism()) {
parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism());
}
subPlanOper.setRequestedParallelismByReference(parentOper);
parentOper.UDFs.addAll(subPlanOper.UDFs);
parentOper.scalars.addAll(subPlanOper.scalars);
}
}