blob: f23f3c4d1841653511c4d431a4dd7ed9c440a301 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.fetch;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.datastorage.DataStorageException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.builtin.SampleLoader;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.Utils;
/**
* FetchOptimizer determines whether the entire physical plan is fetchable, meaning
* that the task's result can be directly read (fetched) from the underlying storage
* rather than creating MR jobs. During the check {@link FetchablePlanVisitor} is used
* to walk through the plan.
*
*/
public class FetchOptimizer {
    private static final Log LOG = LogFactory.getLog(FetchOptimizer.class);

    /**
     * Checks whether the fetch optimization is enabled.
     * <p>
     * Reads {@link PigConfiguration#PIG_OPT_FETCH} from the context properties. The
     * optimization is enabled when the property is absent or equals {@code "true"}
     * (case-insensitive).
     *
     * @param pc the PigContext whose properties are consulted
     * @return true if fetching is enabled
     */
    public static boolean isFetchEnabled(PigContext pc) {
        return "true".equalsIgnoreCase(
                pc.getProperties().getProperty(PigConfiguration.PIG_OPT_FETCH, "true"));
    }

    /**
     * Visits the plan with {@link FetchablePlanVisitor} and checks whether the
     * plan is fetchable.
     * <p>
     * The plan is fetchable only if all of the following hold:
     * <ul>
     * <li>fetching is enabled, every root is a {@link POLoad}, and there is exactly one leaf;</li>
     * <li>the visitor accepts every operator in the plan;</li>
     * <li>every {@link POLoad} carries a limit.</li>
     * </ul>
     * When the method returns true, it has two side effects: every {@link POStream}
     * in the plan is marked fetchable, and {@link PigImplConstants#CONVERTED_TO_FETCH}
     * is set to {@code "true"} in the context properties.
     *
     * @param pc PigContext
     * @param pp the physical plan to be examined
     * @return true if the plan is fetchable
     * @throws VisitorException if walking the plan fails
     */
    public static boolean isPlanFetchable(PigContext pc, PhysicalPlan pp) throws VisitorException {
        if (!isEligible(pc, pp)) {
            return false;
        }
        FetchablePlanVisitor fpv = new FetchablePlanVisitor(pc, pp);
        fpv.visit();
        if (!fpv.isPlanFetchable()) {
            return false;
        }
        // The plan also needs a limit pushed up to every loader. The limit is a
        // safeguard: without it, a large input would be fetched entirely to the
        // client, which can be dangerous.
        for (POLoad load : PlanHelper.getPhysicalOperators(pp, POLoad.class)) {
            if (load.getLimit() == -1) {
                return false;
            }
        }
        // Prepare the plan before advertising the conversion, so the flag is
        // never left set for a plan whose preparation failed.
        init(pp);
        pc.getProperties().setProperty(PigImplConstants.CONVERTED_TO_FETCH, "true");
        return true;
    }

    /**
     * Prepares a fetchable plan for execution by marking every {@link POStream}
     * in it as fetchable.
     *
     * @param pp the physical plan that was found to be fetchable
     * @throws VisitorException if collecting the operators fails
     */
    private static void init(PhysicalPlan pp) throws VisitorException {
        LinkedList<POStream> posList = PlanHelper.getPhysicalOperators(pp, POStream.class);
        for (POStream pos : posList) {
            pos.setFetchable(true);
        }
    }

    /**
     * Checks whether the plan fulfills the prerequisites needed for fetching:
     * fetching must be enabled, every root must be a {@link POLoad}, and the plan
     * must have exactly one leaf.
     *
     * @param pc PigContext
     * @param pp the physical plan to be examined
     * @return true if the plan may be examined further for fetchability
     */
    private static boolean isEligible(PigContext pc, PhysicalPlan pp) {
        if (!isFetchEnabled(pc)) {
            return false;
        }
        List<PhysicalOperator> roots = pp.getRoots();
        for (PhysicalOperator po : roots) {
            if (!(po instanceof POLoad)) {
                String msg = "Expected physical operator at root is POLoad. Found : "
                        + po.getClass().getCanonicalName() + ". Fetch optimizer will be disabled.";
                LOG.debug(msg);
                return false;
            }
        }
        // Consider single-leaf jobs only.
        int leafSize = pp.getLeaves().size();
        if (leafSize != 1) {
            LOG.debug("Expected physical plan should have one leaf. Found " + leafSize);
            return false;
        }
        return true;
    }

    /**
     * A plan is considered 'fetchable' if:
     * <pre>
     * - it contains only: LIMIT, FILTER, FOREACH, STREAM, UNION (no implicit SPLIT is allowed)
     * - it has no STORE, except the fake store created by EXPLAIN or a store that uses
     *   the intermediate (temp file) storage class and writes to a temporary path
     * - no scalar aliases ({@link org.apache.pig.impl.builtin.ReadScalars ReadScalars})
     * - {@link org.apache.pig.LoadFunc LoadFunc} is not a {@link org.apache.pig.impl.builtin.SampleLoader SampleLoader}
     * </pre>
     * Any disallowed operator flips {@code planFetchable} to false; it is never
     * reset to true during the walk.
     */
    private static class FetchablePlanVisitor extends PhyPlanVisitor {
        private boolean planFetchable = true;
        private final PigContext pc;

