blob: 299c01272844084e2ca6c794952e3a07cf3ef9eb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.expression;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.pig.ComparisonFunc;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Add;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryComparisonOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.BinaryExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Divide;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GTOrEqualToExpr;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LTOrEqualToExpr;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Mod;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Multiply;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.NotEqualToExpr;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POAnd;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POIsNull;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POMapLookUp;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONegative;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONot;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POOr;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORegexp;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Subtract;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.PlanWalker;
import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOInnerLoad;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
/**
 * Visitor that translates a logical expression plan into a physical expression plan.
 * <p>
 * Each {@code visit} method creates the physical operator corresponding to one logical
 * expression, adds it to {@link #currentPlan}, records the pairing in {@link #logToPhyMap},
 * and connects the physical operators of the expression's inputs to it. Inputs are looked
 * up in {@link #logToPhyMap}, so the walker is expected to visit an expression's inputs
 * before the expression itself (the convenience constructor supplies a
 * {@link DependencyOrderWalker}).
 */
public class ExpToPhyTranslationVisitor extends LogicalExpressionVisitor {

    /** The relational operator whose expression plan is currently being translated. */
    protected LogicalRelationalOperator currentOp;

    /** Maps each translated logical operator to the physical operator created for it. */
    protected Map<Operator, PhysicalOperator> logToPhyMap;

    /** Plan stack; initialized empty here and not otherwise used by this class. */
    protected Deque<PhysicalPlan> currentPlans;

    /** The physical plan that translated operators are added to and connected in. */
    protected PhysicalPlan currentPlan;

    /** Source of node ids for newly created physical operators. */
    protected NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator();

    /** Pig context supplied via {@link #setPigContext}; not read by this class itself. */
    protected PigContext pc;

    /**
     * Creates a visitor that walks {@code plan} with a {@link DependencyOrderWalker}.
     *
     * @param plan    logical expression plan to translate
     * @param op      relational operator that owns {@code plan}
     * @param phyPlan physical plan to populate
     * @param map     logical-to-physical operator map, updated in place
     * @throws FrontendException if the superclass visitor cannot be constructed
     */
    public ExpToPhyTranslationVisitor(OperatorPlan plan, LogicalRelationalOperator op, PhysicalPlan phyPlan,
            Map<Operator, PhysicalOperator> map) throws FrontendException {
        this(plan, new DependencyOrderWalker(plan), op, phyPlan, map);
    }

    /**
     * Creates a visitor that walks {@code plan} with the given walker.
     *
     * @param plan    logical expression plan to translate
     * @param walker  walker used to traverse {@code plan}
     * @param op      relational operator that owns {@code plan}
     * @param phyPlan physical plan to populate
     * @param map     logical-to-physical operator map, updated in place
     * @throws FrontendException if the superclass visitor cannot be constructed
     */
    public ExpToPhyTranslationVisitor(OperatorPlan plan, PlanWalker walker, LogicalRelationalOperator op, PhysicalPlan phyPlan,
            Map<Operator, PhysicalOperator> map) throws FrontendException {
        super(plan, walker);
        currentOp = op;
        logToPhyMap = map;
        currentPlan = phyPlan;
        currentPlans = new LinkedList<PhysicalPlan>();
    }

    /** Sets the Pig context available to this visitor (and subclasses). */
    public void setPigContext(PigContext pc) {
        this.pc = pc;
    }

    /** Returns the physical plan being populated by this visitor. */
    public PhysicalPlan getPhysicalPlan() {
        return currentPlan;
    }

