/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;

import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.BloomPackager;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;

/**
 * This visitor visits the MRPlan and does the following
 * for each MROper: If the map plan or the reduce plan of the MROper has
 *  an end of all input flag present in it, this marks in the MROper whether the map
 * has an end of all input flag set or if the reduce has an end of all input flag set.
 *
 */
public class EndOfAllInputSetter extends MROpPlanVisitor {

    /**
     * @param plan MR plan to visit
     */
    public EndOfAllInputSetter(MROperPlan plan) {
        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
    }

    @Override
    public void visitMROp(MapReduceOper mr) throws VisitorException {

        EndOfAllInputChecker checker = new EndOfAllInputChecker(mr.mapPlan);
        checker.visit();
        if(checker.isEndOfAllInputPresent()) {
            mr.setEndOfAllInputInMap(true);
        }

        checker = new EndOfAllInputChecker(mr.reducePlan);
        checker.visit();
        if(checker.isEndOfAllInputPresent()) {
            mr.setEndOfAllInputInReduce(true);
        }

    }

    public static class EndOfAllInputChecker extends PhyPlanVisitor {

        private boolean endOfAllInputFlag = false;
        public EndOfAllInputChecker(PhysicalPlan plan) {
            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
        }

        @Override
        public void visitStream(POStream stream) throws VisitorException {
            // stream present
            endOfAllInputFlag = true;
        }

        @Override
        public void visitMergeJoin(POMergeJoin join) throws VisitorException {
            // merge join present
            endOfAllInputFlag = true;
        }

        @Override
        public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException {
            // map side group present
            endOfAllInputFlag = true;
        }

        @Override
        public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
                throws VisitorException {
            endOfAllInputFlag = true;
        }

        @Override
        public void visitPartialAgg(POPartialAgg partAgg) throws VisitorException {
            endOfAllInputFlag = true;
        }

        @Override
        public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
            endOfAllInputFlag = true;
        }

        @Override
        public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException {
            endOfAllInputFlag = true;
        }

        @Override
        public void visitPOForEach(POForEach foreach) throws VisitorException {
            try {
                if (foreach.needEndOfAllInputProcessing()) {
                    endOfAllInputFlag = true;
                }
            } catch (Exception e) {
                throw new VisitorException(e);
            }
        }

        @Override
        public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
            if (lr instanceof POBuildBloomRearrangeTez) {
                endOfAllInputFlag = true;
            }
            super.visitLocalRearrange(lr);
        }

        @Override
        public void visitPackage(POPackage pkg) throws VisitorException {
            if (pkg.getPkgr() instanceof BloomPackager) {
                endOfAllInputFlag = true;
            }
        }
        /**
         * @return if end of all input is present
         */
        public boolean isEndOfAllInputPresent() {
            return endOfAllInputFlag;
        }
    }
}
