/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer;

import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
import org.apache.pig.backend.hadoop.executionengine.util.ParallelConstantVisitor;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;

public class ParallelismSetter extends TezOpPlanVisitor {

    private static final Log LOG = LogFactory.getLog(ParallelismSetter.class);
    private Configuration conf;
    private PigContext pc;
    private TezParallelismEstimator estimator;
    private boolean autoParallelismEnabled;
    private int estimatedTotalParallelism = 0;

    public ParallelismSetter(TezOperPlan plan, PigContext pigContext) {
        super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
        this.pc = pigContext;
        this.conf = ConfigurationUtil.toConfiguration(pc.getProperties());
        this.autoParallelismEnabled = conf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM, true);
        try {
            this.estimator = conf.get(PigConfiguration.PIG_EXEC_REDUCER_ESTIMATOR) == null ? new TezOperDependencyParallelismEstimator()
            : PigContext.instantiateObjectFromParams(conf,
                    PigConfiguration.PIG_EXEC_REDUCER_ESTIMATOR, PigConfiguration.PIG_EXEC_REDUCER_ESTIMATOR_CONSTRUCTOR_ARG_KEY,
                    TezParallelismEstimator.class);
            this.estimator.setPigContext(pc);

        } catch (ExecException e) {
            throw new RuntimeException("Error instantiating TezParallelismEstimator", e);
        }
    }

    public int getEstimatedTotalParallelism() {
        return estimatedTotalParallelism;
    }

    @Override
    public void visitTezOp(TezOperator tezOp) throws VisitorException {
        if (tezOp instanceof NativeTezOper) {
            return;
        }
        try {
            // Can only set parallelism here if the parallelism isn't derived from
            // splits
            int parallelism = -1;
            if (tezOp.getLoaderInfo().getLoads() != null && tezOp.getLoaderInfo().getLoads().size() > 0) {
                // requestedParallelism of Loader vertex is handled in LoaderProcessor
                // propogate to vertexParallelism
                tezOp.setVertexParallelism(tezOp.getRequestedParallelism());
                incrementTotalParallelism(tezOp, tezOp.getRequestedParallelism());
                return;
            } else {
                int prevParallelism = -1;
                boolean isOneToOneParallelism = false;

                for (Map.Entry<OperatorKey,TezEdgeDescriptor> entry : tezOp.inEdges.entrySet()) {
                    if (entry.getValue().dataMovementType == DataMovementType.ONE_TO_ONE) {
                        TezOperator pred = mPlan.getOperator(entry.getKey());
                        parallelism = pred.getEffectiveParallelism(pc.defaultParallel);
                        if (prevParallelism == -1) {
                            prevParallelism = parallelism;
                        } else if (prevParallelism != parallelism) {
                            throw new VisitorException("one to one sources parallelism for vertex "
                                    + tezOp.getOperatorKey().toString() + " are not equal");
                        }
                        tezOp.setRequestedParallelism(pred.getRequestedParallelism());
                        // If tezOp.estimatedParallelism already set, don't override
                        // The only case is in PigGraceShuffleVertexManager, which
                        // set the estimated parallelism according to the output data size of the node
                        if (tezOp.getEstimatedParallelism()==-1) {
                            tezOp.setEstimatedParallelism(pred.getEstimatedParallelism());
                        }
                        isOneToOneParallelism = true;
                        incrementTotalParallelism(tezOp, parallelism);
                        parallelism = -1;
                    }
                }
                if (!isOneToOneParallelism) {
                    if (tezOp.getRequestedParallelism() != -1) {
                        parallelism = tezOp.getRequestedParallelism();
                    } else if (pc.defaultParallel != -1) {
                        parallelism = pc.defaultParallel;
                    }
                    if (parallelism == 0) {
                        // We need to produce empty output file.
                        // Even if user set PARALLEL 0, mapreduce has 1 reducer
                        parallelism = 1;
                    }
                    boolean overrideRequestedParallelism = false;
                    if (parallelism != -1
                            && autoParallelismEnabled
                            && !tezOp.isDontEstimateParallelism()
                            && tezOp.isIntermediateReducer()
                            && tezOp.isOverrideIntermediateParallelism()) {
                        overrideRequestedParallelism = true;
                    }
                    if (parallelism == -1 || overrideRequestedParallelism) {
                        if (tezOp.getEstimatedParallelism() == -1) {
                            // Override user specified parallelism with the estimated value
                            // if it is intermediate reducer
                            parallelism = estimator.estimateParallelism(mPlan, tezOp, conf);
                            if (overrideRequestedParallelism) {
                                if (tezOp.getRequestedParallelism() != parallelism) {
                                    LOG.info("Increased requested parallelism of " + tezOp.getOperatorKey() + " to " + parallelism);
                                }
                                tezOp.setRequestedParallelism(parallelism);
                            } else {
                                tezOp.setEstimatedParallelism(parallelism);
                            }
                        } else {
                            parallelism = tezOp.getEstimatedParallelism();
                        }
                        if (tezOp.isGlobalSort() || tezOp.isSkewedJoin()) {
                            boolean additionalEdge = false;
                            if (tezOp.isGlobalSort() && getPlan().getPredecessors(tezOp).size() != 1 ||
                                    tezOp.isSkewedJoin() && getPlan().getPredecessors(tezOp).size() != 2) {
                                additionalEdge = true;
                            }
                            if (!overrideRequestedParallelism && !additionalEdge) {
                                incrementTotalParallelism(tezOp, parallelism);
                                // PartitionerDefinedVertexManager will determine parallelism.
                                // So call setVertexParallelism with -1
                                // setEstimatedParallelism still needs to have some positive value
                                // so that TezDAGBuilder sets the PartitionerDefinedVertexManager
                                parallelism = -1;
                            } else {
                                // We are overriding the parallelism. We need to update the
                                // Constant value in sampleAggregator to same parallelism
                                // Currently will happen when you have orderby or
                                // skewed join followed by group by with combiner
                                for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
                                    if (pred.isSampleBasedPartitioner()) {
                                        for (TezOperator partitionerPred : mPlan.getPredecessors(pred)) {
                                            if (partitionerPred.isSampleAggregation() && partitionerPred.plan!=null) {
                                                LOG.debug("Updating parallelism constant value to " + parallelism + " in " + partitionerPred.plan);
                                                ParallelConstantVisitor visitor =
                                                        new ParallelConstantVisitor(partitionerPred.plan, parallelism);
                                                visitor.visit();
                                                partitionerPred.setNeedEstimatedQuantile(false);
                                                break;
                                            }
                                        }
                                        break;
                                    }
                                }
                            }
                        }
                    }
                }
            }

            incrementTotalParallelism(tezOp, parallelism);
            tezOp.setVertexParallelism(parallelism);

            // TODO: Fix case where vertex parallelism is -1 for auto parallelism with PartitionerDefinedVertexManager.
            // i.e order by or skewed join followed by cross
            if (tezOp.getCrossKeys() != null) {
                for (String key : tezOp.getCrossKeys()) {
                    pc.getProperties().put(PigImplConstants.PIG_CROSS_PARALLELISM + "." + key,
                            Integer.toString(tezOp.getVertexParallelism()));
                }
            }
        } catch (Exception e) {
            throw new VisitorException(e);
        }
    }

    private void incrementTotalParallelism(TezOperator tezOp, int tezOpParallelism) {
        if (tezOp.isVertexGroup()) {
            return;
        }
        if (tezOpParallelism != -1) {
            estimatedTotalParallelism += tezOpParallelism;
        }
    }

}
