PIG-5400: OrcStorage dropping struct(tuple) when it only holds a single field inside a Bag(array) (knoguchi)


git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1875366 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 64e4c45..d2d2442 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
  
 IMPROVEMENTS
 
+PIG-5400: OrcStorage dropping struct(tuple) when it only holds a single field inside a Bag(array) (knoguchi)
+
 PIG-4764: Make Pig work with Hive 3.1 (szita)
 
 PIG-5352: Please add OWASP Dependency Check to the build ivy.xml (knoguchi)
diff --git a/src/docs/src/documentation/content/xdocs/func.xml b/src/docs/src/documentation/content/xdocs/func.xml
index 5662eb8..7699536 100644
--- a/src/docs/src/documentation/content/xdocs/func.xml
+++ b/src/docs/src/documentation/content/xdocs/func.xml
@@ -2672,6 +2672,7 @@
                <li>--bufferSize or -b Set the size of the memory buffers used for compressing and storing the stripe in memory. Default is 262144 (256K).</li>
                <li>--blockPadding or -p Sets whether the HDFS blocks are padded to prevent stripes from straddling blocks. Default is true.</li>
                <li>--compress or -c Sets the generic compression that is used to compress the data. Valid codecs are: NONE, ZLIB, SNAPPY, LZO. Default is ZLIB.</li>
+               <li>--keepSingleFieldTuple or -k Sets whether to keep a Tuple(struct) schema inside a Bag(array) even if the tuple only contains a single field. Default is false.</li>
                <li>--version or -v Sets the version of the file that will be written</li>
                </ul>
             </td>
diff --git a/src/org/apache/pig/builtin/OrcStorage.java b/src/org/apache/pig/builtin/OrcStorage.java
index 9e4de7f..9db01f4 100644
--- a/src/org/apache/pig/builtin/OrcStorage.java
+++ b/src/org/apache/pig/builtin/OrcStorage.java
@@ -114,6 +114,8 @@
  * from straddling blocks
  * <li><code>-c, --compress</code> Sets the generic compression that is used to compress the data.
  * Valid codecs are: NONE, ZLIB, SNAPPY, LZO
+ * <li><code>-k, --keepSingleFieldTuple</code> Sets whether to keep a Tuple(struct) schema
+ * inside a Bag(array) even if the tuple only contains a single field
  * <li><code>-v, --version</code> Sets the version of the file that will be written
  * </ul>
  **/
@@ -133,6 +135,7 @@
     private Integer rowIndexStride;
     private Integer bufferSize;
     private Boolean blockPadding;
+    private Boolean keepSingleFieldTuple = false;
     private CompressionKind compress;
     private String versionName;
 
@@ -158,6 +161,9 @@
                 "are padded to prevent stripes from straddling blocks");
         validOptions.addOption("c", "compress", true,
                 "Sets the generic compression that is used to compress the data");
+        validOptions.addOption("k", "keepSingleFieldTuple", false,
+                "Sets whether to keep Tuple(struct) schema inside a Bag(array) even if " +
+                "the tuple only contains a single field");
         validOptions.addOption("v", "version", true,
                 "Sets the version of the file that will be written");
     }
@@ -185,6 +191,7 @@
             if (configuredOptions.hasOption('v')) {
                 versionName = HiveShims.normalizeOrcVersionName(configuredOptions.getOptionValue('v'));
             }
+            keepSingleFieldTuple = configuredOptions.hasOption('k');
         } catch (ParseException e) {
             log.error("Exception in OrcStorage", e);
             log.error("OrcStorage called with arguments " + options);
@@ -217,7 +224,7 @@
             typeInfo = (TypeInfo)ObjectSerializer.deserialize(p.getProperty(signature + SchemaSignatureSuffix));
         }
         if (oi==null) {
-            oi = HiveUtils.createObjectInspector(typeInfo);
+            oi = HiveUtils.createObjectInspector(typeInfo, keepSingleFieldTuple);
         }
     }
 
@@ -226,7 +233,7 @@
         ResourceFieldSchema fs = new ResourceFieldSchema();
         fs.setType(DataType.TUPLE);
         fs.setSchema(rs);
-        typeInfo = HiveUtils.getTypeInfo(fs);
+        typeInfo = HiveUtils.getTypeInfo(fs, keepSingleFieldTuple);
         Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
         p.setProperty(signature + SchemaSignatureSuffix, ObjectSerializer.serialize(typeInfo));
     }
