diff --git a/CHANGES.txt b/CHANGES.txt
index fefdf6c..ecf0541 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -21,6 +21,8 @@
 Trunk (unreleased changes)
  
 INCOMPATIBLE CHANGES
+
+PIG-5067: Revisit union on numeric type and chararray to bytearray (knoguchi)
  
 IMPROVEMENTS
 
diff --git a/src/docs/src/documentation/content/xdocs/basic.xml b/src/docs/src/documentation/content/xdocs/basic.xml
index 050530b..f4066a6 100644
--- a/src/docs/src/documentation/content/xdocs/basic.xml
+++ b/src/docs/src/documentation/content/xdocs/basic.xml
@@ -713,7 +713,7 @@
    <p></p>
    <ul>
       <li>
-         <p>If Pig cannot resolve incompatible types through implicit casts, an error will occur. For example, you cannot add chararray and float (see the Types Table for addition and subtraction).</p>
+         <p>If Pig cannot resolve incompatible types through implicit casts, an error will occur. For example, you cannot add chararray and float (see the <a href="#types-table-add">Types Table for addition and subtraction</a>).</p>
       <source>
 A = LOAD 'data' AS (name:chararray, age:int, gpa:float);
 B = FOREACH A GENERATE name + gpa;
@@ -8503,11 +8503,11 @@
 A union B: null 
 </source>
   
-<p>Union columns with incompatible types result in a bytearray type: </p>
+<p>Union columns with incompatible types results in a failure. (See <a href="#types-table-add">Types Table for addition and subtraction</a> for incompatible types.)</p>
 <source>
-A: (a1:long, a2:long) 
-B: (b1:(b11:long, b12:long), b2:long) 
-A union B: (a1:bytearray, a2:long) 
+A: (a1:long)
+B: (a1:chararray)
+A union B: ERROR: Cannot cast from long to bytearray
 </source>
 
 <p>Union columns of compatible type will produce an "escalate" type. 
diff --git a/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java b/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java
index db1ca48..81fbe28 100644
--- a/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java
+++ b/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java
@@ -491,7 +491,7 @@
         byte outType = cast.getType();
         if(outType == DataType.BYTEARRAY && inType != outType) {
             int errCode = 1051;
-            String msg = "Cannot cast to bytearray";
+            String msg = "Cannot cast from " + DataType.findTypeName(inType) + " to bytearray";
             msgCollector.collect(msg, MessageType.Error) ;
             throw new TypeCheckerException(cast, msg, errCode, PigException.INPUT) ;
         }
diff --git a/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java b/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java
index d6c4cff..fd7ea74 100644
--- a/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java
+++ b/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java
@@ -351,7 +351,8 @@
 
             if (outFieldSchema.type != fs.type) {
                 castNeededCounter++ ;
-                new CastExpression(genPlan, project, outFieldSchema);
+                CastExpression castexp = new CastExpression(genPlan, project, outFieldSchema);
+                castexp.setLocation(toOp.getLocation());
             }
 
             generatePlans.add(genPlan) ;
diff --git a/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java b/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java
index 3d5ce68..fea4262 100644
--- a/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java
+++ b/src/org/apache/pig/newplan/logical/visitor/UnionOnSchemaSetter.java
@@ -21,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.pig.PigException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.Pair;
@@ -110,9 +111,20 @@
                 } else {
                     ProjectExpression projExpr = 
                         new ProjectExpression( exprPlan, genInputs.size(), 0, gen );
-                    if( fs.type != DataType.BYTEARRAY
-                        && opSchema.getField( pos ).type != fs.type ) {
-                        new CastExpression( exprPlan, projExpr, fs );
+                    if( opSchema.getField( pos ).type != fs.type ) {
+                        if( fs.type != DataType.BYTEARRAY ) {
+                            CastExpression castexpr = new CastExpression( exprPlan, projExpr, fs );
+                            castexpr.setLocation(union.getLocation());
+                        } else {
+                            int errCode = 1056;
+                            String msg = "Union of incompatible types not allowed. "
+                                         + "Cannot cast from "
+                                         + DataType.findTypeName(opSchema.getField( pos ).type)
+                                         + " to bytearray for '"
+                                         + opSchema.getField( pos ).alias
+                                         + "'. Please typecast to compatible types before union." ;
+                            throw new FrontendException(union, msg, errCode, PigException.INPUT) ;
+                        }
                     }
                     genInputs.add( new LOInnerLoad( innerPlan, foreach, pos ) );
                 }
diff --git a/test/e2e/pig/tests/nightly.conf b/test/e2e/pig/tests/nightly.conf
index 2b0a50a..da7528d 100644
--- a/test/e2e/pig/tests/nightly.conf
+++ b/test/e2e/pig/tests/nightly.conf
@@ -4872,21 +4872,6 @@
 b = load ':INPATH:/singlefile/allscalar10k' using PigStorage() as (name:chararray, age:int, gpa:double, instate:chararray);
 C = union a, b;
 store C into ':OUTPATH:';\, 
-                },
-                {
-                    # Test Union using merge with incompatible types.  float->bytearray and chararray->bytearray
-                    'num' => 8,
-                    'delimiter' => '	',
-                    'pig' => q\
-A = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:int);
-B = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:chararray);
-C = union onschema A, B;
-store C into ':OUTPATH:';\,
-                    'verify_pig_script' => q\
-A = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:bytearray);
-B = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:bytearray);
-C = union A, B;
-store C into ':OUTPATH:';\,
                 }
               ]
 
