Merge pull request #146 from Parquet/hive_nested_types

Fix Hive nested type (map and list) handling and add unit tests
diff --git a/parquet-hive/src/main/java/parquet/hive/DeprecatedParquetInputFormat.java b/parquet-hive/src/main/java/parquet/hive/DeprecatedParquetInputFormat.java
index 024aa44..072d702 100644
--- a/parquet-hive/src/main/java/parquet/hive/DeprecatedParquetInputFormat.java
+++ b/parquet-hive/src/main/java/parquet/hive/DeprecatedParquetInputFormat.java
@@ -311,7 +311,7 @@
      * gets a ParquetInputSplit corresponding to a split given by Hive
      *
      * @param oldSplit The split given by Hive
-     * @param conf The JobConf of the Hive job
+     * @param conf     The JobConf of the Hive job
      * @return a ParquetInputSplit corresponding to the oldSplit
      * @throws IOException if the config cannot be enhanced or if the footer cannot be read from the file
      */
diff --git a/parquet-hive/src/main/java/parquet/hive/convert/ArrayWritableGroupConverter.java b/parquet-hive/src/main/java/parquet/hive/convert/ArrayWritableGroupConverter.java
index cde2843..b7a56b4 100644
--- a/parquet-hive/src/main/java/parquet/hive/convert/ArrayWritableGroupConverter.java
+++ b/parquet-hive/src/main/java/parquet/hive/convert/ArrayWritableGroupConverter.java
@@ -15,6 +15,7 @@
  */
 package parquet.hive.convert;
 
+import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.Writable;
 
 import parquet.io.ParquetDecodingException;
@@ -34,20 +35,23 @@
   private final Converter[] converters;
   private final HiveGroupConverter parent;
   private final int index;
+  private final boolean isMap;
   private Writable currentValue;
+  private Writable[] mapPairContainer;
 
   public ArrayWritableGroupConverter(final GroupType groupType, final HiveGroupConverter parent, final int index) {
     this.parent = parent;
     this.index = index;
 
     if (groupType.getFieldCount() == 2) {
-      final DataWritableGroupConverter intermediateConverter = new DataWritableGroupConverter(groupType, this, 0);
-      converters = new Converter[groupType.getFieldCount()];
-      converters[0] = getConverterFromDescription(groupType.getType(0), 0, intermediateConverter);
-      converters[1] = getConverterFromDescription(groupType.getType(1), 1, intermediateConverter);
+      converters = new Converter[2];
+      converters[0] = getConverterFromDescription(groupType.getType(0), 0, this);
+      converters[1] = getConverterFromDescription(groupType.getType(1), 1, this);
+      isMap = true;
     } else if (groupType.getFieldCount() == 1) {
       converters = new Converter[1];
       converters[0] = getConverterFromDescription(groupType.getType(0), 0, this);
+      isMap = false;
     } else {
       throw new RuntimeException("Invalid parquet hive schema: " + groupType);
     }
@@ -61,19 +65,30 @@
 
   @Override
   public void start() {
+    if (isMap) {
+      mapPairContainer = new Writable[2];
+    }
   }
 
   @Override
   public void end() {
+    if (isMap) {
+      currentValue = new ArrayWritable(Writable.class, mapPairContainer);
+    }
     parent.add(index, currentValue);
   }
 
   @Override
   protected void set(final int index, final Writable value) {
-    if (index != 0) {
-      throw new ParquetDecodingException("Repeated group can only have one field. Not allowed to set for the index : " + index);
+    if (index != 0 && mapPairContainer == null || index > 1) {
+      throw new ParquetDecodingException("Repeated group can only have one or two fields for maps. Not allowed to set for the index : " + index);
     }
-    currentValue = value;
+
+    if (isMap) {
+      mapPairContainer[index] = value;
+    } else {
+      currentValue = value;
+    }
   }
 
   @Override
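
For a map column, each repeated entry now leaves this converter as a two-element ArrayWritable ({key, value}); the enclosing converters then wrap all entries in a single-element container, which is the layout the inspectors below unwrap. A hand-built sketch of that nesting (class name and values are illustrative only, and org.apache.hadoop.io.Text stands in for the project's BinaryWritable):

    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class MapColumnLayoutSketch {
      public static void main(final String[] args) {
        // One repeated "map" group instance = one {key, value} pair.
        final Writable[] pair1 = { new Text("foo"), new IntWritable(1) };
        final Writable[] pair2 = { new Text("bar"), new IntWritable(2) };
        // All entries of the map, then the single-element outer container
        // that ParquetHiveMapInspector unwraps via mapContainer[0].
        final Writable[] entries = {
            new ArrayWritable(Writable.class, pair1),
            new ArrayWritable(Writable.class, pair2)
        };
        final Writable[] container = { new ArrayWritable(Writable.class, entries) };
        final ArrayWritable mapColumn = new ArrayWritable(Writable.class, container);
        System.out.println(mapColumn.get().length); // 1: only the container level is visible at the top
      }
    }
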
diff --git a/parquet-hive/src/main/java/parquet/hive/serde/ParquetHiveMapInspector.java b/parquet-hive/src/main/java/parquet/hive/serde/ParquetHiveMapInspector.java
index e4a6a99..36b8c13 100644
--- a/parquet-hive/src/main/java/parquet/hive/serde/ParquetHiveMapInspector.java
+++ b/parquet-hive/src/main/java/parquet/hive/serde/ParquetHiveMapInspector.java
@@ -60,7 +60,8 @@
       return null;
     }
 
-    final Writable[] mapArray = ((ArrayWritable) data).get();
+    final Writable[] mapContainer = ((ArrayWritable) data).get();
+    final Writable[] mapArray = ((ArrayWritable) mapContainer[0]).get();
 
     for (final Writable obj : mapArray) {
       final ArrayWritable mapObj = (ArrayWritable) obj;
@@ -80,7 +81,8 @@
       return null;
     }
 
-    final Writable[] mapArray = ((ArrayWritable) data).get();
+    final Writable[] mapContainer = ((ArrayWritable) data).get();
+    final Writable[] mapArray = ((ArrayWritable) mapContainer[0]).get();
     final Map<Writable, Writable> map = new HashMap<Writable, Writable>();
 
     for (final Writable obj : mapArray) {
@@ -94,6 +96,7 @@
 
   @Override
   public int getMapSize(final Object data) {
-    return ((ArrayWritable) data).get().length;
+    final Writable[] mapContainer = ((ArrayWritable) data).get();
+    return ((ArrayWritable) mapContainer[0]).get().length;
   }
 }
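
The inspector methods now peel off the extra container level (mapContainer[0]) before iterating the key/value pairs. A standalone sketch of the full lookup that getMapValueElement performs; the key comparison is unchanged context not shown in the hunk and is filled in here as an assumption:

    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.Writable;

    final class MapLookupSketch {
      // Unwrap the single-element container, then scan the {key, value} pairs.
      static Writable lookup(final ArrayWritable data, final Writable key) {
        final Writable[] mapContainer = data.get();
        final Writable[] mapArray = ((ArrayWritable) mapContainer[0]).get();
        for (final Writable obj : mapArray) {
          final Writable[] pair = ((ArrayWritable) obj).get(); // {key, value}
          if (key.equals(pair[0])) {
            return pair[1];
          }
        }
        return null; // key not present
      }
    }
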
diff --git a/parquet-hive/src/test/java/parquet/hive/TestDeprecatedParquetInputFormat.java b/parquet-hive/src/test/java/parquet/hive/TestDeprecatedParquetInputFormat.java
index e3f0e95..b26c453 100644
--- a/parquet-hive/src/test/java/parquet/hive/TestDeprecatedParquetInputFormat.java
+++ b/parquet-hive/src/test/java/parquet/hive/TestDeprecatedParquetInputFormat.java
@@ -11,16 +11,12 @@
  */
 package parquet.hive;
 
-import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
 
 import junit.framework.TestCase;
 
@@ -47,7 +43,9 @@
 import parquet.io.ColumnIOFactory;
 import parquet.io.MessageColumnIO;
 import parquet.io.api.RecordConsumer;
+import parquet.schema.GroupType;
 import parquet.schema.MessageType;
+import parquet.schema.OriginalType;
 import parquet.schema.PrimitiveType;
 import parquet.schema.PrimitiveType.PrimitiveTypeName;
 import parquet.schema.Type.Repetition;
@@ -81,8 +79,19 @@
             + "  optional double c_acctbal;\n"
             + "  optional binary c_mktsegment;\n"
             + "  optional binary c_comment;\n"
+            + "  optional group c_map (MAP_KEY_VALUE) {\n"
+            + "    repeated group map {\n"
+            + "      required binary key;\n"
+            + "      optional binary value;\n"
+            + "    }\n"
+            + "  }\n"
+            + "  optional group c_list (LIST) {\n"
+            + "    repeated group bag {\n"
+            + "      optional int32 array_element;\n"
+            + "    }\n"
+            + "  }\n"
             + "}";
-    readParquetHiveInputFormat(schemaRequested, new Integer[] {0, 1, 2, 3, 4, 5, 6, 7, 8});
+    readParquetHiveInputFormat(schemaRequested, new Integer[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
   }
 
   public void testParquetHiveInputFormatWithSpecificSchema() throws Exception {
@@ -153,7 +162,9 @@
             new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_phone"),
             new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.DOUBLE, "c_acctbal"),
             new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_mktsegment"),
-            new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_comment"));
+            new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "c_comment"),
+            new GroupType(Repetition.OPTIONAL, "c_map", OriginalType.MAP_KEY_VALUE, new GroupType(Repetition.REPEATED, "map", new PrimitiveType(Repetition.REQUIRED, PrimitiveTypeName.BINARY, "key"), new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "value"))),
+            new GroupType(Repetition.OPTIONAL, "c_list", OriginalType.LIST, new GroupType(Repetition.REPEATED, "bag", new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "array_element"))));
 
     final MemPageStore pageStore = new MemPageStore(1000);
     final ColumnWriteStoreImpl store = new ColumnWriteStoreImpl(pageStore, 8 * 1024, 8 * 1024, false);
@@ -161,17 +172,25 @@
 
     final RecordConsumer recordWriter = columnIO.getRecordWriter(store);
 
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("testkey", "testvalue");
+    map.put("foo", "bar");
+
+    List<Integer> list = new ArrayList<Integer>();
+    list.add(0);
+    list.add(12);
+    list.add(17);
+
     int recordCount = 0;
     mapData.clear();
     for (int i = 0; i < 1000; i++) {
       recordWriter.startMessage();
-      // yeah same test as pig one :)
       mapData.put(i, UtilitiesTestMethods.createArrayWritable(i, i % 11 == 0 ? null : "name_" + i, i % 12 == 0 ? null : "add_" + i,
               i % 13 == 0 ? null : i, i % 14 == 0 ? null : "phone_" + i, i % 15 == 0 ? null : 1.2d * i, i % 16 == 0 ? null : "mktsegment_" + i,
-              i % 17 == 0 ? null : "comment_" + i));
+              i % 17 == 0 ? null : "comment_" + i, i % 18 == 0 ? null : map, i % 19 == 0 ? null : list));
       saveData(recordWriter, i, i % 11 == 0 ? null : "name_" + i, i % 12 == 0 ? null : "add_" + i,
               i % 13 == 0 ? null : i, i % 14 == 0 ? null : "phone_" + i, i % 15 == 0 ? null : 1.2d * i, i % 16 == 0 ? null : "mktsegment_" + i,
-              i % 17 == 0 ? null : "comment_" + i);
+              i % 17 == 0 ? null : "comment_" + i, i % 18 == 0 ? null : map, i % 19 == 0 ? null : list);
       recordWriter.endMessage();
       ++recordCount;
     }
@@ -181,7 +200,7 @@
   }
 
   private void saveData(final RecordConsumer recordWriter, final Integer custkey, final String name, final String address, final Integer nationkey, final String phone,
-          final Double acctbal, final String mktsegment, final String comment) {
+          final Double acctbal, final String mktsegment, final String comment, final Map<String, String> map, final List<Integer> list) {
     UtilitiesTestMethods.writeField(recordWriter, 0, "c_custkey", custkey);
     UtilitiesTestMethods.writeField(recordWriter, 1, "c_name", name);
     UtilitiesTestMethods.writeField(recordWriter, 2, "c_address", address);
@@ -190,9 +209,11 @@
     UtilitiesTestMethods.writeField(recordWriter, 5, "c_acctbal", acctbal);
     UtilitiesTestMethods.writeField(recordWriter, 6, "c_mktsegment", mktsegment);
     UtilitiesTestMethods.writeField(recordWriter, 7, "c_comment", comment);
+    UtilitiesTestMethods.writeField(recordWriter, 8, "c_map", map);
+    UtilitiesTestMethods.writeField(recordWriter, 9, "c_list", list);
   }
 
-  private void readParquetHiveInputFormat(final String schemaRequested, Integer[] arrCheckIndexValues) throws Exception {
+  private void readParquetHiveInputFormat(final String schemaRequested, final Integer[] arrCheckIndexValues) throws Exception {
     final ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, new Path(testFile.getAbsolutePath()));
     final MessageType schema = readFooter.getFileMetaData().getSchema();
 
@@ -219,6 +240,17 @@
             + "  optional double c_acctbal;\n"
             + "  optional binary c_mktsegment;\n"
             + "  optional binary c_comment;\n"
+            + "  optional group c_map (MAP_KEY_VALUE) {\n"
+            + "    repeated group map {\n"
+            + "      required binary key;\n"
+            + "      optional binary value;\n"
+            + "    }\n"
+            + "  }\n"
+            + "  optional group c_list (LIST) {\n"
+            + "    repeated group bag {\n"
+            + "      optional int32 array_element;\n"
+            + "    }\n"
+            + "  }\n"
             + "  optional int32 unknown;\n"
             + "}";
 
diff --git a/parquet-hive/src/test/java/parquet/hive/TestDeprecatedParquetOuputFormat.java b/parquet-hive/src/test/java/parquet/hive/TestDeprecatedParquetOuputFormat.java
index 9baf300..0bdc4b5 100644
--- a/parquet-hive/src/test/java/parquet/hive/TestDeprecatedParquetOuputFormat.java
+++ b/parquet-hive/src/test/java/parquet/hive/TestDeprecatedParquetOuputFormat.java
@@ -13,13 +13,14 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
+
 import junit.framework.TestCase;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,6 +36,7 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.util.Progressable;
+
 import parquet.hadoop.ParquetFileReader;
 import parquet.hadoop.ParquetInputSplit;
 import parquet.hadoop.metadata.BlockMetaData;
@@ -74,32 +76,41 @@
       }
     }
     reporter = Reporter.NULL;
+
+    Map<String, String> map = new HashMap<String, String>();
+    map.put("testkey", "testvalue");
+    map.put("foo", "bar");
+
+    List<Integer> list = new ArrayList<Integer>();
+    list.add(0);
+    list.add(12);
+    list.add(17);
+
     mapData = new HashMap<Integer, ArrayWritable>();
     mapData.clear();
     for (int i = 0; i < 1000; i++) {
-      // yeah same test as pig one :)
       mapData.put(i, UtilitiesTestMethods.createArrayWritable(i, i % 11 == 0 ? null : "name_" + i, i % 12 == 0 ? null : "add_" + i,
               i % 13 == 0 ? null : i, i % 14 == 0 ? null : "phone_" + i, i % 15 == 0 ? null : 1.2d * i, i % 16 == 0 ? null : "mktsegment_" + i,
-              i % 17 == 0 ? null : "comment_" + i));
+              i % 17 == 0 ? null : "comment_" + i, i % 18 == 0 ? null : map, i % 19 == 0 ? null : list));
     }
   }
 
   public void testParquetHiveOutputFormat() throws Exception {
-    final HiveOutputFormat format = new DeprecatedParquetOutputFormat();
+    final HiveOutputFormat<Void, ArrayWritable> format = new DeprecatedParquetOutputFormat();
     final Properties tableProperties = new Properties();
 
     // Set the configuration parameters
     tableProperties.setProperty("columns",
-            "c_custkey,c_name,c_address,c_nationkey,c_phone,c_acctbal,c_mktsegment,c_comment");
+            "c_custkey,c_name,c_address,c_nationkey,c_phone,c_acctbal,c_mktsegment,c_comment,c_map,c_list");
     tableProperties.setProperty("columns.types",
-            "int:string:string:int:string:double:string:string");
+            "int:string:string:int:string:double:string:string:map<string,string>:array<int>");
     tableProperties.setProperty(org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT, "NULL");
 
     System.out.println("First part, write the data");
 
     job.set("mapred.task.id", "attempt_201304241759_32973_m_000002_0"); // FAKE ID
-    fakeStatus reporter = new fakeStatus();
-    org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter recordWriter = format.getHiveRecordWriter(
+    final fakeStatus reporter = new fakeStatus();
+    final org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter recordWriter = format.getHiveRecordWriter(
             job,
             new Path(testFile.getAbsolutePath()),
             NullWritable.class,
@@ -107,7 +118,7 @@
             tableProperties,
             reporter);
     // create key/value
-    for (Map.Entry<Integer, ArrayWritable> entry : mapData.entrySet()) {
+    for (final Map.Entry<Integer, ArrayWritable> entry : mapData.entrySet()) {
       recordWriter.write(entry.getValue());
     }
     recordWriter.close(false);
@@ -141,6 +152,17 @@
             + "  optional double c_acctbal;\n"
             + "  optional binary c_mktsegment;\n"
             + "  optional binary c_comment;\n"
+            + "  optional group c_map (MAP_KEY_VALUE) {\n"
+            + "    repeated group map {\n"
+            + "      required binary key;\n"
+            + "      optional binary value;\n"
+            + "    }\n"
+            + "  }\n"
+            + "  optional group c_list (LIST) {\n"
+            + "    repeated group bag {\n"
+            + "      optional int32 array_element;\n"
+            + "    }\n"
+            + "  }\n"
             + "}";
 
 
@@ -167,9 +189,9 @@
       final Writable[] writableArr = arrValue;
       final ArrayWritable expected = mapData.get(((IntWritable) writableArr[0]).get());
       final Writable[] arrExpected = expected.get();
-      assertEquals(arrValue.length, 8);
+      assertEquals(arrValue.length, 10);
 
-      final boolean deepEquals = UtilitiesTestMethods.smartCheckArray(arrValue, arrExpected, new Integer[] {0, 1, 2, 3, 4, 5, 6, 7});
+      final boolean deepEquals = UtilitiesTestMethods.smartCheckArray(arrValue, arrExpected, new Integer[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
 
       assertTrue(deepEquals);
       count++;
@@ -184,12 +206,12 @@
   private class fakeStatus extends org.apache.hadoop.mapreduce.StatusReporter implements Progressable {
 
     @Override
-    public Counter getCounter(Enum<?> e) {
+    public Counter getCounter(final Enum<?> e) {
       throw new UnsupportedOperationException("Not supported yet.");
     }
 
     @Override
-    public Counter getCounter(String string, String string1) {
+    public Counter getCounter(final String string, final String string1) {
       throw new UnsupportedOperationException("Not supported yet.");
     }
 
@@ -199,7 +221,7 @@
     }
 
     @Override
-    public void setStatus(String string) {
+    public void setStatus(final String string) {
       throw new UnsupportedOperationException("Not supported yet.");
     }
 
diff --git a/parquet-hive/src/test/java/parquet/hive/TestHiveSchemaConverter.java b/parquet-hive/src/test/java/parquet/hive/TestHiveSchemaConverter.java
index 8eb1b9c..0bba7cf 100644
--- a/parquet-hive/src/test/java/parquet/hive/TestHiveSchemaConverter.java
+++ b/parquet-hive/src/test/java/parquet/hive/TestHiveSchemaConverter.java
@@ -73,15 +73,53 @@
   @Test
   public void testSimpleType() throws Exception {
     testConversion(
-        "a,b,c",
-        "int,double,boolean",
-        "message hive_schema {\n" +
-            "  optional int32 a;\n" +
-            "  optional double b;\n" +
-            "  optional boolean c;\n" +
-            "  }\n" +
-        "}\n");
+            "a,b,c",
+            "int,double,boolean",
+            "message hive_schema {\n"
+            + "  optional int32 a;\n"
+            + "  optional double b;\n"
+            + "  optional boolean c;\n"
+            + "  }\n"
+            + "}\n");
   }
 
-  // TODO : To be completed
+  @Test
+  public void testArray() throws Exception {
+    testConversion("arrayCol",
+            "array<int>",
+            "message hive_schema {\n"
+            + "  optional group arrayCol (LIST) {\n"
+            + "    repeated group bag {\n"
+            + "      optional int32 array_element;\n"
+            + "    }\n"
+            + "  }\n"
+            + "}\n");
+  }
+
+  @Test
+  public void testStruct() throws Exception {
+    testConversion("structCol",
+            "struct<a:int,b:double,c:boolean>",
+            "message hive_schema {\n"
+            + "  optional group structCol {\n"
+            + "    optional int32 a;\n"
+            + "    optional double b;\n"
+            + "    optional boolean c;\n"
+            + "  }\n"
+            + "}\n");
+  }
+
+  @Test
+  public void testMap() throws Exception {
+    testConversion("mapCol",
+            "map<string,string>",
+            "message hive_schema {\n"
+            + "  optional group mapCol (MAP_KEY_VALUE) {\n"
+            + "    repeated group map {\n"
+            + "      required binary key;\n"
+            + "      optional binary value;\n"
+            + "    }\n"
+            + "  }\n"
+            + "}\n");
+  }
 }
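
One way to compare an expected schema string against a converted schema, independent of whitespace, is to parse it first; a minimal sketch assuming parquet-mr's parquet.schema.MessageTypeParser (the class and main method here are illustrative only):

    import parquet.schema.MessageType;
    import parquet.schema.MessageTypeParser;

    public class SchemaParseSketch {
      public static void main(final String[] args) {
        final String expected = "message hive_schema {\n"
            + "  optional group mapCol (MAP_KEY_VALUE) {\n"
            + "    repeated group map {\n"
            + "      required binary key;\n"
            + "      optional binary value;\n"
            + "    }\n"
            + "  }\n"
            + "}\n";
        // parseMessageType turns the string into a MessageType that can be
        // compared for equality with a converter's output.
        final MessageType parsed = MessageTypeParser.parseMessageType(expected);
        System.out.println(parsed.toString());
      }
    }
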
diff --git a/parquet-hive/src/test/java/parquet/hive/TestParquetSerDe.java b/parquet-hive/src/test/java/parquet/hive/TestParquetSerDe.java
index 65d7ed6..cc08bcc 100644
--- a/parquet-hive/src/test/java/parquet/hive/TestParquetSerDe.java
+++ b/parquet-hive/src/test/java/parquet/hive/TestParquetSerDe.java
@@ -11,11 +11,8 @@
  */
 package parquet.hive;
 
-import java.util.Arrays;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import java.util.Properties;
+
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
@@ -53,7 +50,7 @@
       serDe.initialize(conf, tbl);
 
       // Data
-      Writable[] arr = new Writable[6];
+      final Writable[] arr = new Writable[8];
 
       arr[0] = new ByteWritable((byte) 123);
       arr[1] = new ShortWritable((short) 456);
@@ -62,6 +59,24 @@
       arr[4] = new DoubleWritable((double) 5.3);
       arr[5] = new BinaryWritable("hive and hadoop and parquet. Big family.");
 
+      final Writable[] mapContainer = new Writable[1];
+      final Writable[] map = new Writable[3];
+      for (int i = 0; i < 3; ++i) {
+        final Writable[] pair = new Writable[2];
+        pair[0] = new BinaryWritable("key_" + i);
+        pair[1] = new IntWritable(i);
+        map[i] = new ArrayWritable(Writable.class, pair);
+      }
+      mapContainer[0] = new ArrayWritable(Writable.class, map);
+      arr[6] = new ArrayWritable(Writable.class, mapContainer);
+
+      final Writable[] arrayContainer = new Writable[1];
+      final Writable[] array = new Writable[5];
+      for (int i = 0; i < 5; ++i) {
+        array[i] = new BinaryWritable("elem_" + i);
+      }
+      arrayContainer[0] = new ArrayWritable(Writable.class, array);
+      arr[7] = new ArrayWritable(Writable.class, arrayContainer);
 
       final ArrayWritable arrWritable = new ArrayWritable(Writable.class, arr);
       // Test
@@ -74,36 +89,29 @@
     }
   }
 
-  private void deserializeAndSerializeLazySimple(final ParquetHiveSerDe serDe, final ArrayWritable t)
-          throws SerDeException {
+  private void deserializeAndSerializeLazySimple(final ParquetHiveSerDe serDe, final ArrayWritable t) throws SerDeException {
 
     // Get the row structure
-    final StructObjectInspector oi = (StructObjectInspector) serDe
-            .getObjectInspector();
+    final StructObjectInspector oi = (StructObjectInspector) serDe.getObjectInspector();
 
     // Deserialize
     final Object row = serDe.deserialize(t);
-    assertEquals("deserialize gave the wrong object", row.getClass(), ArrayWritable.class);
-    assertEquals("serialized size correct after deserialization", serDe.getSerDeStats()
-            .getRawDataSize(), t.get().length);
-    assertEquals("deserialisation give the wrong object", t, row);
+    assertEquals("deserialization gives the wrong object class", row.getClass(), ArrayWritable.class);
+    assertEquals("size correct after deserialization", serDe.getSerDeStats().getRawDataSize(), t.get().length);
+    assertEquals("deserialization gives the wrong object", t, row);
 
     // Serialize
     final ArrayWritable serializedArr = (ArrayWritable) serDe.serialize(row, oi);
-    assertEquals("serialized size correct after serialization", serDe.getSerDeStats()
-            .getRawDataSize(),
-            serializedArr.get().length);
-    assertTrue("serialize Object to ArrayWritable should be equals", Arrays.deepEquals(t.get(), serializedArr.get()));
+    assertEquals("size correct after serialization", serDe.getSerDeStats().getRawDataSize(), serializedArr.get().length);
+    assertTrue("serialized object should be equal to starting object", UtilitiesTestMethods.arrayWritableEquals(t, serializedArr));
   }
 
   private Properties createProperties() {
     final Properties tbl = new Properties();
 
     // Set the configuration parameters
-    tbl.setProperty("columns",
-            "abyte,ashort,aint,along,adouble,astring");
-    tbl.setProperty("columns.types",
-            "tinyint:smallint:int:bigint:double:string");
+    tbl.setProperty("columns", "abyte,ashort,aint,along,adouble,astring,amap,alist");
+    tbl.setProperty("columns.types", "tinyint:smallint:int:bigint:double:string:map<string,int>:array<string>");
     tbl.setProperty(org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT, "NULL");
     return tbl;
   }
diff --git a/parquet-hive/src/test/java/parquet/hive/UtilitiesTestMethods.java b/parquet-hive/src/test/java/parquet/hive/UtilitiesTestMethods.java
index 9001f1e..f14f77d 100644
--- a/parquet-hive/src/test/java/parquet/hive/UtilitiesTestMethods.java
+++ b/parquet-hive/src/test/java/parquet/hive/UtilitiesTestMethods.java
@@ -3,6 +3,7 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -36,16 +37,17 @@
     w.end(new HashMap<String, String>());
   }
 
-  public static boolean smartCheckArray(Writable[] arrValue, Writable[] arrExpected, Integer[] arrCheckIndexValues) {
+  public static boolean smartCheckArray(final Writable[] arrValue, final Writable[] arrExpected, final Integer[] arrCheckIndexValues) {
 
     int i = 0;
-    for (Integer index : arrCheckIndexValues) {
+    for (final Integer index : arrCheckIndexValues) {
       if (index != Integer.MIN_VALUE) {
         final Writable value = arrValue[index];
         final Writable expectedValue = arrExpected[index];
 
         if (((value == null && expectedValue == null)
-                || (((value != null && expectedValue != null) && (value.equals(expectedValue))))) == false) {
+                || (((value != null && expectedValue != null) && (value.equals(expectedValue))))
+                || (value != null && expectedValue != null && value instanceof ArrayWritable && expectedValue instanceof ArrayWritable && arrayWritableEquals((ArrayWritable) value, (ArrayWritable) expectedValue))) == false) {
           return false;
         }
       } else {
@@ -60,9 +62,35 @@
     return true;
   }
 
-  static public ArrayWritable createArrayWritable(final Integer custkey, final String name, final String address, final Integer nationkey, final String phone, final Double acctbal, final String mktsegment, final String comment) {
+  public static boolean arrayWritableEquals(final ArrayWritable a1, final ArrayWritable a2) {
+    final Writable[] a1Arr = a1.get();
+    final Writable[] a2Arr = a2.get();
 
-    Writable[] arr = new Writable[9]; // The last one is for the unknow column
+    if (a1Arr.length != a2Arr.length) {
+      return false;
+    }
+
+    for (int i = 0; i < a1Arr.length; ++i) {
+      if (a1Arr[i] instanceof ArrayWritable) {
+        if (!(a2Arr[i] instanceof ArrayWritable)) {
+          return false;
+        }
+        if (!arrayWritableEquals((ArrayWritable) a1Arr[i], (ArrayWritable) a2Arr[i])) {
+          return false;
+        }
+      } else {
+        if (!a1Arr[i].equals(a2Arr[i])) {
+          return false;
+        }
+      }
+
+    }
+    return true;
+  }
+
+  static public ArrayWritable createArrayWritable(final Integer custkey, final String name, final String address, final Integer nationkey, final String phone, final Double acctbal, final String mktsegment, final String comment, final Map<String, String> map, final List<Integer> list) {
+
+    final Writable[] arr = new Writable[11]; // The last one is for the unknown column
     arr[0] = new IntWritable(custkey);
     if (name != null) {
       arr[1] = new BinaryWritable(name);
@@ -85,6 +113,29 @@
     if (comment != null) {
       arr[7] = new BinaryWritable(comment);
     }
+    if (map != null) {
+      final Writable[] mapContainer = new Writable[1];
+      final Writable[] mapArr = new Writable[map.size()];
+      int i = 0;
+      for (Map.Entry<String, String> entry : map.entrySet()) {
+        final Writable[] pair = new Writable[2];
+        pair[0] = new BinaryWritable(entry.getKey());
+        pair[1] = new BinaryWritable(entry.getValue());
+        mapArr[i] = new ArrayWritable(Writable.class, pair);
+        ++i;
+      }
+      mapContainer[0] = new ArrayWritable(Writable.class, mapArr);
+      arr[8] = new ArrayWritable(Writable.class, mapContainer);
+    }
+    if (list != null) {
+      final Writable[] listContainer = new Writable[1];
+      final Writable[] listArr = new Writable[list.size()];
+      for (int i = 0; i < list.size(); ++i) {
+        listArr[i] = new IntWritable(list.get(i));
+      }
+      listContainer[0] = new ArrayWritable(Writable.class, listArr);
+      arr[9] = new ArrayWritable(Writable.class, listContainer);
+    }
 
     return new ArrayWritable(Writable.class, arr);
   }
@@ -131,6 +182,27 @@
         recordWriter.addBinary(Binary.fromString((String) value));
       } else if (value instanceof Double) {
         recordWriter.addDouble((Double) value);
+      } else if (value instanceof Map) {
+        recordWriter.startGroup();
+        recordWriter.startField("map", 0);
+        for (Object entry : ((Map) value).entrySet()) {
+          recordWriter.startGroup();
+          writeField(recordWriter, 0, "key", ((Map.Entry) entry).getKey());
+          writeField(recordWriter, 1, "value", ((Map.Entry) entry).getValue());
+          recordWriter.endGroup();
+        }
+        recordWriter.endField("map", 0);
+        recordWriter.endGroup();
+      } else if (value instanceof List) {
+        recordWriter.startGroup();
+        recordWriter.startField("bag", 0);
+        for (Object element : (List) value) {
+          recordWriter.startGroup();
+          writeField(recordWriter, 0, "array_element", element);
+          recordWriter.endGroup();
+        }
+        recordWriter.endField("bag", 0);
+        recordWriter.endGroup();
       } else {
         throw new IllegalArgumentException(value.getClass().getName() + " not supported");
       }
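
A hypothetical fragment showing how the new Map and List branches of writeField are reached when the tests write one record; recordWriter, map and list are the objects set up in TestDeprecatedParquetInputFormat, and field indexes 8 and 9 follow the test schema:

    recordWriter.startMessage();
    // ... primitive columns 0-7 are written as before ...
    UtilitiesTestMethods.writeField(recordWriter, 8, "c_map", map);   // emits the repeated "map" group of key/value pairs
    UtilitiesTestMethods.writeField(recordWriter, 9, "c_list", list); // emits the repeated "bag" group of array_element values
    recordWriter.endMessage();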