blob: 7f0c3982e7dc09714da2c140198f5adae5553737 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.rules;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.newplan.BaseOperatorPlan;
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.PlanWalker;
import org.apache.pig.newplan.ReverseDependencyOrderWalkerWOSeenChk;
import org.apache.pig.newplan.logical.expression.AllSameExpressionVisitor;
import org.apache.pig.newplan.logical.expression.ConstantExpression;
import org.apache.pig.newplan.logical.expression.ExpToPhyTranslationVisitor;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
import org.apache.pig.newplan.logical.expression.UserFuncExpression;
import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.optimizer.Rule;
import org.apache.pig.newplan.optimizer.Transformer;
import org.joda.time.DateTimeZone;
public abstract class ConstantCalculator extends Rule {
private List<LogicalRelationalOperator> processedOperators = new ArrayList<LogicalRelationalOperator>();
private PigContext pc;
public ConstantCalculator(String n, PigContext pc) {
super(n, false);
this.pc = pc;
}
@Override
public Transformer getNewTransformer() {
return new ConstantCalculatorTransformer(processedOperators, pc);
}
public static class ConstantCalculatorTransformer extends Transformer {
private List<LogicalRelationalOperator> processedOperators = new ArrayList<LogicalRelationalOperator>();
private OperatorPlan plan;
private PigContext pc;
public ConstantCalculatorTransformer(List<LogicalRelationalOperator> processedOperators, PigContext pc) {
this.processedOperators = processedOperators;
this.pc = pc;
}
@Override
public boolean check(OperatorPlan matched) throws FrontendException {
Iterator<Operator> operators = matched.getOperators();
while (operators.hasNext()) {
LogicalRelationalOperator operator = (LogicalRelationalOperator)operators.next();
// If the operator is already processed, we quit.
if (processedOperators.contains(operator)) {
continue;
}
processedOperators.add(operator);
return true;
}
return false;
}
public static class ConstantCalculatorLogicalPlanVisitor extends AllExpressionVisitor {
private PigContext pc;
public ConstantCalculatorLogicalPlanVisitor(OperatorPlan plan, PigContext pc) throws FrontendException {
super(plan, new DependencyOrderWalker(plan));
this.pc = pc;
}
@Override
protected LogicalExpressionVisitor getVisitor(
LogicalExpressionPlan expr) throws FrontendException {
return new ConstantCalculatorExpressionVisitor(expr, currentOp, pc);
}
}
public static class ConstantCalculatorExpressionVisitor extends AllSameExpressionVisitor {
private LogicalRelationalOperator currentOp;
private PigContext pc;
private DateTimeZone currentDTZ = null;
public ConstantCalculatorExpressionVisitor(OperatorPlan expPlan,
LogicalRelationalOperator currentOp, PigContext pc) throws FrontendException {
super(expPlan, new ReverseDependencyOrderWalkerWOSeenChk(expPlan));
this.currentOp = currentOp;
this.pc = pc;
}
@Override
protected void execute(LogicalExpression op) throws FrontendException {
if (op instanceof UserFuncExpression) {
UserFuncExpression udf = (UserFuncExpression)op;
if (!udf.getEvalFunc().allowCompileTimeCalculation()) {
return;
}
}
boolean valSet = false;
Object val = null;
if (currentWalker.getPlan().getSuccessors(op) != null) {
// If has successors and all successors are constant, calculate the constant
for (Operator succ : currentWalker.getPlan().getSuccessors(op)) {
if (!(succ instanceof ConstantExpression)) {
return;
}
}
// All successors are constant, calculate the value
OperatorPlan expLogicalPlan = new LogicalExpressionPlan();
((BaseOperatorPlan)currentWalker.getPlan()).moveTree(op, (BaseOperatorPlan)expLogicalPlan);
PhysicalPlan expPhysicalPlan = new PhysicalPlan();
Map<Operator, PhysicalOperator> logToPhyMap = new HashMap<Operator, PhysicalOperator>();
PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(expLogicalPlan);
// Save the old walker and use childWalker as current Walker
pushWalker(childWalker);
ExpToPhyTranslationVisitor expTranslationVisitor = new
ExpToPhyTranslationVisitor(expLogicalPlan,
childWalker, currentOp, expPhysicalPlan, logToPhyMap);
expTranslationVisitor.visit();
popWalker();
PhysicalOperator root = expPhysicalPlan.getLeaves().get(0);
try {
UDFContext.getUDFContext().addJobConf(ConfigurationUtil.toConfiguration(pc.getProperties(), true));
PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
PhysicalOperator.setPigLogger(pigHadoopLogger);
setDefaultTimeZone();
val = root.getNext(root.getResultType()).result;
restoreDefaultTimeZone();
UDFContext.getUDFContext().addJobConf(null);
} catch (ExecException e) {
throw new FrontendException(e);
}
valSet = true;
} else if (op instanceof UserFuncExpression) {
// If solo UDF, calculate UDF
UserFuncExpression udf = (UserFuncExpression)op;
try {
UDFContext.getUDFContext().addJobConf(ConfigurationUtil.toConfiguration(pc.getProperties(), true));
setDefaultTimeZone();
val = udf.getEvalFunc().exec(null);
restoreDefaultTimeZone();
UDFContext.getUDFContext().addJobConf(null);
} catch (IOException e) {
throw new FrontendException(e);
}
valSet = true;
}
if (valSet) {
ConstantExpression constantExpr;
constantExpr = new ConstantExpression(currentWalker.getPlan(), val);
constantExpr.inheritSchema(op);
currentWalker.getPlan().replace(op, constantExpr);
}
}
private void setDefaultTimeZone() {
String dtzStr = pc.getProperties().getProperty("pig.datetime.default.tz");
if (dtzStr != null && dtzStr.length() > 0) {
currentDTZ = DateTimeZone.getDefault();
DateTimeZone.setDefault(DateTimeZone.forID(dtzStr));
}
}
private void restoreDefaultTimeZone() {
if (currentDTZ != null) {
DateTimeZone.setDefault(currentDTZ);
currentDTZ = null;
}
}
}
@Override
public void transform(OperatorPlan matched) throws FrontendException {
AllExpressionVisitor expressionVisitor = new ConstantCalculatorLogicalPlanVisitor(matched, pc);
expressionVisitor.visit();
}
@Override
public OperatorPlan reportChanges() {
return plan;
}
}
}