PIG-4734: TOMAP schema inferring breaks some scripts in type checking for bincond

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1746334 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 8ae2858..c262302 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -141,6 +141,8 @@
 
 BUG FIXES
 
+PIG-4734: TOMAP schema inferring breaks some scripts in type checking for bincond (daijy)
+
 PIG-4786: CROSS will not work correctly with Grace Parallelism (daijy)
 
 PIG-3227: SearchEngineExtractor does not work for bing (dannyant via daijy)
diff --git a/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java b/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
index 2345fdb..5d755b3 100644
--- a/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
+++ b/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
@@ -447,7 +447,23 @@
             LogicalFieldSchema mergedFS = new LogicalFieldSchema(mergedAlias, mergedSubSchema, mergedType);
             return mergedFS;
         }
-        
+
+        public static boolean isEqualUnlessUnknown(LogicalFieldSchema fs1, LogicalFieldSchema fs2) throws FrontendException {
+            if (fs1.type == DataType.BYTEARRAY) {
+                return true;
+            } else if (fs2.type == DataType.BYTEARRAY) {
+                return true;
+            } else if (fs1.type == fs2.type) {
+                if (DataType.isComplex(fs1.type)) {
+                    return LogicalSchema.isEqualUnlessUnknown(fs1.schema, fs2.schema);
+                } else {
+                    return true;
+                }
+            } else {
+                return false;
+            }
+        }
+
         /***
          * Old Pig field schema does not require a tuple schema inside a bag;
          * Now it is required to have that; this method is to fill the gap
@@ -770,7 +786,24 @@
         }
         return mergedSchema;
     }
-    
+
+    public static boolean isEqualUnlessUnknown(LogicalSchema s1, LogicalSchema s2) throws FrontendException {
+        if (s1 == null) {
+            return true;
+        } else if (s2 == null) {
+            return true;
+        } else if (s1.size() != s2.size()) {
+            return false;
+        } else {
+            for (int i=0;i<s1.size();i++) {
+                if (!LogicalFieldSchema.isEqualUnlessUnknown(s1.getField(i), s1.getField(i))) {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
     public String toString(boolean verbose) {
         StringBuilder str = new StringBuilder();
         
diff --git a/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java b/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java
index ae49892..e9930df 100644
--- a/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java
+++ b/src/org/apache/pig/newplan/logical/visitor/TypeCheckingExpVisitor.java
@@ -607,7 +607,7 @@
             // Matching schemas if we're working with tuples/bags
             if (DataType.isSchemaType(lhsType)) {
                 try {
-                    if(! binCond.getLhs().getFieldSchema().isEqual(binCond.getRhs().getFieldSchema())){
+                    if(!LogicalFieldSchema.isEqualUnlessUnknown(binCond.getLhs().getFieldSchema(), binCond.getRhs().getFieldSchema())){
                         int errCode = 1048;
                         String msg = "Two inputs of BinCond must have compatible schemas."
                             + " left hand side: " + binCond.getLhs().getFieldSchema()
diff --git a/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java b/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java
index eef3523..905597e 100644
--- a/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java
+++ b/test/org/apache/pig/test/TestTypeCheckingValidatorNewLP.java
@@ -39,8 +39,10 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import junit.framework.Assert;
@@ -55,6 +57,8 @@
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.NonSpillableDataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.PigContext;
@@ -4118,4 +4122,56 @@
                 " corresponding column in earlier relation(s) in the statement";
             Util.checkExceptionMessage(query, "c", msg);
         }
+        //see PIG-4734
+        public static class GenericToMap extends EvalFunc<Map<String, Double>> {
+            @Override
+            public Map exec(Tuple input) throws IOException {
+                Map<String, Double> output = new HashMap<String, Double>();
+                output.put((String)input.get(0), (Double)input.get(1));
+                return output;
+            }
+        }
+        @Test
+        public void testBinCondCompatMap() throws Exception {
+            String query =
+                "a = load 'studenttab10k' as (name:chararray, gpa:double);"
+                + "b = foreach a generate gpa, TOMAP(name, gpa) as m1, "
+                + GenericToMap.class.getName() + "(name, gpa) as m2;"
+                + "c = foreach b generate (gpa>3? m1 : m2);";
+                createAndProcessLPlan(query);
+        }
+        public static class GenericToTuple extends EvalFunc<Tuple> {
+            @Override
+            public Tuple exec(Tuple input) throws IOException {
+                return input;
+            }
+        }
+        @Test
+        public void testBinCondCompatTuple() throws Exception {
+            String query =
+                "a = load 'studenttab10k' as (name:chararray, gpa:double);"
+                + "b = foreach a generate gpa, TOTUPLE(name, gpa) as t1, "
+                + GenericToTuple.class.getName() + "(name, gpa) as t2;"
+                + "c = foreach b generate (gpa>3? t1 : t2);";
+                createAndProcessLPlan(query);
+        }
+        public static class GenericToBag extends EvalFunc<DataBag> {
+            @Override
+            public DataBag exec(Tuple input) throws IOException {
+                DataBag bag = new NonSpillableDataBag(1);
+                Tuple t = new DefaultTuple();
+                t.append(input.get(0));
+                bag.add(t);
+                return bag;
+            }
+        }
+        @Test
+        public void testBinCondCompatBag() throws Exception {
+            String query =
+                "a = load 'studenttab10k' as (name:chararray, gpa:double);"
+                + "b = foreach a generate gpa, TOBAG(name) as b1, "
+                + GenericToBag.class.getName() + "(name) as b2;"
+                + "c = foreach b generate (gpa>3? b1 : b2);";
+                createAndProcessLPlan(query);
+        }
 }