diff --git a/src/org/apache/pig/impl/util/hive/HiveUtils.java b/src/org/apache/pig/impl/util/hive/HiveUtils.java
index 7ff75e4..5fc0f40 100644
--- a/src/org/apache/pig/impl/util/hive/HiveUtils.java
+++ b/src/org/apache/pig/impl/util/hive/HiveUtils.java
@@ -227,10 +227,12 @@
                 innerFs[0] = itemSchema;
             } else {
                 // If item is not tuple, wrap it into tuple
+                // Hive allows arrays(Bag) of primitive types but Pig does not.
                 ResourceFieldSchema tupleFieldSchema = new ResourceFieldSchema();
                 tupleFieldSchema.setType(DataType.TUPLE);
                 ResourceSchema tupleSchema = new ResourceSchema();
                 tupleSchema.setFields(new ResourceFieldSchema[] {itemSchema});
+                tupleFieldSchema.setSchema(tupleSchema);
                 innerFs[0] = tupleFieldSchema;
             }
 
@@ -302,6 +304,10 @@
     }
 
     public static TypeInfo getTypeInfo(ResourceFieldSchema fs) throws IOException {
+      return getTypeInfo(fs, false);
+    }
+
+    public static TypeInfo getTypeInfo(ResourceFieldSchema fs, boolean keepSingleFieldTuple) throws IOException {
         TypeInfo ti;
         switch (fs.getType()) {
         case DataType.TUPLE:
@@ -309,7 +315,7 @@
             ArrayList<String> names = new ArrayList<String>();
             ArrayList<TypeInfo> typeInfos = new ArrayList<TypeInfo>();
             for (ResourceFieldSchema subFs : fs.getSchema().getFields()) {
-                TypeInfo info = getTypeInfo(subFs);
+                TypeInfo info = getTypeInfo(subFs, keepSingleFieldTuple);
                 names.add(subFs.getName());
                 typeInfos.add(info);
             }
@@ -324,10 +330,10 @@
             ResourceFieldSchema tupleSchema = fs.getSchema().getFields()[0];
             ResourceFieldSchema itemSchema = tupleSchema;
             // If single item tuple, remove the tuple, put the inner item into list directly
-            if (tupleSchema.getSchema().getFields().length == 1) {
+            if (!keepSingleFieldTuple && tupleSchema.getSchema().getFields().length == 1) {
                 itemSchema = tupleSchema.getSchema().getFields()[0];
             }
-            TypeInfo elementField = getTypeInfo(itemSchema);
+            TypeInfo elementField = getTypeInfo(itemSchema, keepSingleFieldTuple);
             ((ListTypeInfo)ti).setListElementTypeInfo(elementField);
             break;
         case DataType.MAP:
@@ -336,7 +342,7 @@
             if (fs.getSchema() == null || fs.getSchema().getFields().length != 1) {
                 valueField = TypeInfoFactory.binaryTypeInfo;
             } else {
-                valueField = getTypeInfo(fs.getSchema().getFields()[0]);
+                valueField = getTypeInfo(fs.getSchema().getFields()[0], keepSingleFieldTuple);
             }
             ((MapTypeInfo)ti).setMapKeyTypeInfo(TypeInfoFactory.stringTypeInfo);
             ((MapTypeInfo)ti).setMapValueTypeInfo(valueField);
@@ -414,12 +420,16 @@
         private List<StructField> fields;
 
         PigStructInspector(StructTypeInfo info) {
+          this(info, false);
+        }
+
+        PigStructInspector(StructTypeInfo info, boolean keepSingleFieldTuple) {
             ArrayList<String> fieldNames = info.getAllStructFieldNames();
             ArrayList<TypeInfo> fieldTypes = info.getAllStructFieldTypeInfos();
             fields = new ArrayList<StructField>(fieldNames.size());
             for (int i = 0; i < fieldNames.size(); ++i) {
                 fields.add(new Field(fieldNames.get(i),
-                        createObjectInspector(fieldTypes.get(i)), i));
+                        createObjectInspector(fieldTypes.get(i), keepSingleFieldTuple), i));
             }
         }
 
@@ -510,8 +520,12 @@
         private ObjectInspector value;
 
         PigMapObjectInspector(MapTypeInfo info) {
+          this(info, false);
+        }
+
+        PigMapObjectInspector(MapTypeInfo info, boolean keepSingleFieldTuple) {
             key = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
-            value = createObjectInspector(info.getMapValueTypeInfo());
+            value = createObjectInspector(info.getMapValueTypeInfo(), keepSingleFieldTuple);
         }
 
         @Override
@@ -567,9 +581,15 @@
         private Object cachedObject;
         private int index;
         private Iterator<Tuple> iter;
+        private boolean keepSingleFieldTuple;
 
         PigListObjectInspector(ListTypeInfo info) {
-            child = createObjectInspector(info.getListElementTypeInfo());
+            this(info, false);
+        }
+
+        PigListObjectInspector(ListTypeInfo info, boolean keepSingleFieldTuple) {
+            this.keepSingleFieldTuple = keepSingleFieldTuple;
+            child = createObjectInspector(info.getListElementTypeInfo(), keepSingleFieldTuple);
         }
 
         @Override
@@ -590,7 +610,7 @@
                 try {
                     Tuple t = iter.next();
                     // If single item tuple, take the item directly from list
-                    if (t.size() == 1) {
+                    if (!keepSingleFieldTuple && t.size() == 1) {
                         return t.get(0);
                     } else {
                         return t;
@@ -702,6 +722,10 @@
     }
 
     public static ObjectInspector createObjectInspector(TypeInfo info) {
+      return createObjectInspector(info, false);
+    }
+
+    public static ObjectInspector createObjectInspector(TypeInfo info, boolean keepSingleFieldTuple) {
         switch (info.getCategory()) {
         case PRIMITIVE:
           switch (((PrimitiveTypeInfo) info).getPrimitiveCategory()) {
@@ -735,11 +759,11 @@
                         ((PrimitiveTypeInfo) info).getPrimitiveCategory());
           }
         case STRUCT:
-          return new PigStructInspector((StructTypeInfo) info);
+          return new PigStructInspector((StructTypeInfo) info, keepSingleFieldTuple);
         case MAP:
-          return new PigMapObjectInspector((MapTypeInfo) info);
+          return new PigMapObjectInspector((MapTypeInfo) info, keepSingleFieldTuple);
         case LIST:
-          return new PigListObjectInspector((ListTypeInfo) info);
+          return new PigListObjectInspector((ListTypeInfo) info, keepSingleFieldTuple);
         default:
           throw new IllegalArgumentException("Unknown type " +
             info.getCategory());
diff --git a/test/org/apache/pig/builtin/TestOrcStorage.java b/test/org/apache/pig/builtin/TestOrcStorage.java
index 6e8b4dc..b8f6bd5 100644
--- a/test/org/apache/pig/builtin/TestOrcStorage.java
+++ b/test/org/apache/pig/builtin/TestOrcStorage.java
@@ -68,6 +68,7 @@
 
 import static org.apache.pig.builtin.mock.Storage.resetData;
 import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.apache.pig.builtin.mock.Storage.bag;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -311,6 +312,60 @@
         assertFalse(iter.hasNext());
     }
 
+    @Test
+    // See PIG-5400
+    public void testSingleItemTuple() throws Exception {
+        Data data = resetData(pigServer);
+        data.set("foo", "a:bag{t:tuple(number:int)}",
+            tuple(bag(tuple(1), tuple(2), tuple(3))),
+            tuple(bag(tuple(4), tuple(5), tuple(6))),
+            tuple(bag(tuple(7), tuple(8), tuple(9)))
+        );
+
+        // Testing writes
+
+        //A: (a:bag{t:tuple(number:int)})
+        pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage();");
+        pigServer.registerQuery("store A into '" + OUTPUT1 + "' using OrcStorage();");
+        pigServer.registerQuery("store A into '" + OUTPUT2 + "' using OrcStorage('-k');");
+
+        Reader reader1 = OrcFile.createReader(fs, Util.getFirstPartFile(new Path(OUTPUT1)));
+        //Note, "number" alias is dropped but this is an issue on Hive/OrcFile side
+        assertEquals("struct<a:array<int>>", reader1.getObjectInspector().getTypeName());
+
+        Reader reader2 = OrcFile.createReader(fs, Util.getFirstPartFile(new Path(OUTPUT2)));
+        assertEquals("struct<a:array<struct<number:int>>>", reader2.getObjectInspector().getTypeName());
+
+        assertEquals(reader1.getNumberOfRows(), 3);
+        assertEquals(reader2.getNumberOfRows(), 3);
+
+        // For read, option '-k' is ignored.  It all maps back to single tuple
+        // since Pig doesn't support Bag with primitive types.
+        pigServer.registerQuery("B = load '" + OUTPUT1 + "' using OrcStorage();");
+        pigServer.registerQuery("C = load '" + OUTPUT2 + "' using OrcStorage();");
+
+        Schema schema1 = pigServer.dumpSchema("B"); // struct<a:array<int>> --> a:{(int)}
+        Schema schema2 = pigServer.dumpSchema("C"); // struct<a:array<struct<number:int>>> --> a:{(number:int)}
+
+        // Currently Hive's OrcFile doesn't seem to store the name of the fields
+        // except for Tuples. Thus, only checking the types but not aliases.
+        // equals(schema, other, relaxInner=false, relaxAlias=true)
+        assertTrue(Schema.equals(schema1, schema2, false, true));
+
+        System.err.println(schema1);
+        System.err.println(schema2);
+        Iterator<Tuple> iter1 = pigServer.openIterator("B");
+        Iterator<Tuple> iter2 = pigServer.openIterator("C");
+        int count=0;
+        while (iter1.hasNext()) {
+          Tuple t1 = iter1.next();
+          Tuple t2 = iter2.next();
+          assertEquals(t1, t2);
+          count++;
+        }
+        assertEquals(3, count);
+    }
+
     private void verifyData(Path orcFile, Iterator<Tuple> iter, FileSystem fs, int expectedTotalRows) throws Exception {
 
         int expectedRows = 0;