blob: c082044246fdfa894f8149966c617fa9610904f1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POBuildBloomRearrangeTez;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
/**
 * Visits every {@link MapReduceOper} of an MR plan and, for each one, inspects
 * its map plan and its reduce plan separately. When a plan contains an operator
 * that the nested checker recognises as needing end-of-all-input handling, the
 * corresponding flag is turned on in the MROper: one flag for the map side and
 * one for the reduce side. This visitor only ever sets the flags to true.
 */
public class EndOfAllInputSetter extends MROpPlanVisitor {
/**
* @param plan MR plan to visit
*/
public EndOfAllInputSetter(MROperPlan plan) {
super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
}
@Override
public void visitMROp(MapReduceOper mr) throws VisitorException {
EndOfAllInputChecker checker = new EndOfAllInputChecker(mr.mapPlan);
checker.visit();
if(checker.isEndOfAllInputPresent()) {
mr.setEndOfAllInputInMap(true);
}
checker = new EndOfAllInputChecker(mr.reducePlan);
checker.visit();
if(checker.isEndOfAllInputPresent()) {
mr.setEndOfAllInputInReduce(true);
}
}
public static class EndOfAllInputChecker extends PhyPlanVisitor {
private boolean endOfAllInputFlag = false;
public EndOfAllInputChecker(PhysicalPlan plan) {
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
}
@Override
public void visitStream(POStream stream) throws VisitorException {
// stream present
endOfAllInputFlag = true;
}
@Override
public void visitMergeJoin(POMergeJoin join) throws VisitorException {
// merge join present
endOfAllInputFlag = true;
}
@Override
public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException {
// map side group present
endOfAllInputFlag = true;
}
@Override
public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
throws VisitorException {
endOfAllInputFlag = true;
}
@Override
public void visitPartialAgg(POPartialAgg partAgg) throws VisitorException {
endOfAllInputFlag = true;
}
@Override
public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
endOfAllInputFlag = true;
}
@Override
public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorException {
endOfAllInputFlag = true;
}
@Override
public void visitPOForEach(POForEach foreach) throws VisitorException {
try {
if (foreach.needEndOfAllInputProcessing()) {
endOfAllInputFlag = true;
}
} catch (Exception e) {
throw new VisitorException(e);
}
}
@Override
public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
if (lr instanceof POBuildBloomRearrangeTez) {
endOfAllInputFlag = true;
}
super.visitLocalRearrange(lr);
}
/**
* @return if end of all input is present
*/
public boolean isEndOfAllInputPresent() {
return endOfAllInputFlag;
}
}
}