Merge pull request #146 from Parquet/hive_nested_types
Fix Hive nested type support and add unit tests
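
Note on the Writable layout this patch settles on (a minimal sketch, not part of the diff below): a Hive map column reaches the ObjectInspectors as an ArrayWritable whose single element is another ArrayWritable holding one two-element ArrayWritable per key/value pair. That extra wrapper is why ParquetHiveMapInspector now unwraps mapContainer[0] before iterating, and why ArrayWritableGroupConverter assembles a key/value pair when the repeated group has two fields. The import path of BinaryWritable is assumed from the test code; the rest is plain Hadoop io.

    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.Writable;
    import parquet.hive.writable.BinaryWritable; // assumed import path for parquet-hive's BinaryWritable

    public class NestedMapLayoutSketch {

      // map<string,string> column holding a single entry {"key_0" -> "value_0"}
      static ArrayWritable mapColumn() {
        final Writable[] pair = new Writable[] {
            new BinaryWritable("key_0"),   // required binary key
            new BinaryWritable("value_0")  // optional binary value
        };
        // one ArrayWritable per key/value pair
        final Writable[] entries = new Writable[] { new ArrayWritable(Writable.class, pair) };
        // single-element wrapper that ParquetHiveMapInspector unwraps via mapContainer[0]
        final Writable[] container = new Writable[] { new ArrayWritable(Writable.class, entries) };
        return new ArrayWritable(Writable.class, container);
      }
    }
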
diff --git a/parquet-hive/src/main/java/parquet/hive/DeprecatedParquetInputFormat.java b/parquet-hive/src/main/java/parquet/hive/DeprecatedParquetInputFormat.java
index 024aa44..072d702 100644
--- a/parquet-hive/src/main/java/parquet/hive/DeprecatedParquetInputFormat.java
+++ b/parquet-hive/src/main/java/parquet/hive/DeprecatedParquetInputFormat.java
@@ -311,7 +311,7 @@
* gets a ParquetInputSplit corresponding to a split given by Hive
*
* @param oldSplit The split given by Hive
- * @param conf The JobConf of the Hive job
+ * @param conf The JobConf of the Hive job
* @return a ParquetInputSplit corresponding to the oldSplit
* @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file
*/
diff --git a/parquet-hive/src/main/java/parquet/hive/convert/ArrayWritableGroupConverter.java b/parquet-hive/src/main/java/parquet/hive/convert/ArrayWritableGroupConverter.java
index cde2843..b7a56b4 100644
--- a/parquet-hive/src/main/java/parquet/hive/convert/ArrayWritableGroupConverter.java
+++ b/parquet-hive/src/main/java/parquet/hive/convert/ArrayWritableGroupConverter.java
@@ -15,6 +15,7 @@
*/
package parquet.hive.convert;
+import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;
import parquet.io.ParquetDecodingException;
@@ -34,20 +35,23 @@
private final Converter[] converters;
private final HiveGroupConverter parent;
private final int index;
+ private final boolean isMap;
private Writable currentValue;
+ private Writable[] mapPairContainer;
public ArrayWritableGroupConverter(final GroupType groupType, final HiveGroupConverter parent, final int index) {
this.parent = parent;
this.index = index;
if (groupType.getFieldCount() == 2) {
- final DataWritableGroupConverter intermediateConverter = new DataWritableGroupConverter(groupType, this, 0);
- converters = new Converter[groupType.getFieldCount()];
- converters[0] = getConverterFromDescription(groupType.getType(0), 0, intermediateConverter);
- converters[1] = getConverterFromDescription(groupType.getType(1), 1, intermediateConverter);
+ converters = new Converter[2];
+ converters[0] = getConverterFromDescription(groupType.getType(0), 0, this);
+ converters[1] = getConverterFromDescription(groupType.getType(1), 1, this);
+ isMap = true;
} else if (groupType.getFieldCount() == 1) {
converters = new Converter[1];
converters[0] = getConverterFromDescription(groupType.getType(0), 0, this);
+ isMap = false;
} else {
throw new RuntimeException("Invalid parquet hive schema: " + groupType);
}
@@ -61,19 +65,30 @@
@Override
public void start() {
+ if (isMap) {
+ mapPairContainer = new Writable[2];
+ }
}
@Override
public void end() {
+ if (isMap) {
+ currentValue = new ArrayWritable(Writable.class, mapPairContainer);
+ }
parent.add(index, currentValue);
}
@Override
protected void set(final int index, final Writable value) {
- if (index != 0) {
- throw new ParquetDecodingException("Repeated group can only have one field. Not allowed to set for the index : " + index);
+ if (index != 0 && mapPairContainer == null || index > 1) {
+ throw new ParquetDecodingException("Repeated group can only have one or two fields for maps. Not allowed to set for the index : " + index);
}
- currentValue = value;
+
+ if (isMap) {
+ mapPairContainer[index] = value;
+ } else {
+ currentValue = value;
+ }
}
@Override
diff --git a/parquet-hive/src/main/java/parquet/hive/serde/ParquetHiveMapInspector.java b/parquet-hive/src/main/java/parquet/hive/serde/ParquetHiveMapInspector.java
index e4a6a99..36b8c13 100644
--- a/parquet-hive/src/main/java/parquet/hive/serde/ParquetHiveMapInspector.java
+++ b/parquet-hive/src/main/java/parquet/hive/serde/ParquetHiveMapInspector.java
@@ -60,7 +60,8 @@
return null;
}
- final Writable[] mapArray = ((ArrayWritable) data).get();
+ final Writable[] mapContainer = ((ArrayWritable) data).get();
+ final Writable[] mapArray = ((ArrayWritable) mapContainer[0]).get();
for (final Writable obj : mapArray) {
final ArrayWritable mapObj = (ArrayWritable) obj;
@@ -80,7 +81,8 @@
return null;
}
- final Writable[] mapArray = ((ArrayWritable) data).get();
+ final Writable[] mapContainer = ((ArrayWritable) data).get();
+ final Writable[] mapArray = ((ArrayWritable) mapContainer[0]).get();
final Map<Writable, Writable> map = new HashMap<Writable, Writable>();
for (final Writable obj : mapArray) {
@@ -94,6 +96,7 @@
@Override
public int getMapSize(final Object data) {
- return ((ArrayWritable) data).get().length;
+ final Writable[] mapContainer = ((ArrayWritable) data).get();
+ return ((ArrayWritable) mapContainer[0]).get().length;
}
}
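
For completeness, a list column gets the same single-element wrapping (again a sketch only; it mirrors how UtilitiesTestMethods.createArrayWritable and TestParquetSerDe build their expected values further down in this diff):

    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Writable;

    public class NestedListLayoutSketch {

      // array<int> column holding [0, 12, 17], the values used in the tests below
      static ArrayWritable listColumn() {
        final Writable[] elements = new Writable[] {
            new IntWritable(0), new IntWritable(12), new IntWritable(17)
        };
        // single-element container around the elements of the repeated "bag" group
        final Writable[] container = new Writable[] { new ArrayWritable(Writable.class, elements) };
        return new ArrayWritable(Writable.class, container);
      }
    }
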
diff --git a/parquet-hive/src/test/java/parquet/hive/TestDeprecatedParquetInputFormat.java b/parquet-hive/src/test/java/parquet/hive/TestDeprecatedParquetInputFormat.java
index e3f0e95..b26c453 100644
--- a/parquet-hive/src/test/java/parquet/hive/TestDeprecatedParquetInputFormat.java
+++ b/parquet-hive/src/test/java/parquet/hive/TestDeprecatedParquetInputFormat.java
@@ -11,16 +11,12 @@
*/
package parquet.hive;
-import java.io.BufferedWriter;
import java.io.File;
-import java.io.FileWriter;
import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
import junit.framework.TestCase;
@@ -47,7 +43,9 @@
import parquet.io.ColumnIOFactory;
import parquet.io.MessageColumnIO;
import parquet.io.api.RecordConsumer;
+import parquet.schema.GroupType;
import parquet.schema.MessageType;
+import parquet.schema.OriginalType;
import parquet.schema.PrimitiveType;
import parquet.schema.PrimitiveType.PrimitiveTypeName;
import parquet.schema.Type.Repetition;
@@ -81,8 +79,19 @@
+ " optional double c_acctbal;\n"
+ " optional binary c_mktsegment;\n"
+ " optional binary c_comment;\n"
+ + " optional group c_map (MAP_KEY_VALUE) {\n"
+ + " repeated group map {\n"
+ + " required binary key;\n"
+ + " optional binary value;\n"
+ + " }\n"
+ + " }\n"
+ + " optional group c_list (LIST) {\n"
+ + " repeated group bag {\n"
+ + " optional int32 array_element;\n"
+ + " }\n"
+ + " }\n"
+ "}";
- readParquetHiveInputFormat(schemaRequested, new Integer[] {0, 1, 2, 3, 4, 5, 6, 7, 8});
+ readParquetHiveInputFormat(schemaRequested, new Integer[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
}
public void testParquetHiveInputFormatWithSpecificSchema() throws Exception {
@@ -153,7 +162,9 @@
new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_phone"),
new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.DOUBLE, "c_acctbal"),
new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_mktsegment"),
- new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_comment"));
+ new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_comment"),
+ new GroupType(Repetition.OPTIONAL, "c_map", OriginalType.MAP_KEY_VALUE, new GroupType(Repetition.REPEATED, "map", new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BINARY, "key"), new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "value"))),
+ new GroupType(Repetition.OPTIONAL, "c_list", OriginalType.LIST, new GroupType(Repetition.REPEATED, "bag", new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "array_element"))));
final MemPageStore pageStore = new MemPageStore(1000);
final ColumnWriteStoreImpl store = new ColumnWriteStoreImpl(pageStore, 8 * 1024, 8 * 1024, false);
@@ -161,17 +172,25 @@
final RecordConsumer recordWriter = columnIO.getRecordWriter(store);
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("testkey", "testvalue");
+ map.put("foo", "bar");
+
+ List<Integer> list = new ArrayList<Integer>();
+ list.add(0);
+ list.add(12);
+ list.add(17);
+
int recordCount = 0;
mapData.clear();
for (int i = 0; i < 1000; i++) {
recordWriter.startMessage();
- // yeah same test as pig one :)
mapData.put(i, UtilitiesTestMethods.createArrayWritable(i, i % 11 == 0 ? null : "name_" + i, i % 12 == 0 ? null : "add_" + i,
i % 13 == 0 ? null : i, i % 14 == 0 ? null : "phone_" + i, i % 15 == 0 ? null : 1.2d * i, i % 16 == 0 ? null : "mktsegment_" + i,
- i % 17 == 0 ? null : "comment_" + i));
+ i % 17 == 0 ? null : "comment_" + i, i % 18 == 0 ? null : map, i % 19 == 0 ? null : list));
saveData(recordWriter, i, i % 11 == 0 ? null : "name_" + i, i % 12 == 0 ? null : "add_" + i,
i % 13 == 0 ? null : i, i % 14 == 0 ? null : "phone_" + i, i % 15 == 0 ? null : 1.2d * i, i % 16 == 0 ? null : "mktsegment_" + i,
- i % 17 == 0 ? null : "comment_" + i);
+ i % 17 == 0 ? null : "comment_" + i, i % 18 == 0 ? null : map, i % 19 == 0 ? null : list);
recordWriter.endMessage();
++recordCount;
}
@@ -181,7 +200,7 @@
}
private void saveData(final RecordConsumer recordWriter, final Integer custkey, final String name, final String address, final Integer nationkey, final String phone,
- final Double acctbal, final String mktsegment, final String comment) {
+ final Double acctbal, final String mktsegment, final String comment, final Map<String, String> map, final List<Integer> list) {
UtilitiesTestMethods.writeField(recordWriter, 0, "c_custkey", custkey);
UtilitiesTestMethods.writeField(recordWriter, 1, "c_name", name);
UtilitiesTestMethods.writeField(recordWriter, 2, "c_address", address);
@@ -190,9 +209,11 @@
UtilitiesTestMethods.writeField(recordWriter, 5, "c_acctbal", acctbal);
UtilitiesTestMethods.writeField(recordWriter, 6, "c_mktsegment", mktsegment);
UtilitiesTestMethods.writeField(recordWriter, 7, "c_comment", comment);
+ UtilitiesTestMethods.writeField(recordWriter, 8, "c_map", map);
+ UtilitiesTestMethods.writeField(recordWriter, 9, "c_list", list);
}
- private void readParquetHiveInputFormat(final String schemaRequested, Integer[] arrCheckIndexValues) throws Exception {
+ private void readParquetHiveInputFormat(final String schemaRequested, final Integer[] arrCheckIndexValues) throws Exception {
final ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, new Path(testFile.getAbsolutePath()));
final MessageType schema = readFooter.getFileMetaData().getSchema();
@@ -219,6 +240,17 @@
+ " optional double c_acctbal;\n"
+ " optional binary c_mktsegment;\n"
+ " optional binary c_comment;\n"
+ + " optional group c_map (MAP_KEY_VALUE) {\n"
+ + " repeated group map {\n"
+ + " required binary key;\n"
+ + " optional binary value;\n"
+ + " }\n"
+ + " }\n"
+ + " optional group c_list (LIST) {\n"
+ + " repeated group bag {\n"
+ + " optional int32 array_element;\n"
+ + " }\n"
+ + " }\n"
+ " optional int32 unknown;\n"
+ "}";
diff --git a/parquet-hive/src/test/java/parquet/hive/TestDeprecatedParquetOuputFormat.java b/parquet-hive/src/test/java/parquet/hive/TestDeprecatedParquetOuputFormat.java
index 9baf300..0bdc4b5 100644
--- a/parquet-hive/src/test/java/parquet/hive/TestDeprecatedParquetOuputFormat.java
+++ b/parquet-hive/src/test/java/parquet/hive/TestDeprecatedParquetOuputFormat.java
@@ -13,13 +13,14 @@
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
+
import junit.framework.TestCase;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -35,6 +36,7 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.Progressable;
+
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.ParquetInputSplit;
import parquet.hadoop.metadata.BlockMetaData;
@@ -74,32 +76,41 @@
}
}
reporter = Reporter.NULL;
+
+ Map<String, String> map = new HashMap<String, String>();
+ map.put("testkey", "testvalue");
+ map.put("foo", "bar");
+
+ List<Integer> list = new ArrayList<Integer>();
+ list.add(0);
+ list.add(12);
+ list.add(17);
+
mapData = new HashMap<Integer, ArrayWritable>();
mapData.clear();
for (int i = 0; i < 1000; i++) {
- // yeah same test as pig one :)
mapData.put(i, UtilitiesTestMethods.createArrayWritable(i, i % 11 == 0 ? null : "name_" + i, i % 12 == 0 ? null : "add_" + i,
i % 13 == 0 ? null : i, i % 14 == 0 ? null : "phone_" + i, i % 15 == 0 ? null : 1.2d * i, i % 16 == 0 ? null : "mktsegment_" + i,
- i % 17 == 0 ? null : "comment_" + i));
+ i % 17 == 0 ? null : "comment_" + i, i % 18 == 0 ? null : map, i % 19 == 0 ? null : list));
}
}
public void testParquetHiveOutputFormat() throws Exception {
- final HiveOutputFormat format = new DeprecatedParquetOutputFormat();
+ final HiveOutputFormat<Void, ArrayWritable> format = new DeprecatedParquetOutputFormat();
final Properties tableProperties = new Properties();
// Set the configuration parameters
tableProperties.setProperty("columns",
- "c_custkey,c_name,c_address,c_nationkey,c_phone,c_acctbal,c_mktsegment,c_comment");
+ "c_custkey,c_name,c_address,c_nationkey,c_phone,c_acctbal,c_mktsegment,c_comment,c_map,c_list");
tableProperties.setProperty("columns.types",
- "int:string:string:int:string:double:string:string");
+ "int:string:string:int:string:double:string:string:map<string,string>:array<int>");
tableProperties.setProperty(org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT, "NULL");
System.out.println("First part, write the data");
job.set("mapred.task.id", "attempt_201304241759_32973_m_000002_0"); // FAKE ID
- fakeStatus reporter = new fakeStatus();
- org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter recordWriter = format.getHiveRecordWriter(
+ final fakeStatus reporter = new fakeStatus();
+ final org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter recordWriter = format.getHiveRecordWriter(
job,
new Path(testFile.getAbsolutePath()),
NullWritable.class,
@@ -107,7 +118,7 @@
tableProperties,
reporter);
// create key/value
- for (Map.Entry<Integer, ArrayWritable> entry : mapData.entrySet()) {
+ for (final Map.Entry<Integer, ArrayWritable> entry : mapData.entrySet()) {
recordWriter.write(entry.getValue());
}
recordWriter.close(false);
@@ -141,6 +152,17 @@
+ " optional double c_acctbal;\n"
+ " optional binary c_mktsegment;\n"
+ " optional binary c_comment;\n"
+ + " optional group c_map (MAP_KEY_VALUE) {\n"
+ + " repeated group map {\n"
+ + " required binary key;\n"
+ + " optional binary value;\n"
+ + " }\n"
+ + " }\n"
+ + " optional group c_list (LIST) {\n"
+ + " repeated group bag {\n"
+ + " optional int32 array_element;\n"
+ + " }\n"
+ + " }\n"
+ "}";
@@ -167,9 +189,9 @@
final Writable[] writableArr = arrValue;
final ArrayWritable expected = mapData.get(((IntWritable) writableArr[0]).get());
final Writable[] arrExpected = expected.get();
- assertEquals(arrValue.length, 8);
+ assertEquals(arrValue.length, 10);
- final boolean deepEquals = UtilitiesTestMethods.smartCheckArray(arrValue, arrExpected, new Integer[] {0, 1, 2, 3, 4, 5, 6, 7});
+ final boolean deepEquals = UtilitiesTestMethods.smartCheckArray(arrValue, arrExpected, new Integer[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
assertTrue(deepEquals);
count++;
@@ -184,12 +206,12 @@
private class fakeStatus extends org.apache.hadoop.mapreduce.StatusReporter implements Progressable {
@Override
- public Counter getCounter(Enum<?> e) {
+ public Counter getCounter(final Enum<?> e) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
- public Counter getCounter(String string, String string1) {
+ public Counter getCounter(final String string, final String string1) {
throw new UnsupportedOperationException("Not supported yet.");
}
@@ -199,7 +221,7 @@
}
@Override
- public void setStatus(String string) {
+ public void setStatus(final String string) {
throw new UnsupportedOperationException("Not supported yet.");
}
diff --git a/parquet-hive/src/test/java/parquet/hive/TestHiveSchemaConverter.java b/parquet-hive/src/test/java/parquet/hive/TestHiveSchemaConverter.java
index 8eb1b9c..0bba7cf 100644
--- a/parquet-hive/src/test/java/parquet/hive/TestHiveSchemaConverter.java
+++ b/parquet-hive/src/test/java/parquet/hive/TestHiveSchemaConverter.java
@@ -73,15 +73,53 @@
@Test
public void testSimpleType() throws Exception {
testConversion(
- "a,b,c",
- "int,double,boolean",
- "message hive_schema {\n" +
- " optional int32 a;\n" +
- " optional double b;\n" +
- " optional boolean c;\n" +
- " }\n" +
- "}\n");
+ "a,b,c",
+ "int,double,boolean",
+ "message hive_schema {\n"
+ + " optional int32 a;\n"
+ + " optional double b;\n"
+ + " optional boolean c;\n"
+ + " }\n"
+ + "}\n");
}
- // TODO : To be completed
+ @Test
+ public void testArray() throws Exception {
+ testConversion("arrayCol",
+ "array<int>",
+ "message hive_schema {\n"
+ + " optional group arrayCol (LIST) {\n"
+ + " repeated group bag {\n"
+ + " optional int32 array_element;\n"
+ + " }\n"
+ + " }\n"
+ + "}\n");
+ }
+
+ @Test
+ public void testStruct() throws Exception {
+ testConversion("structCol",
+ "struct<a:int,b:double,c:boolean>",
+ "message hive_schema {\n"
+ + " optional group structCol {\n"
+ + " optional int32 a;\n"
+ + " optional double b;\n"
+ + " optional boolean c;\n"
+ + " }\n"
+ + "}\n");
+ }
+
+ @Test
+ public void testMap() throws Exception {
+ testConversion("mapCol",
+ "map<string,string>",
+ "message hive_schema {\n"
+ + " optional group mapCol (MAP_KEY_VALUE) {\n"
+ + " repeated group map {\n"
+ + " required binary key;\n"
+ + " optional binary value;\n"
+ + " }\n"
+ + " }\n"
+ + "}\n");
+ }
}
diff --git a/parquet-hive/src/test/java/parquet/hive/TestParquetSerDe.java b/parquet-hive/src/test/java/parquet/hive/TestParquetSerDe.java
index 65d7ed6..cc08bcc 100644
--- a/parquet-hive/src/test/java/parquet/hive/TestParquetSerDe.java
+++ b/parquet-hive/src/test/java/parquet/hive/TestParquetSerDe.java
@@ -11,11 +11,8 @@
*/
package parquet.hive;
-import java.util.Arrays;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
import java.util.Properties;
+
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
@@ -53,7 +50,7 @@
serDe.initialize(conf, tbl);
// Data
- Writable[] arr = new Writable[6];
+ final Writable[] arr = new Writable[8];
arr[0] = new ByteWritable((byte) 123);
arr[1] = new ShortWritable((short) 456);
@@ -62,6 +59,24 @@
arr[4] = new DoubleWritable((double) 5.3);
arr[5] = new BinaryWritable("hive and hadoop and parquet. Big family.");
+ final Writable[] mapContainer = new Writable[1];
+ final Writable[] map = new Writable[3];
+ for (int i = 0; i < 3; ++i) {
+ final Writable[] pair = new Writable[2];
+ pair[0] = new BinaryWritable("key_" + i);
+ pair[1] = new IntWritable(i);
+ map[i] = new ArrayWritable(Writable.class, pair);
+ }
+ mapContainer[0] = new ArrayWritable(Writable.class, map);
+ arr[6] = new ArrayWritable(Writable.class, mapContainer);
+
+ final Writable[] arrayContainer = new Writable[1];
+ final Writable[] array = new Writable[5];
+ for (int i = 0; i < 5; ++i) {
+ array[i] = new BinaryWritable("elem_" + i);
+ }
+ arrayContainer[0] = new ArrayWritable(Writable.class, array);
+ arr[7] = new ArrayWritable(Writable.class, arrayContainer);
final ArrayWritable arrWritable = new ArrayWritable(Writable.class, arr);
// Test
@@ -74,36 +89,29 @@
}
}
- private void deserializeAndSerializeLazySimple(final ParquetHiveSerDe serDe, final ArrayWritable t)
- throws SerDeException {
+ private void deserializeAndSerializeLazySimple(final ParquetHiveSerDe serDe, final ArrayWritable t) throws SerDeException {
// Get the row structure
- final StructObjectInspector oi = (StructObjectInspector) serDe
- .getObjectInspector();
+ final StructObjectInspector oi = (StructObjectInspector) serDe.getObjectInspector();
// Deserialize
final Object row = serDe.deserialize(t);
- assertEquals("deserialize gave the wrong object", row.getClass(), ArrayWritable.class);
- assertEquals("serialized size correct after deserialization", serDe.getSerDeStats()
- .getRawDataSize(), t.get().length);
- assertEquals("deserialisation give the wrong object", t, row);
+ assertEquals("deserialization gives the wrong object class", row.getClass(), ArrayWritable.class);
+ assertEquals("size correct after deserialization", serDe.getSerDeStats().getRawDataSize(), t.get().length);
+ assertEquals("deserialization gives the wrong object", t, row);
// Serialize
final ArrayWritable serializedArr = (ArrayWritable) serDe.serialize(row, oi);
- assertEquals("serialized size correct after serialization", serDe.getSerDeStats()
- .getRawDataSize(),
- serializedArr.get().length);
- assertTrue("serialize Object to ArrayWritable should be equals", Arrays.deepEquals(t.get(), serializedArr.get()));
+ assertEquals("size correct after serialization", serDe.getSerDeStats().getRawDataSize(), serializedArr.get().length);
+ assertTrue("serialized object should be equal to starting object", UtilitiesTestMethods.arrayWritableEquals(t, serializedArr));
}
private Properties createProperties() {
final Properties tbl = new Properties();
// Set the configuration parameters
- tbl.setProperty("columns",
- "abyte,ashort,aint,along,adouble,astring");
- tbl.setProperty("columns.types",
- "tinyint:smallint:int:bigint:double:string");
+ tbl.setProperty("columns", "abyte,ashort,aint,along,adouble,astring,amap,alist");
+ tbl.setProperty("columns.types", "tinyint:smallint:int:bigint:double:string:map<string,int>:array<string>");
tbl.setProperty(org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT, "NULL");
return tbl;
}
diff --git a/parquet-hive/src/test/java/parquet/hive/UtilitiesTestMethods.java b/parquet-hive/src/test/java/parquet/hive/UtilitiesTestMethods.java
index 9001f1e..f14f77d 100644
--- a/parquet-hive/src/test/java/parquet/hive/UtilitiesTestMethods.java
+++ b/parquet-hive/src/test/java/parquet/hive/UtilitiesTestMethods.java
@@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -36,16 +37,17 @@
w.end(new HashMap<String, String>());
}
- public static boolean smartCheckArray(Writable[] arrValue, Writable[] arrExpected, Integer[] arrCheckIndexValues) {
+ public static boolean smartCheckArray(final Writable[] arrValue, final Writable[] arrExpected, final Integer[] arrCheckIndexValues) {
int i = 0;
- for (Integer index : arrCheckIndexValues) {
+ for (final Integer index : arrCheckIndexValues) {
if (index != Integer.MIN_VALUE) {
final Writable value = arrValue[index];
final Writable expectedValue = arrExpected[index];
if (((value == null && expectedValue == null)
- || (((value != null && expectedValue != null) && (value.equals(expectedValue))))) == false) {
+ || (((value != null && expectedValue != null) && (value.equals(expectedValue))))
+ || (value != null && expectedValue != null && value instanceof ArrayWritable && expectedValue instanceof ArrayWritable && arrayWritableEquals((ArrayWritable) value, (ArrayWritable) expectedValue))) == false) {
return false;
}
} else {
@@ -60,9 +62,35 @@
return true;
}
- static public ArrayWritable createArrayWritable(final Integer custkey, final String name, final String address, final Integer nationkey, final String phone, final Double acctbal, final String mktsegment, final String comment) {
+ public static boolean arrayWritableEquals(final ArrayWritable a1, final ArrayWritable a2) {
+ final Writable[] a1Arr = a1.get();
+ final Writable[] a2Arr = a2.get();
- Writable[] arr = new Writable[9]; // The last one is for the unknow column
+ if (a1Arr.length != a2Arr.length) {
+ return false;
+ }
+
+ for (int i = 0; i < a1Arr.length; ++i) {
+ if (a1Arr[i] instanceof ArrayWritable) {
+ if (!(a2Arr[i] instanceof ArrayWritable)) {
+ return false;
+ }
+ if (!arrayWritableEquals((ArrayWritable) a1Arr[i], (ArrayWritable) a2Arr[i])) {
+ return false;
+ }
+ } else {
+ if (!a1Arr[i].equals(a2Arr[i])) {
+ return false;
+ }
+ }
+
+ }
+ return true;
+ }
+
+ static public ArrayWritable createArrayWritable(final Integer custkey, final String name, final String address, final Integer nationkey, final String phone, final Double acctbal, final String mktsegment, final String comment, final Map<String, String> map, final List<Integer> list) {
+
+ final Writable[] arr = new Writable[11]; // The last one is for the unknown column
arr[0] = new IntWritable(custkey);
if (name != null) {
arr[1] = new BinaryWritable(name);
@@ -85,6 +113,29 @@
if (comment != null) {
arr[7] = new BinaryWritable(comment);
}
+ if (map != null) {
+ final Writable[] mapContainer = new Writable[1];
+ final Writable[] mapArr = new Writable[map.size()];
+ int i = 0;
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ final Writable[] pair = new Writable[2];
+ pair[0] = new BinaryWritable(entry.getKey());
+ pair[1] = new BinaryWritable(entry.getValue());
+ mapArr[i] = new ArrayWritable(Writable.class, pair);
+ ++i;
+ }
+ mapContainer[0] = new ArrayWritable(Writable.class, mapArr);
+ arr[8] = new ArrayWritable(Writable.class, mapContainer);
+ }
+ if (list != null) {
+ final Writable[] listContainer = new Writable[1];
+ final Writable[] listArr = new Writable[list.size()];
+ for (int i = 0; i < list.size(); ++i) {
+ listArr[i] = new IntWritable(list.get(i));
+ }
+ listContainer[0] = new ArrayWritable(Writable.class, listArr);
+ arr[9] = new ArrayWritable(Writable.class, listContainer);
+ }
return new ArrayWritable(Writable.class, arr);
}
@@ -131,6 +182,27 @@
recordWriter.addBinary(Binary.fromString((String) value));
} else if (value instanceof Double) {
recordWriter.addDouble((Double) value);
+ } else if (value instanceof Map) {
+ recordWriter.startGroup();
+ recordWriter.startField("map", 0);
+ for (Object entry : ((Map) value).entrySet()) {
+ recordWriter.startGroup();
+ writeField(recordWriter, 0, "key", ((Map.Entry) entry).getKey());
+ writeField(recordWriter, 1, "value", ((Map.Entry) entry).getValue());
+ recordWriter.endGroup();
+ }
+ recordWriter.endField("map", 0);
+ recordWriter.endGroup();
+ } else if (value instanceof List) {
+ recordWriter.startGroup();
+ recordWriter.startField("bag", 0);
+ for (Object element : (List) value) {
+ recordWriter.startGroup();
+ writeField(recordWriter, 0, "array_element", element);
+ recordWriter.endGroup();
+ }
+ recordWriter.endField("bag", 0);
+ recordWriter.endGroup();
} else {
throw new IllegalArgumentException(value.getClass().getName() + " not supported");
}