PIG-5400: OrcStorage dropping struct(tuple) when it only holds a single field inside a Bag(array) (knoguchi)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1875366 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 64e4c45..d2d2442 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
IMPROVEMENTS
+PIG-5400: OrcStorage dropping struct(tuple) when it only holds a single field inside a Bag(array) (knoguchi)
+
PIG-4764: Make Pig work with Hive 3.1 (szita)
PIG-5352: Please add OWASP Dependency Check to the build ivy.xml (knoguchi)
diff --git a/src/docs/src/documentation/content/xdocs/func.xml b/src/docs/src/documentation/content/xdocs/func.xml
index 5662eb8..7699536 100644
--- a/src/docs/src/documentation/content/xdocs/func.xml
+++ b/src/docs/src/documentation/content/xdocs/func.xml
@@ -2672,6 +2672,7 @@
<li>--bufferSize or -b Set the size of the memory buffers used for compressing and storing the stripe in memory. Default is 262144 (256K).</li>
<li>--blockPadding or -p Sets whether the HDFS blocks are padded to prevent stripes from straddling blocks. Default is true.</li>
<li>--compress or -c Sets the generic compression that is used to compress the data. Valid codecs are: NONE, ZLIB, SNAPPY, LZO. Default is ZLIB.</li>
+ <li>--keepSingleFieldTuple or -k Sets whether to keep a Tuple(struct) schema inside a Bag(array) even if the tuple only contains a single field. Default is false.</li>
<li>--version or -v Sets the version of the file that will be written</li>
</ul>
</td>
diff --git a/src/org/apache/pig/builtin/OrcStorage.java b/src/org/apache/pig/builtin/OrcStorage.java
index 9e4de7f..9db01f4 100644
--- a/src/org/apache/pig/builtin/OrcStorage.java
+++ b/src/org/apache/pig/builtin/OrcStorage.java
@@ -114,6 +114,8 @@
* from straddling blocks
* <li><code>-c, --compress</code> Sets the generic compression that is used to compress the data.
* Valid codecs are: NONE, ZLIB, SNAPPY, LZO
+ * <li><code>-k, --keepSingleFieldTuple</code> Sets whether to keep a Tuple(struct) schema
+ * inside a Bag(array) even if the tuple only contains a single field
* <li><code>-v, --version</code> Sets the version of the file that will be written
* </ul>
**/
@@ -133,6 +135,7 @@
private Integer rowIndexStride;
private Integer bufferSize;
private Boolean blockPadding;
+ private Boolean keepSingleFieldTuple = false;
private CompressionKind compress;
private String versionName;
@@ -158,6 +161,9 @@
"are padded to prevent stripes from straddling blocks");
validOptions.addOption("c", "compress", true,
"Sets the generic compression that is used to compress the data");
+ validOptions.addOption("k", "keepSingleFieldTuple", false,
+ "Sets whether to keep Tuple(struct) schema inside a Bag(array) even if " +
+ "the tuple only contains a single field");
validOptions.addOption("v", "version", true,
"Sets the version of the file that will be written");
}
@@ -185,6 +191,7 @@
if (configuredOptions.hasOption('v')) {
versionName = HiveShims.normalizeOrcVersionName(configuredOptions.getOptionValue('v'));
}
+ keepSingleFieldTuple = configuredOptions.hasOption('k');
} catch (ParseException e) {
log.error("Exception in OrcStorage", e);
log.error("OrcStorage called with arguments " + options);
@@ -217,7 +224,7 @@
typeInfo = (TypeInfo)ObjectSerializer.deserialize(p.getProperty(signature + SchemaSignatureSuffix));
}
if (oi==null) {
- oi = HiveUtils.createObjectInspector(typeInfo);
+ oi = HiveUtils.createObjectInspector(typeInfo, keepSingleFieldTuple);
}
}
@@ -226,7 +233,7 @@
ResourceFieldSchema fs = new ResourceFieldSchema();
fs.setType(DataType.TUPLE);
fs.setSchema(rs);
- typeInfo = HiveUtils.getTypeInfo(fs);
+ typeInfo = HiveUtils.getTypeInfo(fs, keepSingleFieldTuple);
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
p.setProperty(signature + SchemaSignatureSuffix, ObjectSerializer.serialize(typeInfo));
}
diff --git a/src/org/apache/pig/impl/util/hive/HiveUtils.java b/src/org/apache/pig/impl/util/hive/HiveUtils.java
index 7ff75e4..5fc0f40 100644
--- a/src/org/apache/pig/impl/util/hive/HiveUtils.java
+++ b/src/org/apache/pig/impl/util/hive/HiveUtils.java
@@ -227,10 +227,12 @@
innerFs[0] = itemSchema;
} else {
// If item is not tuple, wrap it into tuple
+ // Hive allows arrays(Bag) of primitive types but Pig does not.
ResourceFieldSchema tupleFieldSchema = new ResourceFieldSchema();
tupleFieldSchema.setType(DataType.TUPLE);
ResourceSchema tupleSchema = new ResourceSchema();
tupleSchema.setFields(new ResourceFieldSchema[] {itemSchema});
+ tupleFieldSchema.setSchema(tupleSchema);
innerFs[0] = tupleFieldSchema;
}
@@ -302,6 +304,10 @@
}
public static TypeInfo getTypeInfo(ResourceFieldSchema fs) throws IOException {
+ return getTypeInfo(fs, false);
+ }
+
+ public static TypeInfo getTypeInfo(ResourceFieldSchema fs, boolean keepSingleFieldTuple) throws IOException {
TypeInfo ti;
switch (fs.getType()) {
case DataType.TUPLE:
@@ -309,7 +315,7 @@
ArrayList<String> names = new ArrayList<String>();
ArrayList<TypeInfo> typeInfos = new ArrayList<TypeInfo>();
for (ResourceFieldSchema subFs : fs.getSchema().getFields()) {
- TypeInfo info = getTypeInfo(subFs);
+ TypeInfo info = getTypeInfo(subFs, keepSingleFieldTuple);
names.add(subFs.getName());
typeInfos.add(info);
}
@@ -324,10 +330,10 @@
ResourceFieldSchema tupleSchema = fs.getSchema().getFields()[0];
ResourceFieldSchema itemSchema = tupleSchema;
// If single item tuple, remove the tuple, put the inner item into list directly
- if (tupleSchema.getSchema().getFields().length == 1) {
+ if (!keepSingleFieldTuple && tupleSchema.getSchema().getFields().length == 1) {
itemSchema = tupleSchema.getSchema().getFields()[0];
}
- TypeInfo elementField = getTypeInfo(itemSchema);
+ TypeInfo elementField = getTypeInfo(itemSchema, keepSingleFieldTuple);
((ListTypeInfo)ti).setListElementTypeInfo(elementField);
break;
case DataType.MAP:
@@ -336,7 +342,7 @@
if (fs.getSchema() == null || fs.getSchema().getFields().length != 1) {
valueField = TypeInfoFactory.binaryTypeInfo;
} else {
- valueField = getTypeInfo(fs.getSchema().getFields()[0]);
+ valueField = getTypeInfo(fs.getSchema().getFields()[0], keepSingleFieldTuple);
}
((MapTypeInfo)ti).setMapKeyTypeInfo(TypeInfoFactory.stringTypeInfo);
((MapTypeInfo)ti).setMapValueTypeInfo(valueField);
@@ -414,12 +420,16 @@
private List<StructField> fields;
PigStructInspector(StructTypeInfo info) {
+ this(info, false);
+ }
+
+ PigStructInspector(StructTypeInfo info, boolean keepSingleFieldTuple) {
ArrayList<String> fieldNames = info.getAllStructFieldNames();
ArrayList<TypeInfo> fieldTypes = info.getAllStructFieldTypeInfos();
fields = new ArrayList<StructField>(fieldNames.size());
for (int i = 0; i < fieldNames.size(); ++i) {
fields.add(new Field(fieldNames.get(i),
- createObjectInspector(fieldTypes.get(i)), i));
+ createObjectInspector(fieldTypes.get(i), keepSingleFieldTuple), i));
}
}
@@ -510,8 +520,12 @@
private ObjectInspector value;
PigMapObjectInspector(MapTypeInfo info) {
+ this(info, false);
+ }
+
+ PigMapObjectInspector(MapTypeInfo info, boolean keepSingleFieldTuple) {
key = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
- value = createObjectInspector(info.getMapValueTypeInfo());
+ value = createObjectInspector(info.getMapValueTypeInfo(), keepSingleFieldTuple);
}
@Override
@@ -567,9 +581,15 @@
private Object cachedObject;
private int index;
private Iterator<Tuple> iter;
+ private boolean keepSingleFieldTuple;
PigListObjectInspector(ListTypeInfo info) {
- child = createObjectInspector(info.getListElementTypeInfo());
+ this(info, false);
+ }
+
+ PigListObjectInspector(ListTypeInfo info, boolean keepSingleFieldTuple) {
+ this.keepSingleFieldTuple = keepSingleFieldTuple;
+ child = createObjectInspector(info.getListElementTypeInfo(), keepSingleFieldTuple);
}
@Override
@@ -590,7 +610,7 @@
try {
Tuple t = iter.next();
// If single item tuple, take the item directly from list
- if (t.size() == 1) {
+ if (!keepSingleFieldTuple && t.size() == 1) {
return t.get(0);
} else {
return t;
@@ -702,6 +722,10 @@
}
public static ObjectInspector createObjectInspector(TypeInfo info) {
+ return createObjectInspector(info, false);
+ }
+
+ public static ObjectInspector createObjectInspector(TypeInfo info, boolean keepSingleFieldTuple) {
switch (info.getCategory()) {
case PRIMITIVE:
switch (((PrimitiveTypeInfo) info).getPrimitiveCategory()) {
@@ -735,11 +759,11 @@
((PrimitiveTypeInfo) info).getPrimitiveCategory());
}
case STRUCT:
- return new PigStructInspector((StructTypeInfo) info);
+ return new PigStructInspector((StructTypeInfo) info, keepSingleFieldTuple);
case MAP:
- return new PigMapObjectInspector((MapTypeInfo) info);
+ return new PigMapObjectInspector((MapTypeInfo) info, keepSingleFieldTuple);
case LIST:
- return new PigListObjectInspector((ListTypeInfo) info);
+ return new PigListObjectInspector((ListTypeInfo) info, keepSingleFieldTuple);
default:
throw new IllegalArgumentException("Unknown type " +
info.getCategory());
diff --git a/test/org/apache/pig/builtin/TestOrcStorage.java b/test/org/apache/pig/builtin/TestOrcStorage.java
index 6e8b4dc..b8f6bd5 100644
--- a/test/org/apache/pig/builtin/TestOrcStorage.java
+++ b/test/org/apache/pig/builtin/TestOrcStorage.java
@@ -68,6 +68,7 @@
import static org.apache.pig.builtin.mock.Storage.resetData;
import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.apache.pig.builtin.mock.Storage.bag;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -311,6 +312,60 @@
assertFalse(iter.hasNext());
}
+ @Test
+ // See PIG-5400
+ public void testSingleItemTuple() throws Exception {
+ Data data = resetData(pigServer);
+ data.set("foo", "a:bag{t:tuple(number:int)}",
+ tuple(bag(tuple(1), tuple(2), tuple(3))),
+ tuple(bag(tuple(4), tuple(5), tuple(6))),
+ tuple(bag(tuple(7), tuple(8), tuple(9)))
+ );
+
+ // Testing writes
+
+ //A: (a:bag{t:tuple(number:int)})
+ pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage();");
+ pigServer.registerQuery("store A into '" + OUTPUT1 + "' using OrcStorage();");
+ pigServer.registerQuery("store A into '" + OUTPUT2 + "' using OrcStorage('-k');");
+
+ Reader reader1 = OrcFile.createReader(fs, Util.getFirstPartFile(new Path(OUTPUT1)));
+ //Note, "number" alias is dropped but this is an issue on Hive/OrcFile side
+ assertEquals("struct<a:array<int>>", reader1.getObjectInspector().getTypeName());
+
+ Reader reader2 = OrcFile.createReader(fs, Util.getFirstPartFile(new Path(OUTPUT2)));
+ assertEquals("struct<a:array<struct<number:int>>>", reader2.getObjectInspector().getTypeName());
+
+ assertEquals(reader1.getNumberOfRows(), 3);
+ assertEquals(reader2.getNumberOfRows(), 3);
+
+ // For read, option '-k' is ignored. It all maps back to single tuple
+ // since Pig doesn't support Bag with primitive types.
+ pigServer.registerQuery("B = load '" + OUTPUT1 + "' using OrcStorage();");
+ pigServer.registerQuery("C = load '" + OUTPUT2 + "' using OrcStorage();");
+
+ Schema schema1 = pigServer.dumpSchema("B"); // struct<a:array<int>> --> a:{(int)}
+ Schema schema2 = pigServer.dumpSchema("C"); // struct<a:array<struct<number:int>>> --> a:{(number:int)}
+
+ // Currently Hive's OrcFile doesn't seem to store the name of the fields
+ // except for Tuples. Thus, only checking the types but not aliases.
+ // equals(schema, other, relaxInner=false, relaxAlias=true)
+ assertTrue(Schema.equals(schema1, schema2, false, true));
+
+ System.err.println(schema1);
+ System.err.println(schema2);
+ Iterator<Tuple> iter1 = pigServer.openIterator("B");
+ Iterator<Tuple> iter2 = pigServer.openIterator("C");
+ int count=0;
+ while (iter1.hasNext()) {
+ Tuple t1 = iter1.next();
+ Tuple t2 = iter2.next();
+ assertEquals(t1, t2);
+ count++;
+ }
+ assertEquals(3, count);
+ }
+
private void verifyData(Path orcFile, Iterator<Tuple> iter, FileSystem fs, int expectedTotalRows) throws Exception {
int expectedRows = 0;