blob: db1ca48a6af6c1555921671d51e7b7c7068c9fb6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.visitor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.pig.EvalFunc;
import org.apache.pig.EvalFunc.SchemaType;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.logicalLayer.validators.TypeCheckerException;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.ReverseDependencyOrderWalker;
import org.apache.pig.newplan.logical.Util;
import org.apache.pig.newplan.logical.expression.AddExpression;
import org.apache.pig.newplan.logical.expression.AllSameExpressionVisitor;
import org.apache.pig.newplan.logical.expression.AndExpression;
import org.apache.pig.newplan.logical.expression.BinCondExpression;
import org.apache.pig.newplan.logical.expression.BinaryExpression;
import org.apache.pig.newplan.logical.expression.CastExpression;
import org.apache.pig.newplan.logical.expression.ConstantExpression;
import org.apache.pig.newplan.logical.expression.DereferenceExpression;
import org.apache.pig.newplan.logical.expression.DivideExpression;
import org.apache.pig.newplan.logical.expression.EqualExpression;
import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
import org.apache.pig.newplan.logical.expression.GreaterThanExpression;
import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
import org.apache.pig.newplan.logical.expression.LessThanExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
import org.apache.pig.newplan.logical.expression.MapLookupExpression;
import org.apache.pig.newplan.logical.expression.ModExpression;
import org.apache.pig.newplan.logical.expression.MultiplyExpression;
import org.apache.pig.newplan.logical.expression.NegativeExpression;
import org.apache.pig.newplan.logical.expression.NotEqualExpression;
import org.apache.pig.newplan.logical.expression.NotExpression;
import org.apache.pig.newplan.logical.expression.OrExpression;
import org.apache.pig.newplan.logical.expression.RegexExpression;
import org.apache.pig.newplan.logical.expression.SubtractExpression;
import org.apache.pig.newplan.logical.expression.UserFuncExpression;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
public class TypeCheckingExpVisitor extends LogicalExpressionVisitor{
private CompilationMessageCollector msgCollector;
private LogicalRelationalOperator currentRelOp;
private static final int INF = -1;
public TypeCheckingExpVisitor(
OperatorPlan expPlan,
CompilationMessageCollector msgCollector,
LogicalRelationalOperator relOp
)
throws FrontendException {
super(expPlan, new ReverseDependencyOrderWalker(expPlan));
this.msgCollector = msgCollector;
this.currentRelOp = relOp;
//reset field schema of all expression operators because
// it needs to be re-evaluated after correct types are set
FieldSchemaResetter sr = new FieldSchemaResetter(expPlan);
sr.visit();
}
@Override
public void visit(AddExpression binOp) throws FrontendException {
addCastsToNumericBinExpression(binOp);
}
@Override
public void visit(SubtractExpression binOp) throws FrontendException {
addCastsToNumericBinExpression(binOp);
}
@Override
public void visit(MultiplyExpression binOp) throws FrontendException {
addCastsToNumericBinExpression(binOp);
}
@Override
public void visit(DivideExpression binOp) throws FrontendException {
addCastsToNumericBinExpression(binOp);
}
/**
* Add casts to promote numeric type to larger of two input numeric types of
* the {@link BinaryExpression} binOp . If one of the inputs is numeric
* and other bytearray, cast the bytearray type to other numeric type.
* If both inputs are bytearray, cast them to double.
* @param binOp
* @throws FrontendException
*/
private void addCastsToNumericBinExpression(BinaryExpression binOp)
throws FrontendException {
LogicalExpression lhs = binOp.getLhs() ;
LogicalExpression rhs = binOp.getRhs() ;
byte lhsType = lhs.getType() ;
byte rhsType = rhs.getType() ;
if ( DataType.isNumberType(lhsType) &&
DataType.isNumberType(rhsType) ) {
// return the bigger type
byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
// Cast smaller type to the bigger type
if (lhsType != biggerType) {
insertCast(binOp, biggerType, binOp.getLhs());
}
else if (rhsType != biggerType) {
insertCast(binOp, biggerType, binOp.getRhs());
}
}
else if ( (lhsType == DataType.BYTEARRAY) &&
(DataType.isNumberType(rhsType)) ) {
insertCast(binOp, rhsType, binOp.getLhs());
}
else if ( (rhsType == DataType.BYTEARRAY) &&
(DataType.isNumberType(lhsType)) ) {
insertCast(binOp, lhsType, binOp.getRhs());
}
else if ( (lhsType == DataType.BYTEARRAY) &&
(rhsType == DataType.BYTEARRAY) ) {
// Cast both operands to double
insertCast(binOp, DataType.DOUBLE, binOp.getLhs());
insertCast(binOp, DataType.DOUBLE, binOp.getRhs());
}
else {
int errCode = 1039;
String msg = generateIncompatibleTypesMessage(binOp);
msgCollector.collect(msg, MessageType.Error);
throw new TypeCheckerException(binOp, msg, errCode, PigException.INPUT) ;
}
}
@Override
public void visit(ModExpression binOp) throws FrontendException {
LogicalExpression lhs = binOp.getLhs() ;
LogicalExpression rhs = binOp.getRhs() ;
byte lhsType = lhs.getType() ;
byte rhsType = rhs.getType() ;
boolean error = false;
if (lhsType == DataType.INTEGER) {
if (rhsType == DataType.INTEGER) {
//do nothing
} else if (rhsType == DataType.LONG || rhsType == DataType.BIGINTEGER) {
insertCast(binOp, rhsType, binOp.getLhs());
} else {
error = true;
}
} else if (lhsType == DataType.LONG) {
if (rhsType == DataType.INTEGER) {
insertCast(binOp, lhsType, binOp.getRhs());
} else if (rhsType == DataType.BIGINTEGER) {
insertCast(binOp, rhsType, binOp.getLhs());
} else if (rhsType == DataType.LONG) {
//do nothing
} else {
error = true;
}
} else if (lhsType == DataType.BIGINTEGER) {
if (rhsType == DataType.INTEGER || rhsType == DataType.LONG) {
insertCast(binOp, lhsType, binOp.getRhs());
} else if (rhsType == DataType.BIGINTEGER) {
//do nothing
} else {
error = true;
}
} else if (lhsType == DataType.BYTEARRAY) {
if (rhsType == DataType.INTEGER || rhsType == DataType.LONG || rhsType == DataType.BIGINTEGER) {
insertCast(binOp, rhsType, binOp.getLhs());
} else {
error = true;
}
} else {
error = true;
}
if (error) {
int errCode = 1039;
String msg = generateIncompatibleTypesMessage(binOp);
msgCollector.collect(msg, MessageType.Error);
throw new TypeCheckerException(binOp, msg, errCode, PigException.INPUT) ;
}
}
private String generateIncompatibleTypesMessage(BinaryExpression binOp)
throws FrontendException {
String msg = binOp.toString();
if (currentRelOp != null && currentRelOp.getAlias() != null) {
msg = "In alias " + currentRelOp.getAlias() + ", ";
}
LogicalFieldSchema lhsFs = binOp.getLhs().getFieldSchema();
LogicalFieldSchema rhsFs = binOp.getRhs().getFieldSchema();
msg = msg + "incompatible types in " + binOp.getName() + " Operator"
+ " left hand side:" + DataType.findTypeName(lhsFs.type)
+ (lhsFs.schema == null ? "" : " " + lhsFs.schema.toString(false) + " ")
+ " right hand side:" + DataType.findTypeName(rhsFs.type)
+ (rhsFs.schema == null ? "" : " " + rhsFs.schema.toString(false) + " ") ;
return msg;
}
@Override
public void visit(NegativeExpression negExp) throws FrontendException {
byte type = negExp.getExpression().getType() ;
if (DataType.isNumberType(type)) {
//do nothing
}
else if (type == DataType.BYTEARRAY) {
// cast bytearray to double
insertCast(negExp, DataType.DOUBLE, negExp.getExpression());
}
else {
int errCode = 1041;
String msg = "NEG can be used with numbers or Bytearray only" ;
msgCollector.collect(msg, MessageType.Error);
throw new TypeCheckerException(negExp, msg, errCode, PigException.INPUT) ;
}
}
@Override
public void visit(NotExpression notExp) throws FrontendException {
if (notExp.getExpression() instanceof ConstantExpression
&& ((ConstantExpression) notExp.getExpression()).getValue() == null) {
insertCast(notExp, DataType.BOOLEAN, notExp.getExpression());
}
byte type = notExp.getExpression().getType();
if (type != DataType.BOOLEAN) {
int errCode = 1042;
String msg = "NOT can be used with boolean only" ;
msgCollector.collect(msg, MessageType.Error);
throw new TypeCheckerException( notExp, msg, errCode, PigException.INPUT) ;
}
}
@Override
public void visit(OrExpression orExp) throws FrontendException {
visitBooleanBinary(orExp);
}
@Override
public void visit(AndExpression andExp) throws FrontendException {
visitBooleanBinary(andExp);
}
private void visitBooleanBinary(BinaryExpression boolExp)
throws FrontendException {
// if lhs or rhs is null constant then cast it to boolean
insertCastsForNullToBoolean(boolExp);
LogicalExpression lhs = boolExp.getLhs();
LogicalExpression rhs = boolExp.getRhs();
byte lhsType = lhs.getType() ;
byte rhsType = rhs.getType() ;
if ( (lhsType != DataType.BOOLEAN) ||
(rhsType != DataType.BOOLEAN) ) {
int errCode = 1038;
String msg = "Operands of AND/OR can be boolean only" ;
msgCollector.collect(msg, MessageType.Error);
throw new TypeCheckerException(boolExp, msg, errCode, PigException.INPUT) ;
}
}
@Override
public void visit(LessThanExpression binOp)
throws FrontendException {
addCastsToCompareBinaryExp(binOp, false /*not equality op*/);
}
@Override
public void visit(LessThanEqualExpression binOp)
throws FrontendException {
addCastsToCompareBinaryExp(binOp, false /*not equality op*/);
}
@Override
public void visit(GreaterThanExpression binOp)
throws FrontendException {
addCastsToCompareBinaryExp(binOp, false /*not equality op*/);
}
@Override
public void visit(GreaterThanEqualExpression binOp)
throws FrontendException {
addCastsToCompareBinaryExp(binOp, false /*not equality op*/);
}
@Override
public void visit(EqualExpression binOp)
throws FrontendException {
addCastsToCompareBinaryExp(binOp, true /*equality op*/);
}
@Override
public void visit(NotEqualExpression binOp)
throws FrontendException {
addCastsToCompareBinaryExp(binOp, true /*equality op*/);
}
private void addCastsToCompareBinaryExp(BinaryExpression binOp, boolean isEquality)
throws FrontendException {
LogicalExpression lhs = binOp.getLhs() ;
LogicalExpression rhs = binOp.getRhs() ;
byte lhsType = lhs.getType() ;
byte rhsType = rhs.getType() ;
if ( DataType.isNumberType(lhsType) &&
DataType.isNumberType(rhsType) ) {
// If not the same type, we cast them to the same
byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
// Cast smaller type to the bigger type
if (lhsType != biggerType) {
insertCast(binOp, biggerType, binOp.getLhs());
}
else if (rhsType != biggerType) {
insertCast(binOp, biggerType, binOp.getRhs());
}
}
else if ( (lhsType == DataType.DATETIME) &&
(rhsType == DataType.DATETIME) ) {
// good
}
else if ( (lhsType == DataType.CHARARRAY) &&
(rhsType == DataType.CHARARRAY) ) {
// good
}
else if ( (lhsType == DataType.BYTEARRAY) &&
(rhsType == DataType.BYTEARRAY) ) {
// good
}
else if ( (lhsType == DataType.BYTEARRAY) &&
( (rhsType == DataType.CHARARRAY) || (DataType.isNumberType(rhsType)) || (rhsType == DataType.BOOLEAN) || (rhsType == DataType.DATETIME))
) {
// Cast byte array to the type on rhs
insertCast(binOp, rhsType, binOp.getLhs());
}
else if ( (rhsType == DataType.BYTEARRAY) &&
( (lhsType == DataType.CHARARRAY) || (DataType.isNumberType(lhsType)) || (lhsType == DataType.BOOLEAN) || (lhsType == DataType.DATETIME))
) {
// Cast byte array to the type on lhs
insertCast(binOp, lhsType, binOp.getRhs());
}else if (isEquality){
//in case of equality condition, allow boolean, tuples and maps as args
if((lhsType == DataType.BOOLEAN) &&
(rhsType == DataType.BOOLEAN) ) {
// good
}
else if((lhsType == DataType.TUPLE) &&
(rhsType == DataType.TUPLE) ) {
// good
}
else if ( (lhsType == DataType.MAP) &&
(rhsType == DataType.MAP) ) {
// good
}
else if (lhsType == DataType.BYTEARRAY &&
(rhsType == DataType.MAP || rhsType == DataType.TUPLE)){
// Cast byte array to the type on lhs
insertCast(binOp, rhsType, binOp.getLhs());
}
else if(rhsType == DataType.BYTEARRAY &&
(lhsType == DataType.MAP || lhsType == DataType.TUPLE)){
// Cast byte array to the type on lhs
insertCast(binOp, lhsType, binOp.getRhs());
}
else {
throwIncompatibleTypeError(binOp);
}
}
else {
throwIncompatibleTypeError(binOp);
}
}
private void throwIncompatibleTypeError(BinaryExpression binOp)
throws FrontendException {
int errCode = 1039;
String msg = generateIncompatibleTypesMessage(binOp);
msgCollector.collect(msg, MessageType.Error) ;
throw new TypeCheckerException(binOp, msg, errCode, PigException.INPUT);
}
private void insertCastsForNullToBoolean(BinaryExpression binOp)
throws FrontendException {
if (binOp.getLhs() instanceof ConstantExpression
&& ((ConstantExpression) binOp.getLhs()).getValue() == null)
insertCast(binOp, DataType.BOOLEAN, binOp.getLhs());
if (binOp.getRhs() instanceof ConstantExpression
&& ((ConstantExpression) binOp.getRhs()).getValue() == null)
insertCast(binOp, DataType.BOOLEAN, binOp.getRhs());
}
/**
* add cast to convert the input of exp
* {@link LogicalExpression} arg to type toType
* @param exp
* @param toType
* @param arg
* @throws FrontendException
*/
private void insertCast(LogicalExpression exp, byte toType, LogicalExpression arg)
throws FrontendException {
LogicalFieldSchema toFs = new LogicalSchema.LogicalFieldSchema(null, null, toType);
insertCast(exp, toFs, arg);
}
private void insertCast(LogicalExpression node, LogicalFieldSchema toFs,
LogicalExpression arg)
throws FrontendException {
collectCastWarning(node, arg.getType(), toFs.type, msgCollector);
CastExpression cast = new CastExpression(plan, arg, toFs);
cast.setLocation(node.getLocation());
try {
// disconnect cast and arg because the connection is already
// added by cast constructor and insertBetween call is going
// to do it again
plan.disconnect(cast, arg);
plan.insertBetween(node, cast, arg);
}
catch (PlanException pe) {
int errCode = 2059;
String msg = "Problem with inserting cast operator for " + node + " in plan.";
throw new TypeCheckerException(arg, msg, errCode, PigException.BUG, pe);
}
this.visit(cast);
}
/**
* For Basic Types:
* 0) Casting to itself is always ok
* 1) Casting from number to number is always ok
* 2) ByteArray to anything is ok
* 3) number to chararray is ok
* For Composite Types:
* Recursively traverse the schemas till you get a basic type
* @throws FrontendException
*/
@Override
public void visit(CastExpression cast) throws FrontendException {
byte inType = cast.getExpression().getType();
byte outType = cast.getType();
if(outType == DataType.BYTEARRAY && inType != outType) {
int errCode = 1051;
String msg = "Cannot cast to bytearray";
msgCollector.collect(msg, MessageType.Error) ;
throw new TypeCheckerException(cast, msg, errCode, PigException.INPUT) ;
}
LogicalFieldSchema inFs = cast.getExpression().getFieldSchema();
LogicalFieldSchema outFs = cast.getFieldSchema();
if(inFs == null){
//replace null schema with bytearray schema.
inFs = new LogicalFieldSchema(null, null, DataType.BYTEARRAY);
}
//check if the field schemas are castable
boolean castable = LogicalFieldSchema.castable(inFs, outFs);
if(!castable) {
int errCode = 1052;
String msg = "Cannot cast "
+ DataType.findTypeName(inType)
+ ((DataType.isSchemaType(inType))? " with schema " + inFs.toString(false) : "")
+ " to "
+ DataType.findTypeName(outType)
+ ((DataType.isSchemaType(outType))? " with schema " + outFs.toString(false) : "");
msgCollector.collect(msg, MessageType.Error) ;
throw new TypeCheckerException(cast, msg, errCode, PigException.INPUT) ;
}
}
/**
* {@link RegexExpression} expects CharArray as input
* Itself always returns Boolean
* @param rg
* @throws FrontendException
*/
@Override
public void visit(RegexExpression rg) throws FrontendException {
// We allow BYTEARRAY to be converted to CHARARRAY
if (rg.getLhs().getType() == DataType.BYTEARRAY){
insertCast(rg, DataType.CHARARRAY, rg.getLhs());
}
if (rg.getRhs().getType() == DataType.BYTEARRAY){
insertCast(rg, DataType.CHARARRAY, rg.getRhs());
}
// Other than that if it's not CharArray just say goodbye
if (rg.getLhs().getType() != DataType.CHARARRAY ||
rg.getRhs().getType() != DataType.CHARARRAY)
{
int errCode = 1037;
String msg = "Operands of Regex can be CharArray only :" + rg;
msgCollector.collect(msg, MessageType.Error);
throw new TypeCheckerException(rg, msg, errCode, PigException.INPUT) ;
}
}
@Override
public void visit(BinCondExpression binCond) throws FrontendException{
// high-level type checking
if (binCond.getCondition().getType() != DataType.BOOLEAN) {
int errCode = 1047;
String msg = "Condition in BinCond must be boolean" ;
msgCollector.collect(msg, MessageType.Error);
throw new TypeCheckerException(binCond, msg, errCode, PigException.INPUT) ;
}
byte lhsType = binCond.getLhs().getType() ;
byte rhsType = binCond.getRhs().getType() ;
// If both sides are number, we can convert the smaller type to the bigger type
if (DataType.isNumberType(lhsType) && DataType.isNumberType(rhsType)) {
byte biggerType = lhsType > rhsType ? lhsType:rhsType ;
if (biggerType > lhsType) {
insertCast(binCond, biggerType, binCond.getLhs());
}
else if (biggerType > rhsType) {
insertCast(binCond, biggerType, binCond.getRhs());
}
}
else if ((lhsType == DataType.BYTEARRAY)
&& ((rhsType == DataType.CHARARRAY) || (DataType
.isNumberType(rhsType))) || (rhsType == DataType.DATETIME)) { // need to add boolean as well
// Cast byte array to the type on rhs
insertCast(binCond, rhsType, binCond.getLhs());
} else if ((rhsType == DataType.BYTEARRAY)
&& ((lhsType == DataType.CHARARRAY) || (DataType
.isNumberType(lhsType)) || (rhsType == DataType.DATETIME))) { // need to add boolean as well
// Cast byte array to the type on lhs
insertCast(binCond, lhsType, binCond.getRhs());
}
// A constant null is always bytearray - so cast it
// to rhs type
else if (binCond.getLhs() instanceof ConstantExpression
&& ((ConstantExpression) binCond.getLhs()).getValue() == null) {
try {
insertCast(binCond, binCond.getRhs().getFieldSchema(), binCond.getLhs());
} catch (FrontendException e) {
int errCode = 2216;
String msg = "Problem getting fieldSchema for " +binCond.getRhs();
throw new TypeCheckerException(binCond, msg, errCode, PigException.BUG, e);
}
} else if (binCond.getRhs() instanceof ConstantExpression
&& ((ConstantExpression) binCond.getRhs()).getValue() == null) {
try {
insertCast(binCond, binCond.getLhs().getFieldSchema(), binCond.getRhs());
} catch (FrontendException e) {
int errCode = 2216;
String msg = "Problem getting fieldSchema for " +binCond.getRhs();
throw new TypeCheckerException(binCond, msg, errCode, PigException.BUG, e);
}
} else if (lhsType == rhsType) {
// Matching schemas if we're working with tuples/bags
if (DataType.isSchemaType(lhsType)) {
try {
if(!LogicalFieldSchema.isEqualUnlessUnknown(binCond.getLhs().getFieldSchema(), binCond.getRhs().getFieldSchema())){
int errCode = 1048;
String msg = "Two inputs of BinCond must have compatible schemas."
+ " left hand side: " + binCond.getLhs().getFieldSchema()
+ " right hand side: " + binCond.getRhs().getFieldSchema();
msgCollector.collect(msg, MessageType.Error) ;
throw new TypeCheckerException(binCond, msg, errCode, PigException.INPUT) ;
}
// TODO: We may have to merge the schema here
// if the previous check is not exact match
}
catch (FrontendException fe) {
int errCode = 1049;
String msg = "Problem during evaluaton of BinCond output type" ;
msgCollector.collect(msg, MessageType.Error) ;
throw new TypeCheckerException(binCond, msg, errCode, PigException.INPUT, fe) ;
}
}
}
else {
int errCode = 1050;
String msg = "Unsupported input type for BinCond: left hand side: "
+ DataType.findTypeName(lhsType) + "; right hand side: "
+ DataType.findTypeName(rhsType);
msgCollector.collect(msg, MessageType.Error) ;
throw new TypeCheckerException(binCond, msg, errCode, PigException.INPUT) ;
}
}
@Override
public void visit(MapLookupExpression map)
throws FrontendException{
if(map.getMap().getType() != DataType.MAP) {
// insert cast if the predecessor does not
// return map
insertCast(map, DataType.MAP, map.getMap());
}
}
@Override
public void visit(DereferenceExpression deref) throws FrontendException{
byte inputType = deref.getReferredExpression().getType();
switch(inputType){
case DataType.TUPLE:
case DataType.BAG:
case DataType.BYTEARRAY: // ideally determine type at runtime
//allowed types
break;
default:
int errCode = 1129;
String msg = "Referring to column(s) within a column of type " +
DataType.findTypeName(inputType)
+ " is not allowed";
throw new TypeCheckerException(deref, msg, errCode, PigException.INPUT);
}
}
@Override
public void visit(UserFuncExpression func) throws FrontendException{
List<LogicalExpression> list = func.getArguments() ;
// If the dependency graph is right, all the inputs
// must already know the types
Schema currentArgSchema = new Schema();
for(LogicalExpression op: list) {
if (!DataType.isUsableType(op.getType())) {
int errCode = 1014;
String msg = "Problem with input " + op + " of User-defined function: " + func;
msgCollector.collect(msg, MessageType.Error);
throw new TypeCheckerException(func, msg, errCode, PigException.INPUT) ;
}
try {
currentArgSchema.add(Util.translateFieldSchema(op.getFieldSchema()));
} catch (FrontendException e) {
int errCode = 1043;
String msg = "Unable to retrieve field schema.";
throw new TypeCheckerException(func, msg, errCode, PigException.INPUT, e);
}
}
EvalFunc<?> ef = (EvalFunc<?>) PigContext.instantiateFuncFromSpec(func.getFuncSpec());
// ask the EvalFunc what types of inputs it can handle
List<FuncSpec> funcSpecs = null;
try {
funcSpecs = ef.getArgToFuncMapping();
if (funcSpecs!=null) {
for (FuncSpec funcSpec : funcSpecs) {
Schema s = funcSpec.getInputArgsSchema();
LogicalSchema ls = Util.translateSchema(s);
ls.normalize();
funcSpec.setInputArgsSchema(Util.translateSchema(ls));
}
}
} catch (Exception e) {
int errCode = 1044;
String msg = "Unable to get list of overloaded methods.";
throw new TypeCheckerException(func, msg, errCode, PigException.INPUT, e);
}
// EvalFunc's schema type
SchemaType udfSchemaType = ef.getSchemaType();
/**
* Here is an explanation of the way the matching UDF funcspec will be chosen
* based on actual types in the input schema.
* First an "exact" match is tried for each of the fields in the input schema
* with the corresponding fields in the candidate funcspecs' schemas.
*
* If exact match fails, then first a check if made if the input schema has any
* bytearrays in it.
*
* If there are NO bytearrays in the input schema, then a best fit match is attempted
* for the different fields. Essential a permissible cast from one type to another
* is given a "score" based on its position in the "castLookup" table. A final
* score for a candidate funcspec is deduced as
* SUM(score_of_particular_cast*noOfCastsSoFar).
* If no permissible casts are possible, the score for the candidate is -1. Among
* the non -1 score candidates, the candidate with the lowest score is chosen.
*
* If there are bytearrays in the input schema, a modified exact match is tried. In this
* matching, bytearrays in the input schema are not considered. As a result of
* ignoring the bytearrays, we could get multiple candidate funcspecs which match
* "exactly" for the other columns - if this is the case, we notify the user of
* the ambiguity and error out. Else if all other (non byte array) fields
* matched exactly, then we can cast bytearray(s) to the corresponding type(s)
* in the matched udf schema. If this modified exact match fails, the above best fit
* algorithm is attempted by initially coming up with scores and candidate funcSpecs
* (with bytearray(s) being ignored in the scoring process). Then a check is
* made to ensure that the positions which have bytearrays in the input schema
* have the same type (for a given position) in the corresponding positions in
* all the candidate funcSpecs. If this is not the case, it indicates a conflict
* and the user is notified of the error (because we have more than
* one choice for the destination type of the cast for the bytearray). If this is the case,
* the candidate with the lowest score is chosen.
*/
FuncSpec matchingSpec = null;
boolean notExactMatch = false;
if(funcSpecs!=null && funcSpecs.size()!=0){
//Some function mappings found. Trying to see
//if one of them fits the input schema
if((matchingSpec = exactMatch(funcSpecs, currentArgSchema, func, udfSchemaType))==null){
//Oops, no exact match found. Trying to see if we
//have mappings that we can fit using casts.
notExactMatch = true;
if(byteArrayFound(func, currentArgSchema)){
// try "exact" matching all other fields except the byte array
// fields and if they all exact match and we have only one candidate
// for the byte array cast then that's the matching one!
if((matchingSpec = exactMatchWithByteArrays(funcSpecs, currentArgSchema, func, udfSchemaType))==null){
// "exact" match with byte arrays did not work - try best fit match
if((matchingSpec = bestFitMatchWithByteArrays(funcSpecs, currentArgSchema, func, udfSchemaType)) == null) {
int errCode = 1045;
String msg = "Could not infer the matching function for "
+ func.getFuncSpec()
+ " as multiple or none of them fit. Please use an explicit cast.";
msgCollector.collect(msg, MessageType.Error);
throw new TypeCheckerException(func, msg, errCode, PigException.INPUT);
}
}
} else if ((matchingSpec = bestFitMatch(funcSpecs, currentArgSchema, udfSchemaType)) == null) {
// Either no byte arrays found or there are byte arrays
// but only one mapping exists.
// However, we could not find a match as there were either
// none fitting the input schema or it was ambiguous.
// Throw exception that we can't infer a fit.
int errCode = 1045;
String msg = "Could not infer the matching function for "
+ func.getFuncSpec()
+ " as multiple or none of them fit. Please use an explicit cast.";
msgCollector.collect(msg, MessageType.Error);
throw new TypeCheckerException(func, msg, errCode, PigException.INPUT);
}
}
}
if(matchingSpec!=null){
//Voila! We have a fitting match. Lets insert casts and make
//it work.
// notify the user about the match we picked if it was not
// an exact match
if(notExactMatch) {
String msg = "Function " + func.getFuncSpec().getClassName() + "()" +
" will be called with following argument types: " +
matchingSpec.getInputArgsSchema() + ". If you want to use " +
"different input argument types, please use explicit casts.";
msgCollector.collect(msg, MessageType.Warning, PigWarning.USING_OVERLOADED_FUNCTION);
}
if (func.isViaDefine()) {
matchingSpec.setCtorArgs(func.getFuncSpec().getCtorArgs());
}
func.setFuncSpec(matchingSpec);
insertCastsForUDF(func, currentArgSchema, matchingSpec.getInputArgsSchema(), udfSchemaType);
}
}
/**
* Tries to find the schema supported by one of funcSpecs which can be
* obtained by inserting a set of casts to the input schema
*
* @param funcSpecs -
* mappings provided by udf
* @param s -
* input schema
* @param func -
* udf expression
* @param udfSchemaType -
* schema type of the udf
* @return the funcSpec that supports the schema that is best suited to s.
* The best suited schema is one that has the lowest score as
* returned by fitPossible().
* @throws VisitorException
*/
private FuncSpec bestFitMatchWithByteArrays(List<FuncSpec> funcSpecs,
Schema s, UserFuncExpression func, SchemaType udfSchemaType) throws VisitorException {
List<Pair<Long, FuncSpec>> scoreFuncSpecList = new ArrayList<Pair<Long,FuncSpec>>();
for (Iterator<FuncSpec> iterator = funcSpecs.iterator(); iterator
.hasNext();) {
FuncSpec fs = iterator.next();
long score = fitPossible(s, fs.getInputArgsSchema(), udfSchemaType);
if (score != INF) {
scoreFuncSpecList.add(new Pair<Long, FuncSpec>(score, fs));
}
}
// if no candidates found, return null
if(scoreFuncSpecList.size() == 0)
return null;
if(scoreFuncSpecList.size() > 1) {
// sort the candidates based on score
Collections.sort(scoreFuncSpecList, new ScoreFuncSpecListComparator());
// if there are two (or more) candidates with the same *lowest* score
// we cannot choose one of them - notify the user
if (scoreFuncSpecList.get(0).first == scoreFuncSpecList.get(1).first) {
int errCode = 1046;
String msg = "Multiple matching functions for "
+ func.getFuncSpec() + " with input schemas: " + "("
+ scoreFuncSpecList.get(0).second.getInputArgsSchema() + ", "
+ scoreFuncSpecList.get(1).second.getInputArgsSchema() + "). Please use an explicit cast.";
msgCollector.collect(msg, MessageType.Error);
throw new TypeCheckerException(func, msg, errCode, PigException.INPUT);
}
// now consider the bytearray fields
List<Integer> byteArrayPositions = getByteArrayPositions(func, s);
// make sure there is only one type to "cast to" for the byte array
// positions among the candidate funcSpecs
Map<Integer, Pair<FuncSpec, Byte>> castToMap = new HashMap<Integer, Pair<FuncSpec, Byte>>();
for (Iterator<Pair<Long, FuncSpec>> it = scoreFuncSpecList.iterator(); it.hasNext();) {
FuncSpec funcSpec = it.next().second;
Schema sch = funcSpec.getInputArgsSchema();
for (Iterator<Integer> iter = byteArrayPositions.iterator(); iter
.hasNext();) {
Integer i = iter.next();
try {
if (!castToMap.containsKey(i)) {
// first candidate
castToMap.put(i, new Pair<FuncSpec, Byte>(funcSpec, sch
.getField(i).type));
} else {
// make sure the existing type from an earlier candidate
// matches
Pair<FuncSpec, Byte> existingPair = castToMap.get(i);
if (sch.getField(i).type != existingPair.second) {
int errCode = 1046;
String msg = "Multiple matching functions for "
+ func.getFuncSpec() + " with input schema: "
+ "(" + existingPair.first.getInputArgsSchema()
+ ", " + funcSpec.getInputArgsSchema()
+ "). Please use an explicit cast.";
msgCollector.collect(msg, MessageType.Error);
throw new TypeCheckerException(func, msg, errCode, PigException.INPUT);
}
}
} catch (FrontendException fee) {
int errCode = 1043;
String msg = "Unalbe to retrieve field schema.";
throw new TypeCheckerException(func, msg, errCode, PigException.INPUT, fee);
}
}
}
}
// if we reached here, it means we have >= 1 candidates and these candidates
// have the same type for position which have bytearray in the input
// Also the candidates are stored sorted by score in a list - we can now
// just return the first candidate (the one with the lowest score)
return scoreFuncSpecList.get(0).second;
}
private static class ScoreFuncSpecListComparator implements Comparator<Pair<Long, FuncSpec>> {
/* (non-Javadoc)
* @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
*/
@Override
public int compare(Pair<Long, FuncSpec> o1, Pair<Long, FuncSpec> o2) {
if(o1.first < o2.first)
return -1;
else if (o1.first > o2.first)
return 1;
else
return 0;
}
}
/**
* Finds if there is an exact match between the schema supported by
* one of the funcSpecs and the input schema s. Here first exact match
* for all non byte array fields is first attempted and if there is
* exactly one candidate, it is chosen (since the bytearray(s) can
* just be cast to corresponding type(s) in the candidate)
* @param funcSpecs - mappings provided by udf
* @param s - input schema
* @param func - UserFuncExpression for which matching is requested
* @param udfSchemaType - schema type of the udf
* @return the matching spec if found else null
* @throws FrontendException
*/
private FuncSpec exactMatchWithByteArrays(List<FuncSpec> funcSpecs,
Schema s, UserFuncExpression func, SchemaType udfSchemaType) throws FrontendException {
// exact match all fields except byte array fields
// ignore byte array fields for matching
return exactMatchHelper(funcSpecs, s, func, udfSchemaType, true);
}
/**
* Finds if there is an exact match between the schema supported by
* one of the funcSpecs and the input schema s. Here an exact match
* for all fields is attempted.
* @param funcSpecs - mappings provided by udf
* @param s - input schema
* @param func - UserFuncExpression for which matching is requested
* @param udfSchemaType - schema type of the user defined function
* @return the matching spec if found else null
* @throws FrontendException
*/
private FuncSpec exactMatch(List<FuncSpec> funcSpecs, Schema s,
UserFuncExpression func, SchemaType udfSchemaType) throws FrontendException {
// exact match all fields, don't ignore byte array fields
return exactMatchHelper(funcSpecs, s, func, udfSchemaType, false);
}
/**
* Tries to find the schema supported by one of funcSpecs which can
* be obtained by inserting a set of casts to the input schema
* @param funcSpecs - mappings provided by udf
* @param s - input schema
* @param udfSchemaType - schema type of the udf
* @return the funcSpec that supports the schema that is best suited
* to s. The best suited schema is one that has the
* lowest score as returned by fitPossible().
*/
private FuncSpec bestFitMatch(List<FuncSpec> funcSpecs, Schema s, SchemaType udfSchemaType) {
FuncSpec matchingSpec = null;
long score = INF;
long prevBestScore = Long.MAX_VALUE;
long bestScore = Long.MAX_VALUE;
for (Iterator<FuncSpec> iterator = funcSpecs.iterator(); iterator.hasNext();) {
FuncSpec fs = iterator.next();
score = fitPossible(s,fs.getInputArgsSchema(), udfSchemaType);
if(score!=INF && score<=bestScore){
matchingSpec = fs;
prevBestScore = bestScore;
bestScore = score;
}
}
if(matchingSpec!=null && bestScore!=prevBestScore)
return matchingSpec;
return null;
}
/**
* Checks to see if any field of the input schema is a byte array
* @param func
* @param s - input schema
* @return true if found else false
* @throws VisitorException
*/
private boolean byteArrayFound(UserFuncExpression func, Schema s) throws VisitorException {
for(int i=0;i<s.size();i++){
try {
FieldSchema fs=s.getField(i);
if(fs == null)
return false;
if(fs.type==DataType.BYTEARRAY){
return true;
}
} catch (FrontendException fee) {
int errCode = 1043;
String msg = "Unable to retrieve field schema.";
throw new TypeCheckerException(func, msg, errCode, PigException.INPUT, fee);
}
}
return false;
}
/**
* Gets the positions in the schema which are byte arrays
* @param func
*
* @param s -
* input schema
* @throws VisitorException
*/
private List<Integer> getByteArrayPositions(UserFuncExpression func, Schema s)
throws VisitorException {
List<Integer> result = new ArrayList<Integer>();
for (int i = 0; i < s.size(); i++) {
try {
FieldSchema fs = s.getField(i);
if (fs.type == DataType.BYTEARRAY) {
result.add(i);
}
} catch (FrontendException fee) {
int errCode = 1043;
String msg = "Unable to retrieve field schema.";
throw new TypeCheckerException(func, msg, errCode, PigException.INPUT, fee); }
}
return result;
}
/**
* Finds if there is an exact match between the schema supported by
* one of the funcSpecs and the input schema s
* @param funcSpecs - mappings provided by udf
* @param s - input schema
* @param func user defined function
* @param udfSchemaType - schema type of the user defined function
* @param ignoreByteArrays - flag for whether the exact match is to computed
* after ignoring bytearray (if true) or without ignoring bytearray (if false)
* @return the matching spec if found else null
* @throws FrontendException
*/
private FuncSpec exactMatchHelper(List<FuncSpec> funcSpecs, Schema s,
UserFuncExpression func, SchemaType udfSchemaType, boolean ignoreByteArrays)
throws FrontendException {
List<FuncSpec> matchingSpecs = new ArrayList<FuncSpec>();
for (Iterator<FuncSpec> iterator = funcSpecs.iterator(); iterator.hasNext();) {
FuncSpec fs = iterator.next();
if (schemaEqualsForMatching(s, fs.getInputArgsSchema(), udfSchemaType, ignoreByteArrays)) {
matchingSpecs.add(fs);
}
}
if(matchingSpecs.size() == 0)
return null;
if(matchingSpecs.size() > 1) {
int errCode = 1046;
String msg = "Multiple matching functions for "
+ func.getFuncSpec() + " with input schema: "
+ "(" + matchingSpecs.get(0).getInputArgsSchema()
+ ", " + matchingSpecs.get(1).getInputArgsSchema()
+ "). Please use an explicit cast.";
msgCollector.collect(msg, MessageType.Error);
throw new TypeCheckerException(func, msg, errCode, PigException.INPUT);
}
// exactly one matching spec - return it
return matchingSpecs.get(0);
}
/***************************************************************************
* Compare two schemas for equality for argument matching purposes. This is
* a more relaxed form of Schema.equals wherein first the Datatypes of the
* field schema are checked for equality. Then if a field schema in the udf
* schema is for a complex type AND if the inner schema is NOT null, check
* for schema equality of the inner schemas of the UDF field schema and
* input field schema
*
* @param inputSchema
* @param udfSchema
* @param ignoreByteArrays
* @return true if FieldSchemas are equal for argument matching, false
* otherwise
* @throws FrontendException
*/
public static boolean schemaEqualsForMatching(Schema inputSchema,
Schema udfSchema, SchemaType udfSchemaType, boolean ignoreByteArrays) throws FrontendException {
// If both of them are null, they are equal
if ((inputSchema == null) && (udfSchema == null)) {
return true;
}
// otherwise
if (inputSchema == null) {
return false;
}
if (udfSchema == null) {
return false;
}
// the old udf schemas might not have tuple inside bag
// fix that!
udfSchema = Util.fixSchemaAddTupleInBag(udfSchema);
if ((udfSchemaType == SchemaType.NORMAL) && (inputSchema.size() != udfSchema.size()))
return false;
if ((udfSchemaType == SchemaType.VARARG) && inputSchema.size() < udfSchema.size())
return false;
Iterator<FieldSchema> i = inputSchema.getFields().iterator();
Iterator<FieldSchema> j = udfSchema.getFields().iterator();
FieldSchema udfFieldSchema = null;
while (i.hasNext()) {
FieldSchema inputFieldSchema = i.next();
if(inputFieldSchema == null)
return false;
//if there's no more UDF field: take the last one which is the vararg field
udfFieldSchema = j.hasNext() ? j.next() : udfFieldSchema;
if(ignoreByteArrays && inputFieldSchema.type == DataType.BYTEARRAY) {
continue;
}
if (inputFieldSchema.type != udfFieldSchema.type) {
return false;
}
// if a field schema in the udf schema is for a complex
// type AND if the inner schema is NOT null, check for schema
// equality of the inner schemas of the UDF field schema and
// input field schema. If the field schema in the udf schema is
// for a complex type AND if the inner schema IS null it means
// the udf is applicable for all input which has the same type
// for that field (irrespective of inner schema)
// if it is a bag with empty tuple, then just rely on the field type
if (DataType.isSchemaType(udfFieldSchema.type)
&& udfFieldSchema.schema != null
&& isNotBagWithEmptyTuple(udfFieldSchema)
) {
// Compare recursively using field schema
if (!FieldSchema.equals(inputFieldSchema, udfFieldSchema,
false, true)) {
//try modifying any empty tuple to type of bytearray
// and see if that matches. Need to do this for
// backward compatibility -
// User might have specified tuple with a bytearray
// and this should also match an empty tuple
FieldSchema inputFSWithBytearrayinTuple =
new FieldSchema(inputFieldSchema);
convertEmptyTupleToBytearrayTuple(inputFSWithBytearrayinTuple);
if (!FieldSchema.equals(inputFSWithBytearrayinTuple, udfFieldSchema,
false, true)) {
return false;
}
}
}
}
return true;
}
/**
* Check if the fieldSch is a bag with empty tuple schema
* @param fieldSch
* @return
* @throws FrontendException
*/
private static boolean isNotBagWithEmptyTuple(FieldSchema fieldSch)
throws FrontendException {
boolean isBagWithEmptyTuple = false;
if(fieldSch.type == DataType.BAG
&& fieldSch.schema != null
&& fieldSch.schema.getField(0) != null
&& fieldSch.schema.getField(0).type == DataType.TUPLE
&& fieldSch.schema.getField(0).schema == null
){
isBagWithEmptyTuple = true;
}
return !isBagWithEmptyTuple;
}
private static void convertEmptyTupleToBytearrayTuple(
FieldSchema fs) {
if(fs.type == DataType.TUPLE
&& fs.schema != null
&& fs.schema.size() == 0){
fs.schema.add(new FieldSchema(null, DataType.BYTEARRAY));
return;
}
if(fs.schema != null){
for(FieldSchema inFs : fs.schema.getFields()){
convertEmptyTupleToBytearrayTuple(inFs);
}
}
}
static final HashMap<Byte, List<Byte>> castLookup = new HashMap<Byte, List<Byte>>();
static{
//Ordering here decides the score for the best fit function.
//Do not change the order. Conversions to a smaller type is preferred
//over conversion to a bigger type where ordering of types is:
//INTEGER, LONG, FLOAT, DOUBLE, DATETIME, CHARARRAY, TUPLE, BAG, MAP
//from small to big
List<Byte> boolToTypes = Arrays.asList(
DataType.INTEGER,
DataType.LONG,
DataType.FLOAT,
DataType.DOUBLE,
DataType.BIGINTEGER,
DataType.BIGDECIMAL
// maybe more bigger types
);
castLookup.put(DataType.BOOLEAN, boolToTypes);
List<Byte> intToTypes = Arrays.asList(
DataType.LONG,
DataType.FLOAT,
DataType.DOUBLE,
DataType.BIGINTEGER,
DataType.BIGDECIMAL
);
castLookup.put(DataType.INTEGER, intToTypes);
List<Byte> longToTypes = Arrays.asList(
DataType.FLOAT,
DataType.DOUBLE,
DataType.BIGINTEGER,
DataType.BIGDECIMAL
);
castLookup.put(DataType.LONG, longToTypes);
List<Byte> floatToTypes = Arrays.asList(
DataType.DOUBLE,
DataType.BIGINTEGER,
DataType.BIGDECIMAL
);
castLookup.put(DataType.FLOAT, floatToTypes);
List<Byte> doubleToTypes = Arrays.asList(
DataType.BIGINTEGER,
DataType.BIGDECIMAL
);
castLookup.put(DataType.DOUBLE, doubleToTypes);
List<Byte> bigIntegerToTypes = Arrays.asList(
DataType.BIGDECIMAL
);
castLookup.put(DataType.BIGINTEGER, bigIntegerToTypes);
List<Byte> byteArrayToTypes = Arrays.asList(
DataType.BOOLEAN,
DataType.INTEGER,
DataType.LONG,
DataType.FLOAT,
DataType.DOUBLE,
DataType.DATETIME,
DataType.CHARARRAY,
DataType.BIGINTEGER,
DataType.BIGDECIMAL,
DataType.TUPLE,
DataType.BAG,
DataType.MAP
);
castLookup.put(DataType.BYTEARRAY, byteArrayToTypes);
}
/**
* Computes a modified version of manhattan distance between
* the two schemas: s1 & s2. Here the value on the same axis
* are preferred over values that change axis as this means
* that the number of casts required will be lesser on the same
* axis.
*
* However, this function ceases to be a metric as the triangle
* inequality does not hold.
*
* Each schema is an s1.size() dimensional vector.
* The ordering for each axis is as defined by castLookup.
* Unallowed casts are returned a dist of INFINITY.
* @param s1
* @param s2
* @param s2Type
* @return
*/
private long fitPossible(Schema s1, Schema s2, SchemaType s2Type) {
if(s1==null || s2==null) return INF;
List<FieldSchema> sFields = s1.getFields();
List<FieldSchema> fsFields = s2.getFields();
if((s2Type == SchemaType.NORMAL) && (sFields.size()!=fsFields.size()))
return INF;
if((s2Type == SchemaType.VARARG) && (sFields.size() < fsFields.size()))
return INF;
long score = 0;
int castCnt=0;
for(int i=0;i<sFields.size();i++){
FieldSchema sFS = sFields.get(i);
if(sFS == null){
return INF;
}
// if we have a byte array do not include it
// in the computation of the score - bytearray
// fields will be looked at separately outside
// of this function
if (sFS.type == DataType.BYTEARRAY)
continue;
//if we get to the vararg field (if defined) : take it repeatedly
FieldSchema fsFS = ((s2Type == SchemaType.VARARG) && i >= s2.size()) ?
fsFields.get(s2.size() - 1) : fsFields.get(i);
if(DataType.isSchemaType(sFS.type)){
if(!FieldSchema.equals(sFS, fsFS, false, true))
return INF;
}
if(FieldSchema.equals(sFS, fsFS, true, true)) continue;
if(!castLookup.containsKey(sFS.type))
return INF;
if(!(castLookup.get(sFS.type).contains(fsFS.type)))
return INF;
score += (castLookup.get(sFS.type)).indexOf(fsFS.type) + 1;
++castCnt;
}
return score * castCnt;
}
private void insertCastsForUDF(UserFuncExpression func, Schema fromSch, Schema toSch, SchemaType toSchType)
throws FrontendException {
List<FieldSchema> fsLst = fromSch.getFields();
List<FieldSchema> tsLst = toSch.getFields();
List<LogicalExpression> args = func.getArguments();
int i=-1;
for (FieldSchema fFSch : fsLst) {
++i;
//if we get to the vararg field (if defined) : take it repeatedly
FieldSchema tFSch = ((toSchType == SchemaType.VARARG) && i >= tsLst.size()) ?
tsLst.get(tsLst.size() - 1) : tsLst.get(i);
if (fFSch.type == tFSch.type) {
continue;
}
insertCast(func, Util.translateFieldSchema(tFSch), args.get(i));
}
}
/***
* Helper for collecting warning when casting is inserted
* to the plan (implicit casting)
*
* @param node
* @param originalType
* @param toType
*/
static void collectCastWarning(Operator node,
byte originalType,
byte toType,
CompilationMessageCollector msgCollector
) {
String originalTypeName = DataType.findTypeName(originalType) ;
String toTypeName = DataType.findTypeName(toType) ;
String opName= node.getClass().getSimpleName() ;
PigWarning kind = null;
switch(toType) {
case DataType.BAG:
kind = PigWarning.IMPLICIT_CAST_TO_BAG;
break;
case DataType.CHARARRAY:
kind = PigWarning.IMPLICIT_CAST_TO_CHARARRAY;
break;
case DataType.DOUBLE:
kind = PigWarning.IMPLICIT_CAST_TO_DOUBLE;
break;
case DataType.FLOAT:
kind = PigWarning.IMPLICIT_CAST_TO_FLOAT;
break;
case DataType.INTEGER:
kind = PigWarning.IMPLICIT_CAST_TO_INT;
break;
case DataType.LONG:
kind = PigWarning.IMPLICIT_CAST_TO_LONG;
break;
case DataType.BOOLEAN:
kind = PigWarning.IMPLICIT_CAST_TO_BOOLEAN;
break;
case DataType.DATETIME:
kind = PigWarning.IMPLICIT_CAST_TO_DATETIME;
break;
case DataType.MAP:
kind = PigWarning.IMPLICIT_CAST_TO_MAP;
break;
case DataType.TUPLE:
kind = PigWarning.IMPLICIT_CAST_TO_TUPLE;
break;
case DataType.BIGINTEGER:
kind = PigWarning.IMPLICIT_CAST_TO_BIGINTEGER;
break;
case DataType.BIGDECIMAL:
kind = PigWarning.IMPLICIT_CAST_TO_BIGDECIMAL;
break;
}
msgCollector.collect(originalTypeName + " is implicitly cast to "
+ toTypeName +" under " + opName + " Operator",
MessageType.Warning, kind) ;
}
static class FieldSchemaResetter extends AllSameExpressionVisitor {
protected FieldSchemaResetter(OperatorPlan p) throws FrontendException {
super(p, new ReverseDependencyOrderWalker(p));
}
@Override
protected void execute(LogicalExpression op) throws FrontendException {
op.resetFieldSchema();
}
}
}