diff --git a/test/org/apache/pig/test/TestUnionOnSchema.java b/test/org/apache/pig/test/TestUnionOnSchema.java
index 170ef78..1da3885 100644
--- a/test/org/apache/pig/test/TestUnionOnSchema.java
+++ b/test/org/apache/pig/test/TestUnionOnSchema.java
@@ -96,8 +96,6 @@
 
     /**
      * Test UNION ONSCHEMA on two inputs with same schema
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaSameSchema() throws Exception {
@@ -128,8 +126,6 @@
     
     /**
      * Test UNION ONSCHEMA with operations after the union
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaFilter() throws Exception {
@@ -161,8 +157,6 @@
     
     /**
      * Test UNION ONSCHEMA with operations after the union
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaSuccOps() throws Exception {
@@ -194,8 +188,6 @@
     
     /**
      * Test UNION ONSCHEMA with cast from bytearray to another type
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaCastOnByteArray() throws Exception {
@@ -223,8 +215,6 @@
     /**
      * Test UNION ONSCHEMA where a common column has additional 'namespace' part
      *  in the column name in one of the inputs
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaScopedColumnName() throws Exception {
@@ -266,8 +256,6 @@
     /**
      * Test UNION ONSCHEMA where a common column has additional 'namespace' part
      *  in the column name in both the inputs
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaScopedColumnNameBothInp1() throws Exception {
@@ -302,8 +290,6 @@
     /**
      * Test UNION ONSCHEMA where a common column has additional 'namespace' part
      *  in the column name in both the inputs
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaScopedColumnNameBothInp2() throws Exception {
@@ -340,8 +326,6 @@
      * Test UNION ONSCHEMA where a common column has additional 'namespace' part
      *  in the column name in one of the inputs.
      *  Negative test case
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaScopedColumnNameNeg() throws Exception {
@@ -366,8 +350,6 @@
     /**
      * Test UNION ONSCHEMA on two inputs with same column names, but different
      * numeric types - test type promotion
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaDiffNumType() throws Exception {
@@ -396,8 +378,6 @@
 
     /**
      * Test UNION ONSCHEMA on two inputs with no common columns
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaNoCommonCols() throws Exception {
@@ -424,8 +404,6 @@
     
     /**
      * Test UNION ONSCHEMA on two inputs , one input with additional columns
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaAdditionalColumn() throws Exception {
@@ -498,8 +476,6 @@
     
     /**
      * Test UNION ONSCHEMA on 3 inputs 
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchema3Inputs() throws Exception {
@@ -533,8 +509,6 @@
 
     /**
      * Test UNION ONSCHEMA with bytearray type 
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaByteArrayConversions() throws Exception {
@@ -572,8 +546,6 @@
     
     /**
      * negative test - test error on no schema
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaNoSchema() throws Exception {
@@ -597,8 +569,6 @@
     
     /**
      * negative test - test error on null alias in one of the FieldSchema
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaNullAliasInFieldSchema() throws Exception {
@@ -640,8 +610,6 @@
 
     /**
      * test union with incompatible types in schema
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaIncompatibleTypes() throws Exception {
@@ -650,7 +618,15 @@
             + "l2 = load '" + INP_FILE_2NUMS + "' as (x : long, y : float);"
             + "u = union onschema l1, l2;";
 
-        checkSchemaEquals(query, "x : long, y : bytearray");
+        checkSchemaEx(query, "Cannot cast from chararray to bytearray");
+
+        //without "onschema"
+        query =
+            "  l1 = load '" + INP_FILE_2NUMS + "' as (x : long, y : chararray);"
+            + "l2 = load '" + INP_FILE_2NUMS + "' as (x : long, y : float);"
+            + "u = union l1, l2;";
+
+        checkSchemaEx(query, "Cannot cast from chararray to bytearray");
 
 
         
@@ -659,8 +635,15 @@
             + "l2 = load '" + INP_FILE_2NUMS + "' as (x : map[ ], y : chararray);"
             + "u = union onschema l1, l2;"
         ; 
-        checkSchemaEquals(query, "x : bytearray, y : chararray");
+        checkSchemaEx(query, "Cannot cast from long to bytearray");
                
+        query =
+            "  l1 = load '" + INP_FILE_2NUMS + "' as (x : long, y : chararray);"
+            + "l2 = load '" + INP_FILE_2NUMS + "' as (x : map[ ], y : chararray);"
+            + "u = union l1, l2;"
+        ;
+        checkSchemaEx(query, "Cannot cast from long to bytearray");
+
         // bag column with different internal column types
         query =
             "  l1 = load '" + INP_FILE_2NUMS 
@@ -708,8 +691,6 @@
 
     /**
      * Test UNION ONSCHEMA with input relation having udfs
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaInputUdfs() throws Exception {
@@ -745,8 +726,6 @@
     /**
      * Test UNION ONSCHEMA with udf whose default type is different from
      * final type
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaUdfTypeEvolution() throws Exception {
@@ -797,8 +776,6 @@
     /**
      * Test UNION ONSCHEMA with udf whose default type is different from
      * final type - where udf is not in immediate input of union
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaUdfTypeEvolution2() throws Exception {
@@ -869,8 +846,6 @@
     /**
      * Test UNION ONSCHEMA with input relation having column names with multiple
      * level of namespace in their names
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testUnionOnSchemaScopeMulti() throws Exception {
@@ -916,8 +891,6 @@
     
     /**
      * Test query with a union-onschema having another as input 
-     * @throws IOException
-     * @throws ParserException
      */
     @Test
     public void testTwoUnions() throws Exception {
