| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.pig.pen; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GTOrEqualToExpr; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LTOrEqualToExpr; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.NotEqualToExpr; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POAnd; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POIsNull; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POMapLookUp; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONegative; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONot; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POOr; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORegexp; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion; |
| import org.apache.pig.data.DataBag; |
| import org.apache.pig.data.Tuple; |
| import org.apache.pig.impl.PigContext; |
| import org.apache.pig.impl.plan.DepthFirstWalker; |
| import org.apache.pig.impl.plan.PlanWalker; |
| import org.apache.pig.impl.plan.VisitorException; |
| import org.apache.pig.impl.util.IdentityHashSet; |
| import org.apache.pig.newplan.logical.relational.LogicalSchema; |
| import org.apache.pig.pen.util.LineageTracer; |
| |
| /** |
| * The class used to (re)attach illustrators to physical operators |
| * |
| * |
| */ |
| public class IllustratorAttacher extends PhyPlanVisitor { |
| |
| PigContext pigContext; |
| |
| LineageTracer lineage; |
| |
| HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> poToEqclassesMap; |
| |
| private HashMap<PhysicalOperator, DataBag> poToDataMap; |
| private int maxRecords; |
| private boolean revisit = false; |
| private ArrayList<Boolean[]> subExpResults = null; |
| private final Map<POLoad, LogicalSchema> poloadToSchemaMap; |
| |
| public IllustratorAttacher(PhysicalPlan plan, LineageTracer lineage, int maxRecords, |
| Map<POLoad, LogicalSchema> poLoadToSchemaMap, PigContext hadoopPigContext) throws VisitorException { |
| super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan)); |
| pigContext = hadoopPigContext; |
| this.lineage = lineage; |
| poToEqclassesMap = new HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>>(); |
| poToDataMap = new HashMap<PhysicalOperator, DataBag>(); |
| this.maxRecords = maxRecords; |
| this.poloadToSchemaMap = poLoadToSchemaMap; |
| } |
| |
| /** |
| * revisit an enhanced physical plan from MR compilation |
| * @param plan a physical plan to be traversed |
| */ |
| public void revisit(PhysicalPlan plan) throws VisitorException { |
| pushWalker(new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan)); |
| revisit = true; |
| PhysicalPlan oriPlan = mPlan; |
| mPlan = plan; |
| visit(); |
| mPlan = oriPlan; |
| popWalker(); |
| } |
| |
| private void setIllustrator(PhysicalOperator po, int nEqClasses) { |
| if (revisit && po.getIllustrator() != null) |
| return; |
| LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>(); |
| poToEqclassesMap.put(po, eqClasses); |
| for (int i = 0; i < nEqClasses; ++i) |
| { |
| IdentityHashSet<Tuple> eqClass = new IdentityHashSet<Tuple>(); |
| eqClasses.add(eqClass); |
| } |
| Illustrator illustrator = new Illustrator(lineage, eqClasses, this, pigContext); |
| po.setIllustrator(illustrator); |
| poToDataMap.put(po, illustrator.getData()); |
| } |
| |
| private void setIllustrator(PhysicalOperator po, LinkedList<IdentityHashSet<Tuple>> eqClasses) { |
| if (revisit && po.getIllustrator() != null) |
| return; |
| Illustrator illustrator = new Illustrator(lineage, eqClasses, this, pigContext); |
| po.setIllustrator(illustrator); |
| if (eqClasses != null) |
| poToEqclassesMap.put(po, eqClasses); |
| poToDataMap.put(po, illustrator.getData()); |
| } |
| |
| void setIllustrator(PhysicalOperator po) { |
| if (revisit && po.getIllustrator() != null) |
| return; |
| LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>(); |
| IdentityHashSet<Tuple> eqClass = new IdentityHashSet<Tuple>(); |
| eqClasses.add(eqClass); |
| Illustrator illustrator = new Illustrator(lineage, eqClasses, this, pigContext); |
| po.setIllustrator(illustrator); |
| poToEqclassesMap.put(po, eqClasses); |
| poToDataMap.put(po, illustrator.getData()); |
| } |
| |
| public Map<PhysicalOperator, DataBag> getDataMap() { |
| return poToDataMap; |
| } |
| |
| @Override |
| public void visitLoad(POLoad ld) throws VisitorException{ |
| // LOAD from temporary files need no illustrator |
| if (revisit) |
| return; |
| |
| LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>(); |
| poToEqclassesMap.put(ld, eqClasses); |
| |
| IdentityHashSet<Tuple> eqClass = new IdentityHashSet<Tuple>(); |
| eqClasses.add(eqClass); |
| Illustrator illustrator; |
| illustrator = new Illustrator(lineage, eqClasses, maxRecords, this, poloadToSchemaMap.get(ld), pigContext); |
| ld.setIllustrator(illustrator); |
| poToDataMap.put(ld, illustrator.getData()); |
| } |
| |
| @Override |
| public void visitStore(POStore st) throws VisitorException{ |
| setIllustrator(st, 1); |
| } |
| |
| @Override |
| public void visitFilter(POFilter fl) throws VisitorException{ |
| setIllustrator(fl, 0); |
| subExpResults = fl.getIllustrator().getSubExpResults(); |
| innerPlanAttach(fl, fl.getPlan()); |
| subExpResults = null; |
| } |
| |
| @Override |
| public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{ |
| super.visitLocalRearrange(lr); |
| setIllustrator(lr); |
| } |
| |
| @Override |
| public void visitPackage(POPackage pkg) throws VisitorException{ |
| setIllustrator(pkg, pkg.numberOfEquivalenceClasses()); |
| } |
| |
| @Override |
| public void visitPOForEach(POForEach nfe) throws VisitorException { |
| if (revisit && nfe.getIllustrator() != null) |
| return; |
| List<PhysicalPlan> innerPlans = nfe.getInputPlans(); |
| for (PhysicalPlan innerPlan : innerPlans) |
| innerPlanAttach(nfe, innerPlan); |
| List<PhysicalOperator> preds = mPlan.getPredecessors(nfe); |
| if (preds != null && preds.size() == 1 |
| && preds.get(0) instanceof POPackage |
| && ((POPackage) preds.get(0)).getPkgr().isDistinct()) { |
| // equivalence class of POPackage for DISTINCT needs to be used |
| //instead of the succeeding POForEach's equivalence class |
| setIllustrator(nfe, preds.get(0).getIllustrator().getEquivalenceClasses()); |
| nfe.getIllustrator().setEqClassesShared(); |
| } else |
| setIllustrator(nfe, 1); |
| } |
| |
| @Override |
| public void visitUnion(POUnion un) throws VisitorException{ |
| if (revisit && un.getIllustrator() != null) |
| return; |
| setIllustrator(un, null); |
| } |
| |
| @Override |
| public void visitSplit(POSplit spl) throws VisitorException{ |
| if (revisit && spl.getIllustrator() != null) |
| return; |
| for (PhysicalPlan poPlan : spl.getPlans()) |
| innerPlanAttach(spl, poPlan); |
| setIllustrator(spl); |
| } |
| |
| @Override |
| public void visitDemux(PODemux demux) throws VisitorException{ |
| if (revisit && demux.getIllustrator() != null) |
| return; |
| List<PhysicalPlan> innerPlans = demux.getPlans(); |
| for (PhysicalPlan innerPlan : innerPlans) |
| innerPlanAttach(demux, innerPlan); |
| setIllustrator(demux); |
| } |
| |
| @Override |
| public void visitDistinct(PODistinct distinct) throws VisitorException { |
| setIllustrator(distinct, 1); |
| } |
| |
| @Override |
| public void visitSort(POSort sort) throws VisitorException { |
| setIllustrator(sort, 1); |
| } |
| |
| @Override |
| public void visitRank(PORank rank) throws VisitorException { |
| setIllustrator(rank, 3); |
| } |
| |
| @Override |
| public void visitCounter(POCounter counter) throws VisitorException { |
| setIllustrator(counter, 1); |
| } |
| |
| @Override |
| public void visitProject(POProject proj) throws VisitorException{ |
| } |
| |
| @Override |
| public void visitGreaterThan(GreaterThanExpr grt) throws VisitorException{ |
| setIllustrator(grt, 0); |
| if (!revisit && subExpResults != null) |
| subExpResults.add(grt.getIllustrator().getSubExpResult()); |
| } |
| |
| @Override |
| public void visitLessThan(LessThanExpr lt) throws VisitorException{ |
| setIllustrator(lt, 0); |
| if (!revisit && subExpResults != null) |
| subExpResults.add(lt.getIllustrator().getSubExpResult()); |
| } |
| |
| @Override |
| public void visitGTOrEqual(GTOrEqualToExpr gte) throws VisitorException{ |
| setIllustrator(gte, 0); |
| if (!revisit && subExpResults != null) |
| subExpResults.add(gte.getIllustrator().getSubExpResult()); |
| } |
| |
| @Override |
| public void visitLTOrEqual(LTOrEqualToExpr lte) throws VisitorException{ |
| setIllustrator(lte, 0); |
| if (!revisit && subExpResults != null) |
| subExpResults.add(lte.getIllustrator().getSubExpResult()); |
| } |
| |
| @Override |
| public void visitEqualTo(EqualToExpr eq) throws VisitorException{ |
| setIllustrator(eq, 0); |
| if (!revisit && subExpResults != null) |
| subExpResults.add(eq.getIllustrator().getSubExpResult()); |
| } |
| |
| @Override |
| public void visitNotEqualTo(NotEqualToExpr eq) throws VisitorException{ |
| setIllustrator(eq, 0); |
| if (!revisit && subExpResults != null) |
| subExpResults.add(eq.getIllustrator().getSubExpResult()); |
| } |
| |
| @Override |
| public void visitRegexp(PORegexp re) throws VisitorException{ |
| setIllustrator(re, 0); |
| if (!revisit && subExpResults != null) |
| subExpResults.add(re.getIllustrator().getSubExpResult()); |
| } |
| |
| @Override |
| public void visitIsNull(POIsNull isNull) throws VisitorException { |
| setIllustrator(isNull, 0); |
| if (!revisit && subExpResults != null) |
| subExpResults.add(isNull.getIllustrator().getSubExpResult()); |
| } |
| |
| @Override |
| public void visitAnd(POAnd and) throws VisitorException { |
| setIllustrator(and, 0); |
| } |
| |
| @Override |
| public void visitOr(POOr or) throws VisitorException { |
| setIllustrator(or, 0); |
| } |
| |
| @Override |
| public void visitNot(PONot not) throws VisitorException { |
| setIllustrator(not, 0); |
| if (!revisit && subExpResults != null) |
| subExpResults.add(not.getIllustrator().getSubExpResult()); |
| } |
| |
| @Override |
| public void visitBinCond(POBinCond binCond) { |
| |
| } |
| |
| @Override |
| public void visitNegative(PONegative negative) { |
| setIllustrator(negative, 1); |
| } |
| |
| @Override |
| public void visitUserFunc(POUserFunc userFunc) throws VisitorException { |
| } |
| |
| @Override |
| public void visitComparisonFunc(POUserComparisonFunc compFunc) throws VisitorException { |
| // one each for >, ==, and < |
| setIllustrator(compFunc, 3); |
| } |
| |
| @Override |
| public void visitMapLookUp(POMapLookUp mapLookUp) { |
| setIllustrator(mapLookUp, 1); |
| } |
| |
| @Override |
| public void visitCast(POCast cast) { |
| } |
| |
| @Override |
| public void visitLimit(POLimit lim) throws VisitorException { |
| setIllustrator(lim, 1); |
| } |
| |
| @Override |
| public void visitStream(POStream stream) throws VisitorException { |
| setIllustrator(stream, 1); |
| } |
| |
| /** |
| * @param optimizedForEach |
| */ |
| @Override |
| public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) throws VisitorException { |
| visitPOForEach(optimizedForEach); |
| } |
| |
| private void innerPlanAttach(PhysicalOperator po, PhysicalPlan plan) throws VisitorException { |
| PlanWalker<PhysicalOperator, PhysicalPlan> childWalker = |
| mCurrentWalker.spawnChildWalker(plan); |
| pushWalker(childWalker); |
| childWalker.walk(this); |
| popWalker(); |
| LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>(); |
| if (subExpResults != null && !revisit) { |
| int size = 1 << subExpResults.size(); |
| for (int i = 0; i < size; ++i) { |
| eqClasses.add(new IdentityHashSet<Tuple>()); |
| } |
| po.getIllustrator().setEquivalenceClasses(eqClasses, po); |
| } |
| } |
| } |