/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POReduceBySpark;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.backend.hadoop.executionengine.util.CombinerOptimizerUtil;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
import org.apache.pig.impl.util.Pair;
import com.google.common.collect.Maps;
/**
* This class goes through the physical plan and replaces GlobalRearrange
* with ReduceBy where there are algebraic operations.
*/
public class CombinerOptimizer extends SparkOpPlanVisitor {
private static final Log LOG = LogFactory.getLog(CombinerOptimizer.class);
public CombinerOptimizer(SparkOperPlan plan) {
super(plan, new DepthFirstWalker<>(plan));
}
@Override
public void visitSparkOp(SparkOperator sparkOp) throws VisitorException {
try {
addCombiner(sparkOp.physicalPlan);
} catch (Exception e) {
throw new VisitorException(e);
}
}
// Checks for algebraic operations and, if they exist, replaces the
// global rearrange (cogroup) with a reduceBy as follows:
// Input:
// foreach (using algebraicOp)
// -> packager
// -> globalRearrange
// -> localRearrange
// Output:
// foreach (using algebraicOp.Final)
// -> reduceBy (uses algebraicOp.Intermediate)
// -> foreach (using algebraicOp.Initial)
// -> CombinerRearrange
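// For illustration, a hypothetical Pig Latin script matching the input
// shape above (COUNT is algebraic, so the rewrite applies):
// grouped = GROUP data BY key;
// counts = FOREACH grouped GENERATE group, COUNT(data);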
private void addCombiner(PhysicalPlan phyPlan) throws VisitorException, PlanException, CloneNotSupportedException {
List<PhysicalOperator> leaves = phyPlan.getLeaves();
if (leaves == null || leaves.size() != 1) {
return;
}
// Ensure there is grouping.
List<POGlobalRearrange> glrs = PlanHelper.getPhysicalOperators(phyPlan, POGlobalRearrange.class);
if (glrs == null || glrs.size() == 0) {
return;
}
for (POGlobalRearrange glr : glrs) {
List<PhysicalOperator> glrSuccessors = phyPlan.getSuccessors(glr);
if (glrSuccessors == null || glrSuccessors.isEmpty()) {
continue;
}
if (!(glrSuccessors.get(0) instanceof POPackage)) {
continue;
}
POPackage poPackage = (POPackage) glrSuccessors.get(0);
List<PhysicalOperator> poPackageSuccessors = phyPlan.getSuccessors(poPackage);
if (poPackageSuccessors == null || poPackageSuccessors.size() != 1) {
continue;
}
PhysicalOperator successor = poPackageSuccessors.get(0);
// Retaining the original successor to be used later in modifying the plan.
PhysicalOperator packageSuccessor = successor;
if (successor instanceof POLimit) {
// POLimit is acceptable, as long as it has a single foreach as
// successor
List<PhysicalOperator> limitSucs = phyPlan.getSuccessors(successor);
if (limitSucs != null && limitSucs.size() == 1 &&
limitSucs.get(0) instanceof POForEach) {
// the code below will now further examine the foreach
successor = limitSucs.get(0);
}
}
if (successor instanceof POForEach) {
POForEach foreach = (POForEach) successor;
List<PhysicalOperator> foreachSuccessors = phyPlan.getSuccessors(foreach);
// Skip the multi-query case, where the foreach does not have exactly one successor.
if (foreachSuccessors == null || foreachSuccessors.size() != 1) {
continue;
}
// Clone foreach so it can be modified to a post-reduce foreach.
POForEach postReduceFE = foreach.clone();
List<PhysicalPlan> feInners = postReduceFE.getInputPlans();
// find algebraic operators and also check if the foreach statement
// is suitable for combiner use
List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps = CombinerOptimizerUtil.findAlgebraicOps
(feInners);
if (algebraicOps == null || algebraicOps.size() == 0) {
// the plan is not combinable or there is nothing to combine
// we're done
continue;
}
try {
List<PhysicalOperator> glrPredecessors = phyPlan.getPredecessors(glr);
// Exclude co-group (multiple inputs to the rearrange) from the optimization.
if (glrPredecessors == null || glrPredecessors.size() != 1) {
continue;
}
if (!(glrPredecessors.get(0) instanceof POLocalRearrange)) {
continue;
}
POLocalRearrange rearrange = (POLocalRearrange) glrPredecessors.get(0);
LOG.info("Algebraic operations found. Optimizing plan to use combiner.");
// Remove the global rearrange and the POPackage that follows it.
convertToMapSideForEach(phyPlan, poPackage);
// replace PODistinct->Project[*] with distinct udf (which is Algebraic)
for (Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps) {
if (!(op2plan.first instanceof PODistinct)) {
continue;
}
CombinerOptimizerUtil.DistinctPatcher distinctPatcher
= new CombinerOptimizerUtil.DistinctPatcher(op2plan.second);
distinctPatcher.visit();
if (distinctPatcher.getDistinct() == null) {
int errCode = 2073;
String msg = "Problem with replacing distinct operator with distinct built-in function.";
throw new PlanException(msg, errCode, PigException.BUG);
}
op2plan.first = distinctPatcher.getDistinct();
}
// Create the new map foreach.
POForEach mfe = CombinerOptimizerUtil.createForEachWithGrpProj(postReduceFE, poPackage.getPkgr()
.getKeyType());
Map<PhysicalOperator, Integer> op2newpos = Maps.newHashMap();
Integer pos = 1;
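// Position 0 holds the group-key projection added by
// createForEachWithGrpProj, so the UDF plans start at position 1.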
// create plan for each algebraic udf and add as inner plan in map-foreach
for (Pair<PhysicalOperator, PhysicalPlan> op2plan : algebraicOps) {
PhysicalPlan udfPlan = CombinerOptimizerUtil.createPlanWithPredecessors(op2plan.first,
op2plan.second);
mfe.addInputPlan(udfPlan, false);
op2newpos.put(op2plan.first, pos++);
}
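// op2newpos now maps each algebraic UDF to the output column its
// intermediate result occupies; addAlgebraicFuncToCombineFE below uses
// it to wire matching projections into the combine foreach.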
CombinerOptimizerUtil.changeFunc(mfe, POUserFunc.INITIAL);
// since we will only be creating SingleTupleBag as input to
// the map foreach, we should flag the POProjects in the map
// foreach inner plans to also use SingleTupleBag
for (PhysicalPlan mpl : mfe.getInputPlans()) {
try {
new CombinerOptimizerUtil.fixMapProjects(mpl).visit();
} catch (VisitorException e) {
int errCode = 2089;
String msg = "Unable to flag project operator to use single tuple bag.";
throw new PlanException(msg, errCode, PigException.BUG, e);
}
}
// create new combine foreach
POForEach cfe = CombinerOptimizerUtil.createForEachWithGrpProj(postReduceFE, poPackage.getPkgr()
.getKeyType());
// add algebraic functions with appropriate projection
CombinerOptimizerUtil.addAlgebraicFuncToCombineFE(cfe, op2newpos);
// we have modified the foreach inner plans - so set them again
// for the foreach so that foreach can do any re-initialization
// around them.
mfe.setInputPlans(mfe.getInputPlans());
cfe.setInputPlans(cfe.getInputPlans());
// Tell the CombinerPackager which fields need to be projected and
// which placed in bags. The first field is a simple project; the
// rest go into bags.
int numFields = algebraicOps.size() + 1; // algebraic funcs + group key
boolean[] bags = new boolean[numFields];
bags[0] = false;
for (int i = 1; i < numFields; i++) {
bags[i] = true;
}
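// For example, with two algebraic UDFs the packaged tuple is
// (key, {udf1 intermediates}, {udf2 intermediates}),
// giving bags = {false, true, true}.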
// Use the CombinerPackager in the combine plan,
// as it needs to act differently from the regular
// package operator.
CombinerPackager pkgr = new CombinerPackager(poPackage.getPkgr(), bags);
POPackage combinePack = poPackage.clone();
combinePack.setPkgr(pkgr);
// A specialized local rearrange operator will replace
// the normal local rearrange in the map plan.
POLocalRearrange newRearrange = CombinerOptimizerUtil.getNewRearrange(rearrange);
POPreCombinerLocalRearrange combinerLocalRearrange = CombinerOptimizerUtil.getPreCombinerLR
(rearrange);
phyPlan.replace(rearrange, combinerLocalRearrange);
// Create a reduceBy operator.
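// POReduceBySpark bundles the new local rearrange, the combine package
// and the combine foreach plans into a single key-wise reduce operator.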
POReduceBySpark reduceOperator = new POReduceBySpark(cfe.getOperatorKey(), combinerLocalRearrange
.getRequestedParallelism(), cfe.getInputPlans(), cfe.getToBeFlattened(), combinePack,
newRearrange);
reduceOperator.setCustomPartitioner(glr.getCustomPartitioner());
fixReduceSideFE(postReduceFE, algebraicOps);
CombinerOptimizerUtil.changeFunc(reduceOperator, POUserFunc.INTERMEDIATE);
updatePackager(reduceOperator, newRearrange);
// Add the new operators
phyPlan.add(reduceOperator);
phyPlan.add(mfe);
// Connect the new operators as follows:
// reduceBy (using algebraicOp.Intermediate)
// -> foreach (using algebraicOp.Initial)
phyPlan.connect(mfe, reduceOperator);
// Insert the reduce stage between the combiner rearrange and its successor.
phyPlan.disconnect(combinerLocalRearrange, packageSuccessor);
phyPlan.connect(reduceOperator, packageSuccessor);
phyPlan.connect(combinerLocalRearrange, mfe);
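// Resulting chain:
// combinerLocalRearrange -> mfe (Initial)
// -> reduceOperator (Intermediate) -> packageSuccessor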
// Replace foreach with post reduce foreach
phyPlan.add(postReduceFE);
phyPlan.replace(foreach, postReduceFE);
} catch (Exception e) {
int errCode = 2018;
String msg = "Internal error. Unable to introduce the combiner for optimization.";
throw new OptimizerException(msg, errCode, PigException.BUG, e);
}
}
}
}
// Modifies the input plans of the post-reduce foreach to match the output of the reduce stage.
private void fixReduceSideFE(POForEach postReduceFE, List<Pair<PhysicalOperator, PhysicalPlan>> algebraicOps)
throws ExecException, PlanException {
int i = 1; // column 0 of the reduce output holds the group key
for (Pair<PhysicalOperator, PhysicalPlan> algebraicOp : algebraicOps) {
POUserFunc combineUdf = (POUserFunc) algebraicOp.first;
PhysicalPlan pplan = algebraicOp.second;
combineUdf.setAlgebraicFunction(POUserFunc.FINAL);
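// Replace the UDF's original input with a projection of column i of the
// reduce output tuple: the bag of intermediate results for this UDF.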
POProject newProj = new POProject(
CombinerOptimizerUtil.createOperatorKey(postReduceFE.getOperatorKey().getScope()),
1, i
);
newProj.setResultType(DataType.BAG);
PhysicalOperator udfInput = pplan.getPredecessors(combineUdf).get(0);
pplan.disconnect(udfInput, combineUdf);
pplan.add(newProj);
pplan.connect(newProj, combineUdf);
i++;
}
postReduceFE.setResultType(DataType.TUPLE);
}
// Removes the global rearrange and its POPackage, reconnecting the plan so the map side feeds the combiner directly.
private void convertToMapSideForEach(PhysicalPlan physicalPlan, POPackage poPackage)
throws PlanException {
LinkedList<PhysicalOperator> operatorsToRemove = new LinkedList<>();
for (PhysicalOperator physicalOperator : physicalPlan.getPredecessors(poPackage)) {
if (physicalOperator instanceof POGlobalRearrangeSpark) {
operatorsToRemove.add(physicalOperator);
break;
}
}
// Remove global rearranges preceding POPackage.
for (PhysicalOperator po : operatorsToRemove) {
physicalPlan.removeAndReconnect(po);
}
// Remove POPackage itself.
physicalPlan.removeAndReconnect(poPackage);
}
// Updates the ReduceBy operator's packager with key information from the local rearrange.
private void updatePackager(POReduceBySpark reduceOperator, POLocalRearrange lrearrange) throws OptimizerException {
Packager pkgr = reduceOperator.getPKGOp().getPkgr();
// Annotate the packager with information from the local rearrange;
// update the keyInfo if it is already present in the POPackage.
Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pkgr.getKeyInfo();
if (keyInfo == null)
keyInfo = new HashMap<>();
if (keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
// something is wrong - we should not be getting key info
// for the same index from two different Local Rearranges
int errCode = 2087;
String msg = "Unexpected problem during optimization." +
" Found index:" + lrearrange.getIndex() +
" in multiple LocalRearrange operators.";
throw new OptimizerException(msg, errCode, PigException.BUG);
}
keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
new Pair<Boolean, Map<Integer, Integer>>(
lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
pkgr.setKeyInfo(keyInfo);
pkgr.setKeyTuple(lrearrange.isKeyTuple());
pkgr.setKeyCompound(lrearrange.isKeyCompound());
}
/**
* Looks for an algebraic POUserFunc that is the leaf of an input plan.
*
* @param pplan physical plan
* @return the POUserFunc if the plan's single leaf is a combinable
* POUserFunc, otherwise null
*/
private static POUserFunc getAlgebraicSuccessor(PhysicalPlan pplan) {
// check if the plan ends in a combinable UDF
List<PhysicalOperator> leaves = pplan.getLeaves();
if (leaves == null || leaves.size() != 1) {
return null;
}
PhysicalOperator succ = leaves.get(0);
if (succ instanceof POUserFunc && ((POUserFunc) succ).combinable()) {
return (POUserFunc) succ;
}
// Some other operator; can't combine.
return null;
}
}