blob: b9de7d0f6171e2e54dcb94a12f60d6515891e68c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
import org.apache.pig.backend.hadoop.executionengine.util.ParallelConstantVisitor;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
/**
 * Plan visitor that decides the vertex parallelism of every {@link TezOperator} in a
 * {@link TezOperPlan}, walking the plan in dependency order so predecessors are
 * resolved before their successors.
 *
 * <p>While visiting it also accumulates {@link #getEstimatedTotalParallelism()}, the
 * sum of all known (non -1) parallelism values of non vertex-group operators.
 */
public class ParallelismSetter extends TezOpPlanVisitor {
    private static final Log LOG = LogFactory.getLog(ParallelismSetter.class);

    private final Configuration conf;
    private final PigContext pc;
    private final TezParallelismEstimator estimator;
    private final boolean autoParallelismEnabled;
    private int estimatedTotalParallelism = 0;

    /**
     * @param plan the Tez operator plan whose operators get their parallelism set
     * @param pigContext context supplying properties and the default parallelism
     * @throws RuntimeException if the configured TezParallelismEstimator cannot be instantiated
     */
    public ParallelismSetter(TezOperPlan plan, PigContext pigContext) {
        super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
        this.pc = pigContext;
        this.conf = ConfigurationUtil.toConfiguration(pc.getProperties());
        this.autoParallelismEnabled = conf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM, true);
        this.estimator = createEstimator(conf, pc);
    }

    /**
     * Instantiates the estimator named by PIG_EXEC_REDUCER_ESTIMATOR, falling back to
     * TezOperDependencyParallelismEstimator when the property is unset.
     */
    private static TezParallelismEstimator createEstimator(Configuration conf, PigContext pc) {
        try {
            TezParallelismEstimator created;
            if (conf.get(PigConfiguration.PIG_EXEC_REDUCER_ESTIMATOR) == null) {
                created = new TezOperDependencyParallelismEstimator();
            } else {
                created = PigContext.instantiateObjectFromParams(conf,
                        PigConfiguration.PIG_EXEC_REDUCER_ESTIMATOR,
                        PigConfiguration.PIG_EXEC_REDUCER_ESTIMATOR_CONSTRUCTOR_ARG_KEY,
                        TezParallelismEstimator.class);
            }
            created.setPigContext(pc);
            return created;
        } catch (ExecException e) {
            throw new RuntimeException("Error instantiating TezParallelismEstimator", e);
        }
    }

    /**
     * @return the sum of every known (not -1) parallelism counted so far for
     *         non vertex-group operators
     */
    public int getEstimatedTotalParallelism() {
        return estimatedTotalParallelism;
    }

    @Override
    public void visitTezOp(TezOperator tezOp) throws VisitorException {
        // Native Tez operators manage their own parallelism.
        if (tezOp instanceof NativeTezOper) {
            return;
        }
        try {
            if (tezOp.getLoaderInfo().getLoads() != null && tezOp.getLoaderInfo().getLoads().size() > 0) {
                // requestedParallelism of a loader vertex is handled in LoaderProcessor;
                // propagate it to vertexParallelism.
                tezOp.setVertexParallelism(tezOp.getRequestedParallelism());
                incrementTotalParallelism(tezOp, tezOp.getRequestedParallelism());
                return;
            }

            // Parallelism can only be set here if it isn't derived from splits.
            int parallelism = -1;

            // All ONE_TO_ONE sources must agree on parallelism. The vertex itself is
            // counted once in the total (not once per one-to-one edge) and its vertex
            // parallelism is left at -1.
            int oneToOneParallelism = -1;
            boolean isOneToOneParallelism = false;
            for (Map.Entry<OperatorKey, TezEdgeDescriptor> entry : tezOp.inEdges.entrySet()) {
                if (entry.getValue().dataMovementType != DataMovementType.ONE_TO_ONE) {
                    continue;
                }
                TezOperator pred = mPlan.getOperator(entry.getKey());
                int predParallelism = pred.getEffectiveParallelism(pc.defaultParallel);
                if (oneToOneParallelism == -1) {
                    oneToOneParallelism = predParallelism;
                } else if (oneToOneParallelism != predParallelism) {
                    throw new VisitorException("one to one sources parallelism for vertex "
                            + tezOp.getOperatorKey().toString() + " are not equal");
                }
                tezOp.setRequestedParallelism(pred.getRequestedParallelism());
                // If tezOp.estimatedParallelism is already set, don't override it.
                // The only case is in PigGraceShuffleVertexManager, which sets the
                // estimated parallelism according to the output data size of the node.
                if (tezOp.getEstimatedParallelism() == -1) {
                    tezOp.setEstimatedParallelism(pred.getEstimatedParallelism());
                }
                isOneToOneParallelism = true;
            }

            if (isOneToOneParallelism) {
                incrementTotalParallelism(tezOp, oneToOneParallelism);
            } else {
                if (tezOp.getRequestedParallelism() != -1) {
                    parallelism = tezOp.getRequestedParallelism();
                } else if (pc.defaultParallel != -1) {
                    parallelism = pc.defaultParallel;
                }
                if (parallelism == 0) {
                    // We need to produce an empty output file.
                    // Even if the user set PARALLEL 0, mapreduce has 1 reducer.
                    parallelism = 1;
                }
                boolean overrideRequestedParallelism = parallelism != -1
                        && autoParallelismEnabled
                        && !tezOp.isDontEstimateParallelism()
                        && tezOp.isIntermediateReducer()
                        && tezOp.isOverrideIntermediateParallelism();

                if (parallelism == -1 || overrideRequestedParallelism) {
                    if (tezOp.getEstimatedParallelism() == -1) {
                        // Override user specified parallelism with the estimated value
                        // if it is an intermediate reducer.
                        parallelism = estimator.estimateParallelism(mPlan, tezOp, conf);
                        if (overrideRequestedParallelism) {
                            if (tezOp.getRequestedParallelism() != parallelism) {
                                LOG.info("Increased requested parallelism of " + tezOp.getOperatorKey() + " to " + parallelism);
                            }
                            tezOp.setRequestedParallelism(parallelism);
                        } else {
                            tezOp.setEstimatedParallelism(parallelism);
                        }
                    } else {
                        parallelism = tezOp.getEstimatedParallelism();
                    }
                    if (tezOp.isGlobalSort() || tezOp.isSkewedJoin()) {
                        int numPredecessors = mPlan.getPredecessors(tezOp).size();
                        boolean additionalEdge = (tezOp.isGlobalSort() && numPredecessors != 1)
                                || (tezOp.isSkewedJoin() && numPredecessors != 2);
                        if (!overrideRequestedParallelism && !additionalEdge) {
                            incrementTotalParallelism(tezOp, parallelism);
                            // PartitionerDefinedVertexManager will determine parallelism,
                            // so call setVertexParallelism with -1. setEstimatedParallelism
                            // still needs to have some positive value so that TezDAGBuilder
                            // sets the PartitionerDefinedVertexManager.
                            parallelism = -1;
                        } else {
                            // We are overriding the parallelism, so the constant value in the
                            // sample aggregator must be updated to the same parallelism.
                            // Currently happens when order by or skewed join is followed by
                            // group by with combiner.
                            updateSampleAggregationParallelism(tezOp, parallelism);
                        }
                    }
                }
            }

            // No-op when parallelism is -1 (one-to-one or partitioner-defined cases above).
            incrementTotalParallelism(tezOp, parallelism);
            tezOp.setVertexParallelism(parallelism);

            // TODO: Fix case where vertex parallelism is -1 for auto parallelism with PartitionerDefinedVertexManager.
            // i.e order by or skewed join followed by cross
            if (tezOp.getCrossKeys() != null) {
                for (String key : tezOp.getCrossKeys()) {
                    pc.getProperties().put(PigImplConstants.PIG_CROSS_PARALLELISM + "." + key,
                            Integer.toString(tezOp.getVertexParallelism()));
                }
            }
        } catch (VisitorException e) {
            // Already the declared exception type; don't wrap it a second time.
            throw e;
        } catch (Exception e) {
            throw new VisitorException(e);
        }
    }

    /**
     * Rewrites the parallelism constant in the sample-aggregation operator that feeds the
     * first sample-based partitioner predecessor of {@code tezOp}, and marks that
     * aggregator as no longer needing estimated quantiles.
     */
    private void updateSampleAggregationParallelism(TezOperator tezOp, int parallelism)
            throws VisitorException {
        for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
            if (!pred.isSampleBasedPartitioner()) {
                continue;
            }
            for (TezOperator partitionerPred : mPlan.getPredecessors(pred)) {
                if (partitionerPred.isSampleAggregation() && partitionerPred.plan != null) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Updating parallelism constant value to " + parallelism + " in " + partitionerPred.plan);
                    }
                    new ParallelConstantVisitor(partitionerPred.plan, parallelism).visit();
                    partitionerPred.setNeedEstimatedQuantile(false);
                    break;
                }
            }
            // Only the first sample-based partitioner predecessor is considered.
            return;
        }
    }

    /**
     * Adds {@code tezOpParallelism} to the running total unless the operator is a vertex
     * group or the parallelism is unknown (-1).
     */
    private void incrementTotalParallelism(TezOperator tezOp, int tezOpParallelism) {
        if (tezOp.isVertexGroup()) {
            return;
        }
        if (tezOpParallelism != -1) {
            estimatedTotalParallelism += tezOpParallelism;
        }
    }
}