PIG-5020: Give file location for loadcaster related warning and errors (knoguchi)


git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1763307 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index e7d6c0f..6112aec 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -24,6 +24,8 @@
  
 IMPROVEMENTS
 
+PIG-5020: Give file location for loadcaster related warning and errors (knoguchi)
+
 PIG-5027: Improve SAMPLE Scalar Expression Example (icook via knoguchi)
 
 PIG-5023: Documentation for BagToTuple (icook via knoguchi)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
index 94c64ca..02ecf79 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java
@@ -165,7 +165,7 @@
                         res.result = caster.bytesToBigInteger(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "BigInteger.";
+                        String msg = unknownByteArrayErrorMessage + "BigInteger for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -281,7 +281,7 @@
                         res.result = caster.bytesToBigDecimal(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "BigDecimal.";
+                        String msg = unknownByteArrayErrorMessage + "BigDecimal for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -396,7 +396,7 @@
                         res.result = caster.bytesToBoolean(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "boolean.";
+                        String msg = unknownByteArrayErrorMessage + "boolean for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -510,7 +510,7 @@
                         res.result = caster.bytesToInteger(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "int.";
+                        String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -636,7 +636,7 @@
                         res.result = caster.bytesToLong(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "long.";
+                        String msg = unknownByteArrayErrorMessage + "long for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -759,7 +759,7 @@
                         res.result = caster.bytesToDouble(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "double.";
+                        String msg = unknownByteArrayErrorMessage + "double for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -881,7 +881,7 @@
                         res.result = caster.bytesToFloat(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "float.";
+                        String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1007,7 +1007,7 @@
                         res.result = caster.bytesToDateTime(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "datetime.";
+                        String msg = unknownByteArrayErrorMessage + "datetime for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1118,7 +1118,7 @@
                         res.result = caster.bytesToCharArray(dba.get());
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "string.";
+                        String msg = unknownByteArrayErrorMessage + "string for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1270,7 +1270,7 @@
                         res.result = caster.bytesToTuple(dba.get(), fieldSchema);
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "tuple.";
+                        String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1332,7 +1332,7 @@
                     result = caster.bytesToBag(((DataByteArray)obj).get(), fs);
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "bag.";
+                    String msg = unknownByteArrayErrorMessage + "bag for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
             } else {
@@ -1363,7 +1363,7 @@
                     result = caster.bytesToTuple(((DataByteArray)obj).get(), fs);
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "tuple.";
+                    String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
             } else {
@@ -1388,7 +1388,7 @@
                     result = caster.bytesToMap(((DataByteArray)obj).get(), fs);
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "tuple.";
+                    String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
             } else {
@@ -1402,7 +1402,7 @@
                     result = caster.bytesToBoolean(((DataByteArray) obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "int.";
+                    String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1441,7 +1441,7 @@
                     result = caster.bytesToInteger(((DataByteArray) obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "int.";
+                    String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1487,7 +1487,7 @@
                     result = caster.bytesToDouble(((DataByteArray) obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "double.";
+                    String msg = unknownByteArrayErrorMessage + "double for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1533,7 +1533,7 @@
                     result = caster.bytesToLong(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "long.";
+                    String msg = unknownByteArrayErrorMessage + "long for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1579,7 +1579,7 @@
                     result = caster.bytesToFloat(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "float.";
+                    String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1625,7 +1625,7 @@
                     result = caster.bytesToDateTime(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "datetime.";
+                    String msg = unknownByteArrayErrorMessage + "datetime for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1664,7 +1664,7 @@
                     result = caster.bytesToCharArray(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "float.";
+                    String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1712,7 +1712,7 @@
                     result = caster.bytesToBigInteger(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "BigInteger.";
+                    String msg = unknownByteArrayErrorMessage + "BigInteger for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1757,7 +1757,7 @@
                     result = caster.bytesToBigDecimal(((DataByteArray)obj).get());
                 } else {
                     int errCode = 1075;
-                    String msg = unknownByteArrayErrorMessage + "BigDecimal.";
+                    String msg = unknownByteArrayErrorMessage + "BigDecimal for " + this.getOriginalLocations();
                     throw new ExecException(msg, errCode, PigException.INPUT);
                 }
                 break;
@@ -1865,7 +1865,7 @@
                         res.result = caster.bytesToBag(dba.get(), fieldSchema);
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "bag.";
+                        String msg = unknownByteArrayErrorMessage + "bag for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
@@ -1956,7 +1956,7 @@
                         res.result = caster.bytesToMap(dba.get(), fieldSchema);
                     } else {
                         int errCode = 1075;
-                        String msg = unknownByteArrayErrorMessage + "map.";
+                        String msg = unknownByteArrayErrorMessage + "map for " + this.getOriginalLocations();
                         throw new ExecException(msg, errCode, PigException.INPUT);
                     }
                 } catch (ExecException ee) {
diff --git a/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java b/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
index 873ba39..299c012 100644
--- a/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
+++ b/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
@@ -323,6 +323,7 @@
     public void visit( CastExpression op ) throws FrontendException {
         POCast pCast = new POCast(new OperatorKey(DEFAULT_SCOPE, nodeGen
                 .getNextNodeId(DEFAULT_SCOPE)));
+        pCast.addOriginalLocation(op.getFieldSchema().alias, op.getLocation()) ;
 //        physOp.setAlias(op.getAlias());
         currentPlan.add(pCast);
 
diff --git a/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java b/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
index 0b357ff..4f6957d 100644
--- a/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
+++ b/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
@@ -191,6 +191,15 @@
 
         new TypeCheckingRelVisitor( this, collector).visit();
 
+
+        new UnionOnSchemaSetter(this).visit();
+        new CastLineageSetter(this, collector).visit();
+        new ScalarVariableValidator(this).visit();
+        new StoreAliasSetter(this).visit();
+
+        // compute whether output data is sorted or not
+        new SortInfoSetter(this).visit();
+
         boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
 
         if(aggregateWarning) {
@@ -201,14 +210,6 @@
             }
         }
 
-        new UnionOnSchemaSetter(this).visit();
-        new CastLineageSetter(this, collector).visit();
-        new ScalarVariableValidator(this).visit();
-        new StoreAliasSetter(this).visit();
-
-        // compute whether output data is sorted or not
-        new SortInfoSetter(this).visit();
-
         if (!(skipInputOutputValidation || pigContext.inExplain || pigContext.inDumpSchema)) {
             // Validate input/output file
             new InputOutputFileValidatorVisitor(this, pigContext).visit();
diff --git a/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java b/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java
index a42eb40..1a7117e 100644
--- a/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java
+++ b/src/org/apache/pig/newplan/logical/visitor/CastLineageSetter.java
@@ -107,7 +107,7 @@
                 if(inLoadFunc == null){
                     String msg = "Cannot resolve load function to use for casting from " + 
                                 DataType.findTypeName(inType) + " to " +
-                                DataType.findTypeName(outType) + ". ";
+                                DataType.findTypeName(outType) + " at " + cast.getLocation() ;
                     msgCollector.collect(msg, MessageType.Warning,
                            PigWarning.NO_LOAD_FUNCTION_FOR_CASTING_BYTEARRAY);
                 }else {
diff --git a/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java b/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java
index e9930df..db1ca48 100644
--- a/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java
+++ b/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java
@@ -458,6 +458,7 @@
         collectCastWarning(node, arg.getType(), toFs.type, msgCollector);
 
         CastExpression cast = new CastExpression(plan, arg, toFs);
+        cast.setLocation(node.getLocation());
         try {
             // disconnect cast and arg because the connection is already
             // added by cast constructor and insertBetween call is going
diff --git a/src/org/apache/pig/parser/SourceLocation.java b/src/org/apache/pig/parser/SourceLocation.java
index f8ebdaf..f957bb4 100644
--- a/src/org/apache/pig/parser/SourceLocation.java
+++ b/src/org/apache/pig/parser/SourceLocation.java
@@ -75,12 +75,11 @@
         if (node != null) {
             InvocationPoint pt = node.getNextInvocationPoint();
             while (pt != null) {
-                sb.append("\n");
                 sb.append("at expanding macro '" + pt.getMacro() + "' ("
                         + pt.getFile() + ":" + pt.getLine() + ")");
                 pt = node.getNextInvocationPoint();
+                sb.append("\n");
             }
-            sb.append("\n");
         }
         sb.append( "<" );
         if( file != null && !file.isEmpty() )
diff --git a/test/e2e/pig/tests/negative.conf b/test/e2e/pig/tests/negative.conf
index 8663f28..b7b5c87 100644
--- a/test/e2e/pig/tests/negative.conf
+++ b/test/e2e/pig/tests/negative.conf
@@ -579,10 +579,10 @@
 a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 b = filter a by name lt 'b';
 c = foreach b generate org.apache.pig.test.udf.evalfunc.CreateMap((chararray)name, age);
-d = foreach c generate $0#'alice young';
-split d into e if $0 < 42, f if $0 >= 42;
+d = foreach c generate $0#'alice young' as field_bytearray;
+split d into e if field_bytearray < 42, f if field_bytearray >= 42;
 store e into ':OUTPATH:';\,
-                'expected_err_regex' => "Received a bytearray from the UDF or Union from two different Loaders. Cannot determine how to convert the bytearray to int",
+                'expected_err_regex' => "Received a bytearray from the UDF or Union from two different Loaders. Cannot determine how to convert the bytearray to int for \\[field_bytearray\\[6,",
             },
         ]
         }
diff --git a/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java b/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java
index 905597e..f8dcb13 100644
--- a/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java
+++ b/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java
@@ -2912,12 +2912,12 @@
 
         @Test
         public void testUnionLineageDifferentSchemaFail() throws Throwable {
-            String query = "a = load 'a' using PigStorage('a') as (field1, field2: float, field3: chararray );"
-            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster('b') as (field4, field5, field6: chararray, field7 );"
-            + "c = union a , b;"
+            String query = "a = load 'a' using PigStorage('a') as (field1, field2: float, field3: chararray );\n"
+            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster('b') as (field4, field5, field6: chararray, field7 );\n"
+            + "c = union a , b;\n"
             + "d = foreach c generate $3 + 2.0 ;";
 
-            checkWarning(query, CAST_LOAD_NOT_FOUND);
+            checkWarning(query, CAST_LOAD_NOT_FOUND + " to double at <line 4,");
         }
 
         private void checkWarning(String query, String warnMsg) throws FrontendException {
@@ -2959,12 +2959,12 @@
         public void testUnionLineageMixSchemaFail() throws Throwable {
             // different loader caster associated with each input, so can't determine
             // which one to use on union output
-            String query = "a = load 'a' using PigStorage('a') as (field1, field2: float, field3: chararray );"
-            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster('b');"
-            + "c = union a , b;"
+            String query = "a = load 'a' using PigStorage('a') as (field1, field2: float, field3: chararray );\n"
+            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster('b');\n"
+            + "c = union a , b;\n"
             + "d = foreach c generate $3 + 2.0 ;";
 
-            checkWarning(query, CAST_LOAD_NOT_FOUND);
+            checkWarning(query, CAST_LOAD_NOT_FOUND + " to double at <line 4,");
         }
 
         @Test
@@ -3306,12 +3306,12 @@
 
         @Test
         public void testCrossLineageNoSchemaFail() throws Throwable {
-            String query = "a = load 'a' using PigStorage('a');"
-            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster('b');"
-            + "c = cross a , b;"
+            String query = "a = load 'a' using PigStorage('a');\n"
+            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster('b');\n"
+            + "c = cross a , b;\n"
             + "d = foreach c generate $1 + 2.0 ;";
 
-            checkWarning(query, CAST_LOAD_NOT_FOUND);
+            checkWarning(query, CAST_LOAD_NOT_FOUND + " to double at <line 4,");
         }
 
         @Test
@@ -3327,12 +3327,12 @@
 
         @Test
         public void testCrossLineageMixSchemaFail() throws Throwable {
-            String query = "a = load 'a' using PigStorage('a') as (field1, field2: float, field3: chararray );"
-            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster('b');"
-            + "c = cross a , b;"
+            String query = "a = load 'a' using PigStorage('a') as (field1, field2: float, field3: chararray );\n"
+            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster('b');\n"
+            + "c = cross a , b;\n"
             + "d = foreach c generate $3 + 2.0 ;";
 
-            checkWarning(query, CAST_LOAD_NOT_FOUND);
+            checkWarning(query, CAST_LOAD_NOT_FOUND + " to double at <line 4,");
         }
 
         @Test
@@ -3361,12 +3361,12 @@
         public void testJoinLineageNoSchemaFail() throws Throwable {
             //this test case should change when we decide on what flattening a tuple or bag
             //with null schema results in a foreach flatten and hence a join
-            String query =  "a = load 'a' using PigStorage('a');"
-            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster();"
-            + "c = join a by $0, b by $0;"
+            String query =  "a = load 'a' using PigStorage('a');\n"
+            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster();\n"
+            + "c = join a by $0, b by $0;\n"
             + "d = foreach c generate $1 + 2.0 ;";
 
-            checkWarning(query, CAST_LOAD_NOT_FOUND);
+            checkWarning(query, CAST_LOAD_NOT_FOUND + " to double at <line 4,");
         }
 
         @Test
@@ -3382,12 +3382,12 @@
         public void testJoinLineageMixSchemaFail() throws Throwable {
             //this test case should change when we decide on what flattening a tuple or bag
             //with null schema results in a foreach flatten and hence a join
-            String query =  "a = load 'a' using PigStorage('a') as (field1, field2: float, field3: chararray );"
-            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster();"
-            + "c = join a by field1, b by $0;"
+            String query =  "a = load 'a' using PigStorage('a') as (field1, field2: float, field3: chararray );\n"
+            + "b = load 'a' using org.apache.pig.test.PigStorageWithDifferentCaster();\n"
+            + "c = join a by field1, b by $0;\n"
             + "d = foreach c generate $3 + 2.0 ;";
 
-            checkWarning(query, CAST_LOAD_NOT_FOUND);
+            checkWarning(query, CAST_LOAD_NOT_FOUND + " to double at <line 4,");
         }
 
         @Test
@@ -3871,12 +3871,12 @@
          */
         @Test
         public void testLineageMultipleLoader3() throws FrontendException {
-            String query =  "A = LOAD 'data1' USING PigStorage() AS (u, v, w);"
-            +  "B = LOAD 'data2' USING TextLoader() AS (x, y);"
-            + "C = COGROUP A BY u, B by x;"
-            +  "D = FOREACH C GENERATE (chararray)group;";
+            String query =  "A = LOAD 'data1' USING PigStorage() AS (u, v, w);\n"
+            +  "B = LOAD 'data2' USING TextLoader() AS (x, y);\n"
+            +  "C = COGROUP A BY u, B by x;\n"
+            +  "D = FOREACH C GENERATE (chararray)group;\n";
 
-            checkWarning(query, CAST_LOAD_NOT_FOUND);
+            checkWarning(query, CAST_LOAD_NOT_FOUND + " to double at <line 4,");
         }
 
         /**