| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; |
| |
| import java.util.*; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor; |
| import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; |
| import org.apache.pig.impl.io.FileSpec; |
| import org.apache.pig.impl.plan.PlanException; |
| import org.apache.pig.impl.plan.DependencyOrderWalker; |
| import org.apache.pig.impl.plan.VisitorException; |
| import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; |
| |
| /** |
| * An optimizer that removes unnecessary temp stores (Stores generated by |
| * the MRCompiler to bridge different jobs - even though a real store |
| * is produced from the same data). The pattern looks like this: |
| * |
| * ------------- Split --------- |
| * | | |
| * Store(InterStorage) Store(StoreFunc) |
| * |
| * Followed by a load of the tmp store in a dependent MapReduceOper. |
| * |
| * This optmizer removes the store, collapses the split if only one |
| * branch remains and adjusts the loads to load from the real store. |
| * |
| * The situation is produced by something we do to the logical plan in |
| * PigServer. There we change: |
| * |
| * PreOp |
| * | |
| * Store |
| * |
| * Load |
| * | |
| * PostOp |
| * |
| * To: |
| * |
| * PreOp |
| * | |
| * / \ |
| * / \ |
| * PostOp Store |
| * |
| * If there is a job boundary between pre and post we will end up in |
| * this case. |
| * |
| */ |
| class NoopStoreRemover extends MROpPlanVisitor { |
| |
| private Log log = LogFactory.getLog(getClass()); |
| |
| private Map<String, FileSpec> replacementMap; |
| private List<RemovableStore> removalQ; |
| private List<POStore> storeQ; |
| |
| NoopStoreRemover(MROperPlan plan) { |
| super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(plan)); |
| replacementMap = new HashMap<String, FileSpec>(); |
| } |
| |
| @Override |
| public void visitMROp(MapReduceOper mr) throws VisitorException { |
| removalQ = new LinkedList<RemovableStore>(); |
| storeQ = new LinkedList<POStore>(); |
| |
| // This situation can happen in map and reduce (not combine) |
| new PhysicalRemover(mr.mapPlan).visit(); |
| new PhysicalRemover(mr.reducePlan).visit(); |
| |
| for (RemovableStore st: removalQ) { |
| removeStore(st); |
| } |
| |
| for (POStore st: storeQ) { |
| // don't need the input filespec anymore; and we don't |
| // want to serialize it in the job control compiler. |
| st.setInputSpec(null); |
| } |
| |
| } |
| |
| private void removeStore(RemovableStore rem) { |
| try { |
| // Remove the store plan from the nested split plan. |
| rem.split.removePlan(rem.storePlan); |
| |
| // Collapse split if only one nested plan remains. |
| if (rem.split.getPlans().size() == 1) { |
| PhysicalPlan plan = rem.split.getPlans().get(0); |
| POStore store = (POStore)plan.getRoots().get(0); |
| plan.remove(store); |
| store.setInputs(rem.split.getInputs()); |
| rem.plan.replace(rem.split, store); |
| } |
| } catch(PlanException pe) { |
| log.info("failed to remove unnecessary store from plan: "+pe.getMessage()); |
| } |
| } |
| |
| private static class RemovableStore { |
| public PhysicalPlan storePlan; |
| public PhysicalPlan plan; |
| public POSplit split; |
| } |
| |
| private class PhysicalRemover extends PhyPlanVisitor { |
| PhysicalRemover(PhysicalPlan plan) { |
| super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan)); |
| } |
| |
| @Override |
| public void visit() throws VisitorException { |
| super.visit(); |
| } |
| |
| @Override |
| public void visitLoad(POLoad load) { |
| // As we go through update the load ops of the tmp stores |
| // that we removed with the resulting other stores output. |
| FileSpec spec = replacementMap.get(load.getLFile().getFileName()); |
| if (spec != null) { |
| load.setLFile(spec); |
| } |
| } |
| |
| @Override |
| public void visitStore(POStore store) { |
| // remember these. We will remove the input spec once |
| // we're done. |
| storeQ.add(store); |
| } |
| |
| @Override |
| public void visitSplit(POSplit split) throws VisitorException { |
| super.visitSplit(split); |
| FileSpec lFile = null; |
| FileSpec sFile = null; |
| PhysicalPlan tmpStore = null; |
| |
| for (PhysicalPlan plan: split.getPlans()) { |
| if (plan.size() == 1) { |
| PhysicalOperator op = plan.getRoots().get(0); |
| if (op instanceof POStore) { |
| POStore store = (POStore)op; |
| |
| if (store.isTmpStore()) { |
| // tmp store means introduced by the |
| // MRCompiler. User didn't ask for |
| // those. There can be at most one per |
| // split. (Though there can be nested |
| // splits.) |
| tmpStore = plan; |
| sFile = store.getSFile(); |
| } else if (store.getInputSpec() != null) { |
| // We set the input spec for store |
| // operators that had a corresponding load |
| // but we eliminated it in the |
| // PigServer. There could be multiple of |
| // those, but they are all reversible, so |
| // any of them will do. |
| lFile = store.getInputSpec(); |
| } |
| } |
| } |
| } |
| |
| if (tmpStore != null && lFile != null) { |
| // schedule removal (happens tuesdays and |
| // thursdays. don't park your car on the street on |
| // those days.. |
| RemovableStore rem = new RemovableStore(); |
| rem.storePlan = tmpStore; |
| rem.plan = mCurrentWalker.getPlan(); |
| rem.split = split; |
| removalQ.add(rem); |
| replacementMap.put(sFile.getFileName(), lFile); |
| } |
| } |
| } |
| } |