        public FetchablePlanVisitor(PigContext pc, PhysicalPlan plan) {
            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
            this.pc = pc;
        }

        @Override
        public void visitLoad(POLoad ld) throws VisitorException{
            if (ld.getLoadFunc() instanceof SampleLoader) {
                planFetchable = false;
            }
        }

        @Override
        public void visitStore(POStore st) throws VisitorException{
            String basePathName = st.getSFile().getFileName();
            // The plan is fetchable if POStore belongs to EXPLAIN.
            if ("fakefile".equals(basePathName)) {
                return;
            }
            // Otherwise the store must use the intermediate storage format...
            boolean hasTmpStorageClass = st.getStoreFunc().getClass()
                    .equals(Utils.getTmpFileStorageClass(pc.getProperties()));
            if (!hasTmpStorageClass) {
                // Already disqualified; skip the storage lookup below.
                planFetchable = false;
                return;
            }
            // ...and its path must point to a temporary storage path.
            try {
                if (!isTempPath(basePathName)) {
                    planFetchable = false;
                }
            }
            catch (IOException e) {
                String msg = "Internal error. Could not retrieve temporary store location.";
                throw new VisitorException(msg, e);
            }
        }

        @Override
        public void visitNative(PONative nat) throws VisitorException {
            planFetchable = false;
        }

        @Override
        public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException {
            planFetchable = false;
        }

        @Override
        public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException {
            planFetchable = false;
        }

        @Override
        public void visitGlobalRearrange(POGlobalRearrange gr) throws VisitorException {
            planFetchable = false;
        }

        @Override
        public void visitPackage(POPackage pkg) throws VisitorException {
            planFetchable = false;
        }

        @Override
        public void visitSplit(POSplit spl) throws VisitorException {
            planFetchable = false;
        }

        @Override
        public void visitDemux(PODemux demux) throws VisitorException {
            planFetchable = false;
        }

        @Override
        public void visitCounter(POCounter poCounter) throws VisitorException {
            planFetchable = false;
        }

        @Override
        public void visitRank(PORank rank) throws VisitorException {
            planFetchable = false;
        }

        @Override
        public void visitDistinct(PODistinct distinct) throws VisitorException {
            planFetchable = false;
        }

        @Override
        public void visitSort(POSort sort) throws VisitorException {
            planFetchable = false;
        }

        @Override
        public void visitCross(POCross cross) throws VisitorException {
            planFetchable = false;
        }

        @Override
        public void visitFRJoin(POFRJoin join) throws VisitorException {
            planFetchable = false;
        }

        @Override
        public void visitMergeJoin(POMergeJoin join) throws VisitorException {
            planFetchable = false;
        }

        @Override
        public void visitMergeCoGroup(POMergeCogroup mergeCoGrp) throws VisitorException {
            planFetchable = false;
        }

        @Override
        public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
            planFetchable = false;
        }

        @Override
        public void visitPartitionRearrange(POPartitionRearrange pr) throws VisitorException {
            planFetchable = false;
        }

        @Override
        public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach)
                throws VisitorException {
            planFetchable = false;
        }

        @Override
        public void visitPreCombinerLocalRearrange(
                POPreCombinerLocalRearrange preCombinerLocalRearrange) {
            planFetchable = false;
        }

        @Override
        public void visitPartialAgg(POPartialAgg poPartialAgg) {
            planFetchable = false;
        }

        /** @return false if any visited operator disqualified the plan */
        private boolean isPlanFetchable() {
            return planFetchable;
        }

        /**
         * Checks whether {@code basePathName} starts with the temporary store
         * location. That location is {@code <pig.temp.dir>/temp} (default
         * {@code /tmp/temp}), resolved through the DFS, followed by an optional
         * '-' and one or more digits.
         *
         * @param basePathName the store's target path
         * @return true if the path lies under the temporary store location
         * @throws DataStorageException if the temp location cannot be resolved
         */
        private boolean isTempPath(String basePathName) throws DataStorageException {
            String tdir = pc.getProperties().getProperty("pig.temp.dir", "/tmp");
            String tempStore = pc.getDfs().asContainer(tdir + "/temp").toString();
            // Quote the location: it is a user-configurable path/URI and may contain
            // regex metacharacters ('.', '(', '[', '+', ...). These would otherwise
            // change the match or make Pattern.compile throw.
            Matcher matcher = Pattern.compile(Pattern.quote(tempStore) + "-?[0-9]+")
                    .matcher(basePathName);
            return matcher.lookingAt();
        }
    }
}