    /** Creates a fresh operator key in the default scope. */
    private OperatorKey newOperatorKey() {
        return new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE));
    }

    /**
     * Adds an edge {@code from -> to} to {@link #currentPlan}.
     *
     * @throws LogicalToPhysicalTranslatorException (error 2015) if the plan rejects the edge
     */
    private void connectOrThrow(PhysicalOperator from, PhysicalOperator to)
            throws FrontendException {
        try {
            currentPlan.connect(from, to);
        } catch (PlanException e) {
            int errCode = 2015;
            String msg = "Invalid physical operators in the physical plan";
            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
        }
    }

    /**
     * Connects the physical counterpart of each successor of {@code op} (its operands in the
     * logical expression plan) to {@code exprOp}. Physical edges run operand -> consumer.
     */
    private void connectSuccessors(Operator op, PhysicalOperator exprOp)
            throws FrontendException {
        List<Operator> successors = op.getPlan().getSuccessors(op);
        if (successors == null) {
            return;
        }
        for (Operator lo : successors) {
            connectOrThrow(logToPhyMap.get(lo), exprOp);
        }
    }

    /**
     * Wires a binary comparison operator: operand type comes from the left input, operands
     * are looked up in {@link #logToPhyMap}, then the operator is added and connected.
     */
    private void attachBinaryComparisonOperator(BinaryExpression op,
            BinaryComparisonOperator exprOp) throws FrontendException {
        // ExpressionOperators carry no alias, so op's alias is not propagated.
        exprOp.setOperandType(op.getLhs().getType());
        exprOp.setLhs((ExpressionOperator) logToPhyMap.get(op.getLhs()));
        exprOp.setRhs((ExpressionOperator) logToPhyMap.get(op.getRhs()));
        currentPlan.add(exprOp);
        logToPhyMap.put(op, exprOp);
        connectSuccessors(op, exprOp);
    }

    /**
     * Wires a binary (arithmetic-style) operator: result type comes from the left input,
     * operands are looked up in {@link #logToPhyMap}, then the operator is added and connected.
     */
    private void attachBinaryExpressionOperator(BinaryExpression op,
            BinaryExpressionOperator exprOp) throws FrontendException {
        // ExpressionOperators carry no alias, so op's alias is not propagated.
        exprOp.setResultType(op.getLhs().getType());
        exprOp.setLhs((ExpressionOperator) logToPhyMap.get(op.getLhs()));
        exprOp.setRhs((ExpressionOperator) logToPhyMap.get(op.getRhs()));
        currentPlan.add(exprOp);
        logToPhyMap.put(op, exprOp);
        connectSuccessors(op, exprOp);
    }

    @Override
    public void visit(AndExpression op) throws FrontendException {
        attachBinaryComparisonOperator(op, new POAnd(newOperatorKey()));
    }

    @Override
    public void visit(OrExpression op) throws FrontendException {
        attachBinaryComparisonOperator(op, new POOr(newOperatorKey()));
    }

    @Override
    public void visit(EqualExpression op) throws FrontendException {
        attachBinaryComparisonOperator(op, new EqualToExpr(newOperatorKey()));
    }

    @Override
    public void visit(NotEqualExpression op) throws FrontendException {
        attachBinaryComparisonOperator(op, new NotEqualToExpr(newOperatorKey()));
    }

    @Override
    public void visit(GreaterThanExpression op) throws FrontendException {
        attachBinaryComparisonOperator(op, new GreaterThanExpr(newOperatorKey()));
    }

    @Override
    public void visit(GreaterThanEqualExpression op) throws FrontendException {
        attachBinaryComparisonOperator(op, new GTOrEqualToExpr(newOperatorKey()));
    }

    @Override
    public void visit(LessThanExpression op) throws FrontendException {
        attachBinaryComparisonOperator(op, new LessThanExpr(newOperatorKey()));
    }

    @Override
    public void visit(LessThanEqualExpression op) throws FrontendException {
        attachBinaryComparisonOperator(op, new LTOrEqualToExpr(newOperatorKey()));
    }

    /**
     * Translates a projection. A project attached to an {@link LOGenerate} that has no inputs
     * in its expression plan and whose referent is not an {@link LOInnerLoad} becomes a
     * {@link PORelationToExprProject}; every other project becomes a plain {@link POProject}.
     */
    @Override
    public void visit(ProjectExpression op) throws FrontendException {
        POProject exprOp;
        if (op.getAttachedRelationalOp() instanceof LOGenerate
                && op.getPlan().getSuccessors(op) == null
                && !(op.findReferent() instanceof LOInnerLoad)) {
            exprOp = new PORelationToExprProject(newOperatorKey());
        } else {
            exprOp = new POProject(newOperatorKey());
        }

        // A star/range project without a known schema yields a tuple.
        if (op.getFieldSchema() == null && op.isRangeOrStarProject()) {
            exprOp.setResultType(DataType.TUPLE);
        } else {
            exprOp.setResultType(op.getType());
        }

        if (op.isProjectStar()) {
            exprOp.setStar(true);
        } else if (op.isRangeProject()) {
            if (op.getEndCol() != -1) {
                // all other project-range should have been expanded by
                // project-star expander
                throw new AssertionError("project range that is not a " +
                        "project-to-end seen in translation to physical plan!");
            }
            exprOp.setProjectToEnd(op.getStartCol());
        } else {
            exprOp.setColumn(op.getColNum());
        }
        // TODO implement this
        // exprOp.setOverloaded(op.getOverloaded());
        logToPhyMap.put(op, exprOp);
        currentPlan.add(exprOp);
    }

    /** Translates a map lookup into a {@link POMapLookUp} fed by the translated map input. */
    @Override
    public void visit(MapLookupExpression op) throws FrontendException {
        POMapLookUp physOp = new POMapLookUp(newOperatorKey());
        physOp.setLookUpKey(op.getLookupKey());
        physOp.setResultType(op.getType());
        physOp.addOriginalLocation(op.getFieldSchema().alias, op.getLocation());
        currentPlan.add(physOp);
        logToPhyMap.put(op, physOp);
        ExpressionOperator from = (ExpressionOperator) logToPhyMap.get(op.getMap());
        connectOrThrow(from, physOp);
    }

    /** Translates a constant; the resulting operator has no inputs. */
    @Override
    public void visit(org.apache.pig.newplan.logical.expression.ConstantExpression op) throws FrontendException {
        ConstantExpression ce = new ConstantExpression(newOperatorKey());
        // ExpressionOperators carry no alias, so op's alias is not propagated.
        ce.setValue(op.getValue());
        ce.setResultType(op.getType());
        currentPlan.add(ce);
        logToPhyMap.put(op, ce);
    }

    /**
     * Translates a cast into a {@link POCast}, attaching the logical cast's FuncSpec (if any)
     * so the physical cast can resolve a load function.
     *
     * @throws LogicalToPhysicalTranslatorException (error 1053) if the FuncSpec cannot be applied
     */
    @Override
    public void visit(CastExpression op) throws FrontendException {
        POCast pCast = new POCast(newOperatorKey());
        pCast.addOriginalLocation(op.getFieldSchema().alias, op.getLocation());
        currentPlan.add(pCast);
        logToPhyMap.put(op, pCast);
        ExpressionOperator from = (ExpressionOperator) logToPhyMap.get(op.getExpression());
        pCast.setResultType(op.getType());
        pCast.setFieldSchema(new ResourceSchema.ResourceFieldSchema(op.getFieldSchema()));
        FuncSpec lfSpec = op.getFuncSpec();
        if (lfSpec != null) {
            try {
                pCast.setFuncSpec(lfSpec);
            } catch (IOException e) {
                int errCode = 1053;
                String msg = "Cannot resolve load function to use for casting" +
                        " from " + DataType.findTypeName(op.getExpression().
                        getType()) + " to " + DataType.findTypeName(op.getType());
                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
            }
        }
        connectOrThrow(from, pCast);
    }

    /** Translates a logical NOT into a {@link PONot}. */
    @Override
    public void visit(NotExpression op) throws FrontendException {
        PONot pNot = new PONot(newOperatorKey());
        currentPlan.add(pNot);
        logToPhyMap.put(op, pNot);
        ExpressionOperator from = (ExpressionOperator) logToPhyMap.get(op.getExpression());
        pNot.setExpr(from);
        pNot.setResultType(op.getType());
        pNot.setOperandType(op.getType());
        connectOrThrow(from, pNot);
    }

    /** Translates an IS NULL test into a {@link POIsNull}; operand type is the input's type. */
    @Override
    public void visit(IsNullExpression op) throws FrontendException {
        POIsNull pIsNull = new POIsNull(newOperatorKey());
        currentPlan.add(pIsNull);
        logToPhyMap.put(op, pIsNull);
        ExpressionOperator from = (ExpressionOperator) logToPhyMap.get(op.getExpression());
        pIsNull.setExpr(from);
        pIsNull.setResultType(op.getType());
        pIsNull.setOperandType(op.getExpression().getType());
        connectOrThrow(from, pIsNull);
    }

    /** Translates unary minus into a {@link PONegative}. */
    @Override
    public void visit(NegativeExpression op) throws FrontendException {
        PONegative pNegative = new PONegative(newOperatorKey());
        currentPlan.add(pNegative);
        logToPhyMap.put(op, pNegative);
        ExpressionOperator from = (ExpressionOperator) logToPhyMap.get(op.getExpression());
        pNegative.setExpr(from);
        pNegative.setResultType(op.getType());
        connectOrThrow(from, pNegative);
    }

    @Override
    public void visit(AddExpression op) throws FrontendException {
        attachBinaryExpressionOperator(op, new Add(newOperatorKey()));
    }

    /**
     * Translates a regex match into a {@link PORegexp}, flagging it when the pattern
     * (right operand) is a constant.
     */
    @Override
    public void visit(RegexExpression op) throws FrontendException {
        PORegexp exprOp = new PORegexp(newOperatorKey());
        attachBinaryExpressionOperator(op, exprOp);
        if (op.getRhs() instanceof org.apache.pig.newplan.logical.expression.ConstantExpression) {
            exprOp.setConstExpr(true);
        }
    }

    @Override
    public void visit(SubtractExpression op) throws FrontendException {
        attachBinaryExpressionOperator(op, new Subtract(newOperatorKey()));
    }

    @Override
    public void visit(MultiplyExpression op) throws FrontendException {
        attachBinaryExpressionOperator(op, new Multiply(newOperatorKey()));
    }

    @Override
    public void visit(DivideExpression op) throws FrontendException {
        attachBinaryExpressionOperator(op, new Divide(newOperatorKey()));
    }

    @Override
    public void visit(ModExpression op) throws FrontendException {
        attachBinaryExpressionOperator(op, new Mod(newOperatorKey()));
    }

    /** Translates a conditional (bincond) into a {@link POBinCond}. */
    @Override
    public void visit(BinCondExpression op) throws FrontendException {
        POBinCond exprOp = new POBinCond(newOperatorKey());
        exprOp.setResultType(op.getType());
        exprOp.setCond((ExpressionOperator) logToPhyMap.get(op.getCondition()));
        exprOp.setLhs((ExpressionOperator) logToPhyMap.get(op.getLhs()));
        exprOp.setRhs((ExpressionOperator) logToPhyMap.get(op.getRhs()));
        currentPlan.add(exprOp);
        logToPhyMap.put(op, exprOp);
        connectSuccessors(op, exprOp);
    }

    /**
     * Translates a UDF call. An {@link EvalFunc} becomes a {@link POUserFunc} (carrying its
     * signature, input schema, cache files and ship files); anything else is treated as a
     * {@link ComparisonFunc} and becomes a {@link POUserComparisonFunc}. For scalar
     * expressions the physical counterpart of the implicitly referenced operator is recorded.
     */
    @SuppressWarnings("unchecked")
    @Override
    public void visit(UserFuncExpression op) throws FrontendException {
        Object f = PigContext.instantiateFuncFromSpec(op.getFuncSpec());
        PhysicalOperator p;
        if (f instanceof EvalFunc) {
            EvalFunc<?> evalFunc = (EvalFunc<?>) f;
            POUserFunc userFunc = new POUserFunc(newOperatorKey(), -1,
                    null, op.getFuncSpec(), evalFunc);
            userFunc.setSignature(op.getSignature());
            // reinitialize input schema from signature
            if (userFunc.getFunc().getInputSchema() == null) {
                userFunc.setFuncInputSchema(op.getSignature());
                evalFunc.setInputSchema(userFunc.getFunc().getInputSchema());
            }
            List<String> cacheFiles = evalFunc.getCacheFiles();
            if (cacheFiles != null) {
                userFunc.setCacheFiles(cacheFiles);
            }
            List<String> shipFiles = evalFunc.getShipFiles();
            if (shipFiles != null) {
                userFunc.setShipFiles(shipFiles);
            }
            p = userFunc;
        } else {
            p = new POUserComparisonFunc(newOperatorKey(), -1,
                    null, op.getFuncSpec(), (ComparisonFunc) f);
        }
        p.setResultType(op.getType());
        currentPlan.add(p);
        List<LogicalExpression> fromList = op.getArguments();
        if (fromList != null) {
            for (LogicalExpression inputOperator : fromList) {
                connectOrThrow(logToPhyMap.get(inputOperator), p);
            }
        }
        logToPhyMap.put(op, p);
        // We need to track all the scalars
        if (op instanceof ScalarExpression) {
            Operator refOp = ((ScalarExpression) op).getImplicitReferencedOperator();
            ((POUserFunc) p).setReferencedOperator(logToPhyMap.get(refOp));
        }
    }

    /**
     * Translates a dereference into a non-star {@link POProject} over the dereferenced
     * columns, connected to the referred expression's physical operator when one exists.
     */
    @Override
    public void visit(DereferenceExpression op) throws FrontendException {
        POProject exprOp = new POProject(newOperatorKey());
        exprOp.setResultType(op.getType());
        // Copy rather than cast: the list need not be an ArrayList, and the physical
        // operator should not alias the logical operator's column list.
        List<Integer> bagColumns = op.getBagColumns();
        exprOp.setColumns(bagColumns == null ? null : new ArrayList<Integer>(bagColumns));
        exprOp.setStar(false);
        logToPhyMap.put(op, exprOp);
        currentPlan.add(exprOp);
        PhysicalOperator from = logToPhyMap.get(op.getReferredExpression());
        if (from != null) {
            connectOrThrow(from, exprOp);
        }
    }
}