blob: a139f5fe022dcb918e1841e43c2cb949e1cf77ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
import java.util.*;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
/**
* An optimizer that removes unnecessary temp stores (Stores generated by
* the MRCompiler to bridge different jobs - even though a real store
* is produced from the same data). The pattern looks like this:
*
*        ------------- Split ---------
*        |                           |
* Store(InterStorage)         Store(StoreFunc)
*
* Followed by a load of the tmp store in a dependent MapReduceOper.
*
* This optimizer removes the temp store, collapses the split if only one
* branch remains, and adjusts the loads to read from the real store.
*
* The situation is produced by something we do to the logical plan in
* PigServer. There we change:
*
* PreOp
* |
* Store
*
* Load
* |
* PostOp
*
* To:
*
*       PreOp
*         |
*        / \
*       /   \
*   PostOp  Store
*
* If there is a job boundary between pre and post we will end up in
* this case.
*
*/
class NoopStoreRemover extends MROpPlanVisitor {
private Log log = LogFactory.getLog(getClass());
private Map<String, FileSpec> replacementMap;
private List<RemovableStore> removalQ;
private List<POStore> storeQ;
NoopStoreRemover(MROperPlan plan) {
super(plan, new DependencyOrderWalker<MapReduceOper, MROperPlan>(plan));
replacementMap = new HashMap<String, FileSpec>();
}
@Override
public void visitMROp(MapReduceOper mr) throws VisitorException {
removalQ = new LinkedList<RemovableStore>();
storeQ = new LinkedList<POStore>();
// This situation can happen in map and reduce (not combine)
new PhysicalRemover(mr.mapPlan).visit();
new PhysicalRemover(mr.reducePlan).visit();
for (RemovableStore st: removalQ) {
removeStore(st);
}
for (POStore st: storeQ) {
// don't need the input filespec anymore; and we don't
// want to serialize it in the job control compiler.
st.setInputSpec(null);
}
}
private void removeStore(RemovableStore rem) {
try {
// Remove the store plan from the nested split plan.
rem.split.removePlan(rem.storePlan);
// Collapse split if only one nested plan remains.
if (rem.split.getPlans().size() == 1) {
PhysicalPlan plan = rem.split.getPlans().get(0);
POStore store = (POStore)plan.getRoots().get(0);
plan.remove(store);
store.setInputs(rem.split.getInputs());
rem.plan.replace(rem.split, store);
}
} catch(PlanException pe) {
log.info("failed to remove unnecessary store from plan: "+pe.getMessage());
}
}
private static class RemovableStore {
public PhysicalPlan storePlan;
public PhysicalPlan plan;
public POSplit split;
}
private class PhysicalRemover extends PhyPlanVisitor {
PhysicalRemover(PhysicalPlan plan) {
super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
}
@Override
public void visit() throws VisitorException {
super.visit();
}
@Override
public void visitLoad(POLoad load) {
// As we go through update the load ops of the tmp stores
// that we removed with the resulting other stores output.
FileSpec spec = replacementMap.get(load.getLFile().getFileName());
if (spec != null) {
load.setLFile(spec);
}
}
@Override
public void visitStore(POStore store) {
// remember these. We will remove the input spec once
// we're done.
storeQ.add(store);
}
@Override
public void visitSplit(POSplit split) throws VisitorException {
super.visitSplit(split);
FileSpec lFile = null;
FileSpec sFile = null;
PhysicalPlan tmpStore = null;
for (PhysicalPlan plan: split.getPlans()) {
if (plan.size() == 1) {
PhysicalOperator op = plan.getRoots().get(0);
if (op instanceof POStore) {
POStore store = (POStore)op;
if (store.isTmpStore()) {
// tmp store means introduced by the
// MRCompiler. User didn't ask for
// those. There can be at most one per
// split. (Though there can be nested
// splits.)
tmpStore = plan;
sFile = store.getSFile();
} else if (store.getInputSpec() != null) {
// We set the input spec for store
// operators that had a corresponding load
// but we eliminated it in the
// PigServer. There could be multiple of
// those, but they are all reversible, so
// any of them will do.
lFile = store.getInputSpec();
}
}
}
}
if (tmpStore != null && lFile != null) {
// schedule removal (happens tuesdays and
// thursdays. don't park your car on the street on
// those days..
RemovableStore rem = new RemovableStore();
rem.storePlan = tmpStore;
rem.plan = mCurrentWalker.getPlan();
rem.split = split;
removalQ.add(rem);
replacementMap.put(sFile.getFileName(), lFile);
}
}
}
}