blob: d6c4cff4e00786b7c77404b327225cef5bb50cff [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.visitor;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.pig.PigException;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.validators.TypeCheckerException;
import org.apache.pig.impl.plan.CompilationMessageCollector;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.expression.CastExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOCross;
import org.apache.pig.newplan.logical.relational.LODistinct;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOInnerLoad;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LORank;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LOUnion;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
/*
The getType() of every operator handled here always returns BAG.
This visitor only has to:
1) Check the types of inputs and expression plans
2) Compute the output schema with type information
   (at the moment, the parser only returns GetSchema with correct aliases)
3) Insert casts where necessary
*/
public class TypeCheckingRelVisitor extends LogicalRelationalNodesVisitor {

    /** Receives the error/warning messages produced while type checking. */
    private final CompilationMessageCollector msgCollector;

    /**
     * Creates a visitor that walks {@code plan} in dependency order.
     *
     * @param plan the relational plan to type-check
     * @param msgCollector collector that receives error and warning messages
     * @throws FrontendException if the base visitor cannot be constructed
     */
    public TypeCheckingRelVisitor(OperatorPlan plan, CompilationMessageCollector msgCollector)
            throws FrontendException {
        super(plan, new DependencyOrderWalker(plan));
        this.msgCollector = msgCollector;
    }

    /** Nothing is type-checked for a load. */
    @Override
    public void visit(LOLoad load) {
        // do nothing
    }

    /**
     * Re-derives the store's schema.
     *
     * @throws FrontendException if the schema cannot be computed
     */
    @Override
    public void visit(LOStore store) throws FrontendException {
        store.resetSchema();
        store.getSchema();
    }

    /**
     * Type-checks the filter condition. The condition must evaluate to boolean.
     * The schema of the filter output is the same as the filter input.
     *
     * @throws FrontendException on a malformed or non-boolean condition
     */
    @Override
    public void visit(LOFilter filter) throws FrontendException {
        filter.resetSchema();
        LogicalExpressionPlan comparisonPlan = filter.getFilterPlan();

        // The condition plan must have exactly one output. It is read through
        // get(0) below, so an empty plan must also be rejected here instead of
        // failing with an IndexOutOfBoundsException.
        if (comparisonPlan.getSources().size() != 1) {
            int errCode = 1057;
            String msg = "Filter's cond plan can only have one output";
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(filter, msg, errCode, PigException.INPUT, null);
        }

        // visit the filter expression
        visitExpressionPlan(comparisonPlan, filter);

        // check the filter expression type
        byte innerCondType = ((LogicalExpression) comparisonPlan.getSources().get(0)).getType();
        if (innerCondType != DataType.BOOLEAN) {
            int errCode = 1058;
            String msg = "Filter's condition must evaluate to boolean. Found: "
                    + DataType.findTypeName(innerCondType);
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(filter, msg, errCode, PigException.INPUT, null);
        }

        try {
            // re-compute the schema
            filter.resetSchema();
            filter.getSchema();
        } catch (FrontendException fe) {
            int errCode = 1059;
            String msg = "Problem while reconciling output schema of Filter";
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(filter, msg, errCode, PigException.INPUT, fe);
        }
    }

    /**
     * Always throws a {@link TypeCheckerException} built from the arguments.
     *
     * @param op the operator the error is reported against
     * @param msg the error message
     * @param errCode the Pig error code
     * @param input the error source, for example {@link PigException#INPUT} or
     *        {@link PigException#BUG}. Previously this was ignored and INPUT was
     *        always used.
     * @param fe the optional cause, or {@code null}
     * @throws TypeCheckerException always
     */
    private void throwTypeCheckerException(Operator op, String msg,
            int errCode, byte input, FrontendException fe) throws TypeCheckerException {
        if (fe == null) {
            throw new TypeCheckerException(op, msg, errCode, input);
        }
        throw new TypeCheckerException(op, msg, errCode, input, fe);
    }

    /**
     * Type-checks every output expression plan of a generate, then re-derives
     * its schema.
     *
     * @throws FrontendException if an expression plan is malformed or fails type checking
     */
    @Override
    public void visit(LOGenerate gen) throws FrontendException {
        for (int i = 0; i < gen.getOutputPlans().size(); i++) {
            LogicalExpressionPlan expPlan = gen.getOutputPlans().get(i);
            // Check that the inner plan has only 1 output port
            if (expPlan.getSources().size() > 1) {
                int errCode = 1057;
                String msg = "LOGenerate expression plan can only have one output";
                msgCollector.collect(msg, MessageType.Error);
                throwTypeCheckerException(gen, msg, errCode, PigException.BUG, null);
            }
            // visit the generate output expression
            visitExpressionPlan(expPlan, gen);
        }
        gen.resetSchema();
        gen.getSchema();
    }

    /**
     * Re-derives the inner load's schema.
     *
     * @throws FrontendException if the schema cannot be computed
     */
    @Override
    public void visit(LOInnerLoad innerLoad) throws FrontendException {
        innerLoad.resetSchema();
        innerLoad.getSchema();
    }

    /**
     * Type-checks the foreach's inner plan with a nested visitor, then
     * re-computes the foreach's output schema.
     *
     * @throws FrontendException (as a TypeCheckerException, code 1059) if the
     *         inner plan or the schema cannot be resolved
     */
    @Override
    public void visit(LOForEach forEach) throws FrontendException {
        try {
            // visit inner plan
            new TypeCheckingRelVisitor(forEach.getInnerPlan(), msgCollector).visit();
            // re-compute the schema
            forEach.resetSchema();
            forEach.getSchema();
        } catch (FrontendException fe) {
            int errCode = 1059;
            String msg = "Problem while reconciling output schema of ForEach";
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(forEach, msg, errCode, PigException.INPUT, fe);
        }
    }

    /**
     * Runs a {@link TypeCheckingExpVisitor} over one expression plan that
     * belongs to {@code relOp}.
     */
    private void visitExpressionPlan(LogicalExpressionPlan explPlan,
            LogicalRelationalOperator relOp) throws FrontendException {
        TypeCheckingExpVisitor expTypeCheck =
                new TypeCheckingExpVisitor(explPlan, msgCollector, relOp);
        expTypeCheck.visit();
    }

    /* (non-Javadoc)
     * @see org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor#visit(org.apache.pig.newplan.logical.relational.LOUnion)
     * The output schema of LOUnion is the merge of all input schemas.
     * Operands on the left side always take precedence on aliases.
     * We allow type promotion here.
     */
    @Override
    public void visit(LOUnion u) throws FrontendException {
        u.resetSchema();
        // Have to make a copy, because as we insert operators, this list will
        // change under us.
        List<Operator> inputs = new ArrayList<Operator>(u.getInputs());

        // There is no point in a union with only one operand;
        // that would be a problem in the parser.
        if (inputs.size() < 2) {
            throw new AssertionError("Union with Count(Operand) < 2");
        }

        LogicalSchema schema = null;
        try {
            // Compute the schema
            schema = u.getSchema();
        } catch (FrontendException fee) {
            int errCode = 1055;
            String msg = "Problem while reading schemas from inputs of Union";
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(u, msg, errCode, PigException.INPUT, fee);
        }

        // Do cast insertion only if we are typed and this is not
        // union-onschema. For union-onschema the foreach with casts is added
        // in UnionOnSchemaSetter.
        if (schema != null && !u.isOnSchema()) {
            // Insert casting to inputs if necessary
            for (int i = 0; i < inputs.size(); i++) {
                LogicalRelationalOperator input = (LogicalRelationalOperator) inputs.get(i);
                LOForEach insertedOp = insertCastForEachInBetweenIfNecessary(input, u);

                // A foreach was just inserted, so its schema must be computed.
                if (insertedOp != null) {
                    if (insertedOp.getAlias() == null) {
                        insertedOp.setAlias(input.getAlias());
                    }
                    try {
                        this.visit(insertedOp);
                    } catch (FrontendException fee) {
                        int errCode = 1056;
                        String msg = "Problem while casting inputs of Union";
                        msgCollector.collect(msg, MessageType.Error);
                        throwTypeCheckerException(u, msg, errCode, PigException.INPUT, fee);
                    }
                }
            }
        }
        u.resetSchema();
        u.getSchema();
    }

    /***
     * Inserts a casting foreach between two relational operators, but only
     * when at least one field's type differs.
     * Currently this only does "shallow" casting, comparing top-level field
     * types only.
     * @param fromOp the producer, which must be a direct predecessor of toOp
     * @param toOp the consumer whose schema gives the target types
     * @return the inserted operator, or null if no insertion was needed
     * @throws FrontendException if the operators are not adjacent, the
     *         schemas cannot be read, or their sizes differ
     */
    private LOForEach insertCastForEachInBetweenIfNecessary(
            LogicalRelationalOperator fromOp,
            LogicalRelationalOperator toOp) throws FrontendException {

        // Make sure that they are adjacent and the direction
        // is from "fromOp" to "toOp"
        boolean found = false;
        for (Operator predecessor : plan.getPredecessors(toOp)) {
            // compare by reference
            if (predecessor == fromOp) {
                found = true;
                break;
            }
        }
        if (!found) {
            int errCode = 1077;
            String msg = "Two operators that require a cast in between are not adjacent.";
            throwTypeCheckerException(fromOp, msg, errCode, PigException.INPUT, null);
        }

        // retrieve the input schema to be cast, and the target schema
        LogicalSchema fromSchema = null;
        LogicalSchema toSchema = null;
        try {
            fromSchema = fromOp.getSchema();
            toSchema = toOp.getSchema();
        } catch (FrontendException fe) {
            int errCode = 1055;
            String msg = "Problem while reading schema from input of "
                    + fromOp.getClass().getSimpleName();
            throwTypeCheckerException(fromOp, msg, errCode, PigException.BUG, fe);
        }

        // make sure the target schema has the same number of members
        // as the number of output fields from "fromOp"
        if (fromSchema.size() != toSchema.size()) {
            int errCode = 1078;
            String msg = "Schema size mismatch for casting. Input schema size: "
                    + fromSchema.size() + ". Target schema size: " + toSchema.size();
            throwTypeCheckerException(toOp, msg, errCode, PigException.INPUT, null);
        }

        // Plans inside Generate. Fields that do not need casting only have a
        // Project. Fields that need casting have Project + Cast.
        ArrayList<LogicalExpressionPlan> generatePlans = new ArrayList<LogicalExpressionPlan>();

        LogicalPlan innerPlan = new LogicalPlan();
        // create the LOGenerate for the foreach. There is no flattening,
        // because this is cast insertion only.
        LOGenerate loGen = new LOGenerate(innerPlan, generatePlans,
                new boolean[toSchema.size()]);
        innerPlan.add(loGen);

        // Create the ForEach to be inserted
        LOForEach foreach = new LOForEach(plan);
        foreach.setInnerPlan(innerPlan);

        int castNeededCounter = 0;
        for (int i = 0; i < fromSchema.size(); i++) {
            LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, i);
            innerPlan.add(innerLoad);
            innerPlan.connect(innerLoad, loGen);

            LogicalExpressionPlan genPlan = new LogicalExpressionPlan();
            ProjectExpression project = new ProjectExpression(genPlan, i, 0, loGen);
            genPlan.add(project);

            // add a cast if necessary by comparing the target type to the
            // input type. This is only "shallow checking".
            LogicalFieldSchema fs = fromSchema.getField(i);
            LogicalFieldSchema outFieldSchema = toSchema.getField(i);
            if (outFieldSchema.type != fs.type) {
                castNeededCounter++;
                new CastExpression(genPlan, project, outFieldSchema);
            }
            generatePlans.add(genPlan);
        }

        // if we really need casting
        if (castNeededCounter > 0) {
            // Manipulate the plan structure
            plan.add(foreach);
            plan.insertBetween(fromOp, foreach, toOp);
            return foreach;
        } else {
            plan.remove(foreach);
            return null;
        }
    }

    /**
     * Type-checks a split output's condition, which must evaluate to boolean,
     * and computes its schema.
     *
     * @throws FrontendException on a malformed or non-boolean condition
     */
    @Override
    public void visit(LOSplitOutput op) throws FrontendException {
        op.resetSchema();
        OperatorPlan lp = op.getPlan();

        // LOSplitOutput can only have 1 input
        List<Operator> list = lp.getPredecessors(op);
        if (list.size() != 1) {
            int errCode = 2008;
            String msg = "LOSplitOutput cannot have more than one input. Found: "
                    + list.size() + " input(s).";
            throwTypeCheckerException(op, msg, errCode, PigException.BUG, null);
        }

        LogicalExpressionPlan condPlan = op.getFilterPlan();

        // Check that the inner plan has only 1 output port
        if (condPlan.getSources().size() != 1) {
            int errCode = 1057;
            String msg = "Split's inner plan can only have one output (leaf)";
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(op, msg, errCode, PigException.INPUT, null);
        }

        visitExpressionPlan(condPlan, op);

        byte innerCondType = ((LogicalExpression) condPlan.getSources().get(0)).getType();
        if (innerCondType != DataType.BOOLEAN) {
            int errCode = 1058;
            String msg = "Split's condition must evaluate to boolean. Found: "
                    + DataType.findTypeName(innerCondType);
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(op, msg, errCode, PigException.INPUT, null);
        }

        try {
            // Compute the schema
            op.getSchema();
        } catch (FrontendException fe) {
            int errCode = 1055;
            String msg = "Problem while reading"
                    + " schemas from inputs of SplitOutput";
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(op, msg, errCode, PigException.INPUT, fe);
        }
    }

    /***
     * LODistinct: the output schema should be the same as the input.
     * @param op the distinct operator
     * @throws VisitorException (a TypeCheckerException, code 1055) if the schema
     *         cannot be computed
     */
    @Override
    public void visit(LODistinct op) throws VisitorException {
        op.resetSchema();
        try {
            // Compute the schema
            op.getSchema();
        } catch (FrontendException fe) {
            int errCode = 1055;
            String msg = "Problem while reading"
                    + " schemas from inputs of Distinct";
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(op, msg, errCode, PigException.INPUT, fe);
        }
    }

    /**
     * Type-checks the optional limit expression. A bytearray result is cast
     * to long. Any type other than long or int is rejected.
     *
     * @throws FrontendException on a malformed or wrongly-typed limit expression
     */
    @Override
    public void visit(LOLimit limit) throws FrontendException {
        limit.resetSchema();
        LogicalExpressionPlan expressionPlan = limit.getLimitPlan();
        if (expressionPlan != null) {
            // The plan must have exactly one output. It is read through get(0)
            // below, so an empty plan must also be rejected here.
            if (expressionPlan.getSources().size() != 1) {
                int errCode = 1057;
                String msg = "Limit's expression plan can only have one output";
                msgCollector.collect(msg, MessageType.Error);
                throwTypeCheckerException(limit, msg, errCode, PigException.INPUT, null);
            }

            // visit the limit expression
            visitExpressionPlan(expressionPlan, limit);

            // check the limit expression type
            byte innerCondType =
                    ((LogicalExpression) expressionPlan.getSources().get(0)).getType();

            if (innerCondType == DataType.BYTEARRAY) {
                // cast to long if it is a bytearray
                insertAtomicCastForInnerPlan(expressionPlan, limit, DataType.LONG);
            } else if (innerCondType != DataType.LONG && innerCondType != DataType.INTEGER) {
                // otherwise it must be an int or a long
                int errCode = 1058;
                String msg = "Limit's expression must evaluate to Long or Integer. Found: "
                        + DataType.findTypeName(innerCondType);
                msgCollector.collect(msg, MessageType.Error);
                throwTypeCheckerException(limit, msg, errCode, PigException.INPUT, null);
            }
        }

        try {
            // Compute the schema
            limit.getSchema();
        } catch (FrontendException fe) {
            int errCode = 1055;
            String msg = "Problem while reading schemas from inputs of Limit";
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(limit, msg, errCode, PigException.INPUT, fe);
        }
    }

    /***
     * Returns the concatenation of all fields from all input operators.
     * If one of the inputs has no schema, the output schema cannot be
     * constructed.
     * @param cs the cross operator
     * @throws VisitorException (a TypeCheckerException, code 1055) if the schema
     *         cannot be computed
     */
    @Override
    public void visit(LOCross cs) throws VisitorException {
        cs.resetSchema();
        try {
            // Compute the schema
            cs.getSchema();
        } catch (FrontendException fe) {
            int errCode = 1055;
            String msg = "Problem while reading"
                    + " schemas from inputs of Cross";
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(cs, msg, errCode, PigException.INPUT, fe);
        }
    }

    /***
     * Type-checks the sort column plans. The schema of the sort output is the
     * same as the sort input.
     * @throws FrontendException on a malformed sort column plan or an
     *         unresolvable schema
     */
    @Override
    public void visit(LOSort sort) throws FrontendException {
        sort.resetSchema();

        // Type checking internal plans.
        for (int i = 0; i < sort.getSortColPlans().size(); i++) {
            LogicalExpressionPlan sortColPlan = sort.getSortColPlans().get(i);

            // Check that the inner plan has only 1 output port
            if (sortColPlan.getSources().size() != 1) {
                int errCode = 1057;
                String msg = "Sort's inner plan can only have one output (leaf)";
                msgCollector.collect(msg, MessageType.Error);
                throwTypeCheckerException(sort, msg, errCode, PigException.INPUT, null);
            }
            visitExpressionPlan(sortColPlan, sort);
        }

        try {
            // Compute the schema
            sort.getSchema();
        } catch (FrontendException fee) {
            int errCode = 1059;
            String msg = "Problem while reconciling output schema of Sort";
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(sort, msg, errCode, PigException.INPUT, fee);
        }
    }

    /***
     * Type-checks the rank column plans. The schema of the rank output is the
     * same as the input, plus a rank field.
     * @throws FrontendException on a malformed rank column plan or an
     *         unresolvable schema
     */
    @Override
    public void visit(LORank rank) throws FrontendException {
        rank.resetSchema();

        // Type checking internal plans.
        List<LogicalExpressionPlan> rankColPlans = rank.getRankColPlans();
        for (int i = 0; i < rankColPlans.size(); i++) {
            LogicalExpressionPlan rankColPlan = rankColPlans.get(i);

            // Check that the inner plan has only 1 output port
            if (rankColPlan.getSources().size() != 1) {
                int errCode = 1057;
                String msg = "Rank's inner plan can only have one output (leaf)";
                msgCollector.collect(msg, MessageType.Error);
                throwTypeCheckerException(rank, msg, errCode, PigException.INPUT, null);
            }
            visitExpressionPlan(rankColPlan, rank);
        }

        try {
            // Compute the schema
            rank.getSchema();
        } catch (FrontendException fee) {
            int errCode = 1059;
            String msg = "Problem while reconciling output schema of Rank";
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(rank, msg, errCode, PigException.INPUT, fee);
        }
    }

    /***
     * The schema of the split output is the same as the split input.
     * @throws VisitorException if the split does not have exactly one input or
     *         its schema cannot be computed
     */
    @Override
    public void visit(LOSplit split) throws VisitorException {
        OperatorPlan lp = split.getPlan();
        List<Operator> inputList = lp.getPredecessors(split);

        if (inputList.size() != 1) {
            int errCode = 2008;
            String msg = "LOSplit cannot have more than one input. Found: "
                    + inputList.size() + " input(s).";
            throwTypeCheckerException(split, msg, errCode, PigException.BUG, null);
        }

        split.resetSchema();
        try {
            // Compute the schema
            split.getSchema();
        } catch (FrontendException fe) {
            int errCode = 1059;
            String msg = "Problem while reconciling output schema of Split";
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(split, msg, errCode, PigException.INPUT, fe);
        }
    }

    /**
     * LOJoin visitor. It type-checks every join key plan and unifies the key
     * types across inputs by inserting casts. It then re-computes the join
     * schema.
     * @throws FrontendException (as a TypeCheckerException) if the keys or the
     *         schema cannot be resolved
     */
    @Override
    public void visit(LOJoin join) throws FrontendException {
        try {
            join.resetSchema();
            join.getSchema();
        } catch (FrontendException fe) {
            int errCode = 1060;
            String msg = "Cannot resolve Join output schema";
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(join, msg, errCode, PigException.INPUT, fe);
        }

        MultiMap<Integer, LogicalExpressionPlan> joinColPlans = join.getExpressionPlans();
        List<Operator> inputs = join.getInputs((LogicalPlan) plan);

        // Type checking internal plans.
        for (int i = 0; i < inputs.size(); i++) {
            ArrayList<LogicalExpressionPlan> innerPlans =
                    new ArrayList<LogicalExpressionPlan>(joinColPlans.get(i));

            for (int j = 0; j < innerPlans.size(); j++) {
                LogicalExpressionPlan innerPlan = innerPlans.get(j);

                // Check that the inner plan has only 1 output port
                if (innerPlan.getSources().size() != 1) {
                    int errCode = 1057;
                    String msg = "Join's inner plans can only"
                            + " have one output (leaf)";
                    msgCollector.collect(msg, MessageType.Error);
                    throwTypeCheckerException(join, msg, errCode, PigException.INPUT, null);
                }
                visitExpressionPlan(innerPlan, join);
            }
        }

        try {
            if (!isJoinOnMultiCols(join)) {
                // merge all the inner plan outputs so we know what type
                // our join column should be
                byte groupType = getAtomicJoinColType(join);

                // go through all inputs again to add a cast if necessary
                for (int i = 0; i < inputs.size(); i++) {
                    Collection<LogicalExpressionPlan> exprPlans = join.getJoinPlan(i);
                    // there is exactly one expression plan here; that is
                    // checked in getAtomicJoinColType()
                    LogicalExpressionPlan exprPlan = exprPlans.iterator().next();

                    // inner plan size was already checked above
                    byte innerType =
                            ((LogicalExpression) exprPlan.getSources().get(0)).getType();
                    if (innerType != groupType) {
                        insertAtomicCastForInnerPlan(exprPlan, join, groupType);
                    }
                }
            } else {
                // schema of the join key
                LogicalSchema groupBySchema =
                        getSchemaFromInnerPlans(join.getExpressionPlans(), join);

                // go through all inputs again to add a cast if necessary
                for (int i = 0; i < inputs.size(); i++) {
                    List<LogicalExpressionPlan> innerPlans =
                            new ArrayList<LogicalExpressionPlan>(join.getJoinPlan(i));
                    for (int j = 0; j < innerPlans.size(); j++) {
                        LogicalExpressionPlan innerPlan = innerPlans.get(j);
                        LogicalExpression outputExp =
                                (LogicalExpression) innerPlan.getSources().get(0);
                        byte innerType = outputExp.getType();
                        byte expectedType = groupBySchema.getField(j).type;

                        if (!DataType.isAtomic(innerType) && (DataType.TUPLE != innerType)) {
                            int errCode = 1057;
                            String msg = "Join's inner plans can only"
                                    + " have one output (leaf)";
                            msgCollector.collect(msg, MessageType.Error);
                            throwTypeCheckerException(join, msg, errCode, PigException.INPUT, null);
                        }

                        if (innerType != expectedType) {
                            insertAtomicCastForInnerPlan(innerPlan, join, expectedType);
                        }
                    }
                }
            }
        } catch (FrontendException fe) {
            int errCode = 1060;
            String msg = "Cannot resolve Join output schema";
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(join, msg, errCode, PigException.INPUT, fe);
        }

        try {
            join.resetSchema();
            join.getSchema();
        } catch (FrontendException fe) {
            int errCode = 1060;
            String msg = "Cannot resolve Join output schema";
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(join, msg, errCode, PigException.INPUT, fe);
        }
    }

    /**
     * @param join the join to inspect
     * @return true if input 0 has more than one join expression plan
     * @throws AssertionError if the join has no expression plans yet
     */
    private boolean isJoinOnMultiCols(LOJoin join) {
        MultiMap<Integer, LogicalExpressionPlan> exprPlans = join.getExpressionPlans();
        if (exprPlans == null || exprPlans.size() == 0) {
            throw new AssertionError("LOJoin.isJoinOnMultiCols() can only be called"
                    + " after it has join expression plans");
        }
        return exprPlans.get(0).size() > 1;
    }

    /**
     * This can be used to get the merged type of the output join col
     * only when the join col is of atomic type.
     * @return the merged type of the join col; starts from bytearray
     * @throws FrontendException if the join is on multiple columns, an input
     *         has other than one key plan, or the key types cannot be merged
     */
    private byte getAtomicJoinColType(LOJoin join) throws FrontendException {
        if (isJoinOnMultiCols(join)) {
            int errCode = 1010;
            String msg = "getAtomicJoinColType is used only when"
                    + " dealing with atomic group col";
            throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
        }

        byte groupType = DataType.BYTEARRAY;
        // The number of inputs does not change inside the loop; compute it once.
        int inputCount = plan.getPredecessors(join).size();

        // merge all the inner plan outputs so we know what type
        // our join column should be
        for (int i = 0; i < inputCount; i++) {
            List<LogicalExpressionPlan> innerPlans =
                    new ArrayList<LogicalExpressionPlan>(join.getJoinPlan(i));
            if (innerPlans.size() != 1) {
                int errCode = 1012;
                String msg = "Each COGroup input has to have "
                        + "the same number of inner plans";
                throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
            }
            byte innerType = ((LogicalExpression) innerPlans.get(0).getSources().get(0)).getType();
            byte newGroupType = DataType.mergeType(groupType, innerType);
            if (newGroupType == -1) {
                int errCode = 1107;
                String msg = "Cannot merge join keys, incompatible types. Outer: "
                        + DataType.findTypeName(groupType) + "; inner: "
                        + DataType.findTypeName(innerType);
                throw new FrontendException(msg, errCode, PigException.INPUT);
            }
            groupType = newGroupType;
        }

        return groupType;
    }

    /**
     * This can be used to get the merged type of the output join/cogroup col
     * only when the col is of atomic type.
     * @param allExprPlans the key expression plans, keyed by input index
     * @return the merged type of the key col; starts from bytearray
     * @throws FrontendException if there are multiple key plans per input, an
     *         input has other than one key plan, or the key types cannot be merged
     */
    private byte getAtomicColType(MultiMap<Integer, LogicalExpressionPlan> allExprPlans)
            throws FrontendException {
        if (isMultiExprPlanPerInput(allExprPlans)) {
            int errCode = 1010;
            String msg = "getAtomicJoinColType is used only when"
                    + " dealing with atomic group col";
            throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
        }

        byte groupType = DataType.BYTEARRAY;
        // merge all the inner plan outputs so we know what type
        // our group column should be
        for (int i = 0; i < allExprPlans.size(); i++) {
            List<LogicalExpressionPlan> innerPlans =
                    new ArrayList<LogicalExpressionPlan>(allExprPlans.get(i));
            if (innerPlans.size() != 1) {
                int errCode = 1012;
                String msg = "Each COGroup input has to have "
                        + "the same number of inner plans";
                throw new FrontendException(msg, errCode, PigException.INPUT, false, null);
            }
            byte innerType = ((LogicalExpression) innerPlans.get(0).getSources().get(0)).getType();
            byte newGroupType = DataType.mergeType(groupType, innerType);
            if (newGroupType == -1) {
                int errCode = 1107;
                String msg = "Cannot merge join keys, incompatible types. Outer: "
                        + DataType.findTypeName(groupType) + "; inner: "
                        + DataType.findTypeName(innerType);
                throw new FrontendException(msg, errCode, PigException.INPUT);
            }
            groupType = newGroupType;
        }

        return groupType;
    }

    /**
     * @param exprPlans key expression plans, keyed by input index
     * @return true if input 0 has more than one expression plan
     * @throws AssertionError if {@code exprPlans} is null or empty
     */
    private boolean isMultiExprPlanPerInput(
            MultiMap<Integer, LogicalExpressionPlan> exprPlans) {
        if (exprPlans == null || exprPlans.size() == 0) {
            throw new AssertionError("isMultiExprPlanPerInput() can only be called"
                    + " after there are expression plans");
        }
        return exprPlans.get(0).size() > 1;
    }

    /**
     * Casts the single output operator of innerPlan to toType, then re-runs
     * expression type checking on the modified plan.
     * @param innerPlan the plan whose single output is cast
     * @param relOp join, cogroup or limit; used for error reporting
     * @param toType the target type
     * @throws FrontendException if toType is not usable or the plan does not
     *         have exactly one output
     */
    private void insertAtomicCastForInnerPlan(LogicalExpressionPlan innerPlan,
            LogicalRelationalOperator relOp, byte toType) throws FrontendException {

        if (!DataType.isUsableType(toType)) {
            int errCode = 1051;
            String msg = "Cannot cast to "
                    + DataType.findTypeName(toType);
            throwTypeCheckerException(relOp, msg, errCode, PigException.INPUT, null);
        }

        // Exactly one output is required; get(0) below would otherwise throw
        // IndexOutOfBoundsException on an empty plan.
        List<Operator> outputs = innerPlan.getSources();
        if (outputs.size() != 1) {
            int errCode = 2060;
            String msg = "Expected one output. Found " + outputs.size() + " outputs.";
            throwTypeCheckerException(relOp, msg, errCode, PigException.BUG, null);
        }

        LogicalExpression currentOutput = (LogicalExpression) outputs.get(0);
        TypeCheckingExpVisitor.collectCastWarning(
                relOp, currentOutput.getType(), toType, msgCollector);
        LogicalFieldSchema newFS = new LogicalFieldSchema(
                currentOutput.getFieldSchema().alias, null, toType);
        // add the cast
        new CastExpression(innerPlan, currentOutput, newFS);
        // visit the modified inner plan
        visitExpressionPlan(innerPlan, relOp);
    }

    /**
     * Creates the combined group-by/join column schema from the join/cogroup
     * expression plans of all inputs.
     * This implementation assumes that all the inputs have the same join col
     * tuple arity, taken from input 0.
     *
     * @param exprPlans key expression plans, keyed by input index
     * @param op the join or cogroup; used for messages and error reporting
     * @return a schema with one field per key column, each holding the merged type
     * @throws FrontendException if a key plan is a project-star or column types
     *         are incompatible (TypeCheckerException, code 1130)
     */
    private LogicalSchema getSchemaFromInnerPlans(
            MultiMap<Integer, LogicalExpressionPlan> exprPlans,
            LogicalRelationalOperator op) throws FrontendException {
        // this fsList represents all the columns in the group tuple
        List<LogicalFieldSchema> fsList = new ArrayList<LogicalFieldSchema>();

        int outputSchemaSize = exprPlans.get(0).size();

        // by default, they are all bytearray;
        // for type checking, we don't care about aliases
        for (int i = 0; i < outputSchemaSize; i++) {
            fsList.add(new LogicalFieldSchema(null, null, DataType.BYTEARRAY));
        }

        // merge all the inner plan outputs so we know what type
        // each group column should be
        for (int i = 0; i < exprPlans.size(); i++) {
            List<LogicalExpressionPlan> innerPlans =
                    new ArrayList<LogicalExpressionPlan>(exprPlans.get(i));

            for (int j = 0; j < innerPlans.size(); j++) {
                LogicalExpression eOp = (LogicalExpression) innerPlans.get(j).getSources().get(0);
                byte innerType = eOp.getType();

                if (eOp instanceof ProjectExpression
                        && ((ProjectExpression) eOp).isProjectStar()) {
                    // there is a project star and there is more than one
                    // expression plan
                    int errCode = 1013;
                    String msg = "Grouping attributes can either be star (*) " +
                            "or a list of expressions, but not both.";
                    msgCollector.collect(msg, MessageType.Error);
                    throw new FrontendException(
                            msg, errCode, PigException.INPUT, false, null);
                }

                // merge the type
                LogicalFieldSchema groupFs = fsList.get(j);
                groupFs.type = DataType.mergeType(groupFs.type, innerType);
                if (groupFs.type == DataType.ERROR) {
                    String colType = "join";
                    if (op instanceof LOCogroup) {
                        colType = "group";
                    }
                    String msg =
                            colType + " column no. " +
                            (j + 1) + " in relation no. " + (i + 1) + " of " + colType +
                            " statement has datatype " + DataType.findTypeName(innerType) +
                            " which is incompatible with type of corresponding column" +
                            " in earlier relation(s) in the statement";
                    msgCollector.collect(msg, MessageType.Error);
                    TypeCheckerException ex =
                            new TypeCheckerException(op, msg, 1130, PigException.INPUT);
                    ex.setMarkedAsShowToUser(true);
                    throw ex;
                }
            }
        }

        // create the schema from the field schemas
        LogicalSchema tupleSchema = new LogicalSchema();
        for (LogicalFieldSchema fs : fsList) {
            tupleSchema.addField(fs);
        }
        return tupleSchema;
    }

    /**
     * COGroup.
     * All group-by cols from all inputs must be of the same type. Casts are
     * inserted where needed, and the schema is then re-computed.
     * @throws FrontendException (as a TypeCheckerException) if the keys or the
     *         schema cannot be resolved
     */
    @Override
    public void visit(LOCogroup cg) throws FrontendException {
        try {
            cg.resetSchema();
            cg.getSchema();
        } catch (FrontendException fe) {
            int errCode = 1060;
            String msg = "Cannot resolve COGroup output schema";
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(cg, msg, errCode, PigException.INPUT, fe);
        }

        MultiMap<Integer, LogicalExpressionPlan> groupByPlans = cg.getExpressionPlans();
        List<Operator> inputs = cg.getInputs((LogicalPlan) plan);

        // Type checking internal plans.
        for (int i = 0; i < inputs.size(); i++) {
            List<LogicalExpressionPlan> innerPlans =
                    new ArrayList<LogicalExpressionPlan>(cg.getExpressionPlans().get(i));

            for (int j = 0; j < innerPlans.size(); j++) {
                LogicalExpressionPlan innerPlan = innerPlans.get(j);

                // Check that the inner plan has only 1 output port
                if (innerPlan.getSources().size() != 1) {
                    int errCode = 1057;
                    String msg = "COGroup's inner plans can only"
                            + " have one output (leaf)";
                    msgCollector.collect(msg, MessageType.Error);
                    throwTypeCheckerException(cg, msg, errCode, PigException.INPUT, null);
                }
                visitExpressionPlan(innerPlan, cg);
            }
        }

        try {
            if (!isCoGroupOnMultiCols(cg)) {
                // merge all the inner plan outputs so we know what type
                // our group column should be
                byte groupType = getAtomicColType(cg.getExpressionPlans());

                // go through all inputs again to add a cast if necessary
                for (int i = 0; i < inputs.size(); i++) {
                    List<LogicalExpressionPlan> innerPlans =
                            new ArrayList<LogicalExpressionPlan>(cg.getExpressionPlans().get(i));
                    // inner plan size was already checked above
                    byte innerType =
                            ((LogicalExpression) innerPlans.get(0).getSources().get(0)).getType();
                    if (innerType != groupType) {
                        insertAtomicCastForInnerPlan(innerPlans.get(0), cg, groupType);
                    }
                }
            } else {
                LogicalSchema groupBySchema =
                        getSchemaFromInnerPlans(cg.getExpressionPlans(), cg);

                // go through all inputs again to add a cast if necessary
                for (int i = 0; i < inputs.size(); i++) {
                    List<LogicalExpressionPlan> innerPlans =
                            new ArrayList<LogicalExpressionPlan>(groupByPlans.get(i));
                    for (int j = 0; j < innerPlans.size(); j++) {
                        LogicalExpressionPlan innerPlan = innerPlans.get(j);
                        byte innerType =
                                ((LogicalExpression) innerPlan.getSources().get(0)).getType();

                        if (!DataType.isAtomic(innerType) && (DataType.TUPLE != innerType)) {
                            int errCode = 1061;
                            String msg = "Sorry, group by complex types"
                                    + " will be supported soon";
                            msgCollector.collect(msg, MessageType.Error);
                            throwTypeCheckerException(cg, msg, errCode, PigException.INPUT, null);
                        }

                        byte expectedType = groupBySchema.getField(j).type;
                        if (innerType != expectedType) {
                            insertAtomicCastForInnerPlan(innerPlan, cg, expectedType);
                        }
                    }
                }
            }
        } catch (FrontendException fe) {
            int errCode = 1060;
            String msg = "Cannot resolve COGroup output schema";
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(cg, msg, errCode, PigException.INPUT, fe);
        }

        try {
            cg.resetSchema();
            cg.getSchema();
        } catch (FrontendException fe) {
            int errCode = 1060;
            String msg = "Cannot resolve COGroup output schema";
            msgCollector.collect(msg, MessageType.Error);
            throwTypeCheckerException(cg, msg, errCode, PigException.INPUT, fe);
        }
    }

    /**
     * @param coGroup the cogroup to inspect
     * @return true if input 0 has more than one group-by expression plan
     * @throws AssertionError if the cogroup has no expression plans yet
     */
    private boolean isCoGroupOnMultiCols(LOCogroup coGroup) {
        MultiMap<Integer, LogicalExpressionPlan> exprPlans = coGroup.getExpressionPlans();
        if (exprPlans == null || exprPlans.size() == 0) {
            throw new AssertionError("LOCogroup.isCoGroupOnMultiCols() can only be called"
                    + " after it has group expression plans");
        }
        return exprPlans.get(0).size() > 1;
    }
}