Merge pull request #359 from mping/patch-1

Expose values in SimpleRecord
diff --git a/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java b/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java
index 2b947eb..0351db8 100644
--- a/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java
+++ b/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java
@@ -23,10 +23,12 @@
 
 import org.junit.Test;
 
+import parquet.Log;
 import parquet.column.values.ValuesReader;
 import parquet.column.values.ValuesWriter;
 
 public class TestBitPackingColumn {
+  private static final Log LOG = Log.getLog(TestBitPackingColumn.class);
 
   @Test
   public void testZero() throws IOException {
@@ -156,15 +158,15 @@
 
   private void validateEncodeDecode(int bitLength, int[] vals, String expected) throws IOException {
     for (PACKING_TYPE type : PACKING_TYPE.values()) {
-      System.out.println(type);
+      LOG.debug(type);
       final int bound = (int)Math.pow(2, bitLength) - 1;
       ValuesWriter w = type.getWriter(bound);
       for (int i : vals) {
         w.writeInteger(i);
       }
       byte[] bytes = w.getBytes().toByteArray();
-      System.out.println("vals ("+bitLength+"): " + TestBitPacking.toString(vals));
-      System.out.println("bytes: " + TestBitPacking.toString(bytes));
+      LOG.debug("vals ("+bitLength+"): " + TestBitPacking.toString(vals));
+      LOG.debug("bytes: " + TestBitPacking.toString(bytes));
       assertEquals(type.toString(), expected, TestBitPacking.toString(bytes));
       ValuesReader r = type.getReader(bound);
       r.initFromPage(vals.length, bytes, 0);
@@ -172,7 +174,7 @@
       for (int i = 0; i < result.length; i++) {
         result[i] = r.readInteger();
       }
-      System.out.println("result: " + TestBitPacking.toString(result));
+      LOG.debug("result: " + TestBitPacking.toString(result));
       assertArrayEquals(type + " result: " + TestBitPacking.toString(result), vals, result);
     }
   }
diff --git a/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestBitPacking.java b/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestBitPacking.java
index d643093..7b2fde2 100644
--- a/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestBitPacking.java
+++ b/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestBitPacking.java
@@ -24,10 +24,12 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import parquet.Log;
 import parquet.column.values.bitpacking.BitPacking.BitPackingReader;
 import parquet.column.values.bitpacking.BitPacking.BitPackingWriter;
 
 public class TestBitPacking {
+  private static final Log LOG = Log.getLog(TestBitPacking.class);
 
   @Test
   public void testZero() throws IOException {
@@ -164,8 +166,8 @@
     }
     w.finish();
     byte[] bytes = baos.toByteArray();
-    System.out.println("vals ("+bitLength+"): " + toString(vals));
-    System.out.println("bytes: " + toString(bytes));
+    LOG.debug("vals ("+bitLength+"): " + toString(vals));
+    LOG.debug("bytes: " + toString(bytes));
     Assert.assertEquals(expected, toString(bytes));
     ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
     BitPackingReader r = BitPacking.createBitPackingReader(bitLength, bais, vals.length);
@@ -173,7 +175,7 @@
     for (int i = 0; i < result.length; i++) {
       result[i] = r.read();
     }
-    System.out.println("result: " + toString(result));
+    LOG.debug("result: " + toString(result));
     assertArrayEquals(vals, result);
   }
 
diff --git a/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestByteBitPacking.java b/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestByteBitPacking.java
index 72a227e..9d109f4 100644
--- a/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestByteBitPacking.java
+++ b/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestByteBitPacking.java
@@ -23,21 +23,23 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import parquet.Log;
 import parquet.column.values.bitpacking.BitPacking.BitPackingReader;
 import parquet.column.values.bitpacking.BitPacking.BitPackingWriter;
 
 public class TestByteBitPacking {
+  private static final Log LOG = Log.getLog(TestByteBitPacking.class);
 
   @Test
   public void testPackUnPack() {
-    System.out.println();
-    System.out.println("testPackUnPack");
+    LOG.debug("");
+    LOG.debug("testPackUnPack");
     for (int i = 1; i < 32; i++) {
-      System.out.println("Width: " + i);
+      LOG.debug("Width: " + i);
       int[] unpacked = new int[32];
       int[] values = generateValues(i);
       packUnpack(Packer.BIG_ENDIAN.newBytePacker(i), values, unpacked);
-      System.out.println("Output: " + TestBitPacking.toString(unpacked));
+      LOG.debug("Output: " + TestBitPacking.toString(unpacked));
       Assert.assertArrayEquals("width "+i, values, unpacked);
     }
   }
@@ -45,7 +47,7 @@
   private void packUnpack(BytePacker packer, int[] values, int[] unpacked) {
     byte[] packed = new byte[packer.getBitWidth() * 4];
     packer.pack32Values(values, 0, packed, 0);
-    System.out.println("packed: " + TestBitPacking.toString(packed));
+    LOG.debug("packed: " + TestBitPacking.toString(packed));
     packer.unpack32Values(packed, 0, unpacked, 0);
   }
 
@@ -54,16 +56,16 @@
     for (int j = 0; j < values.length; j++) {
       values[j] = (int)(Math.random() * 100000) % (int)Math.pow(2, bitWidth);
     }
-    System.out.println("Input:  " + TestBitPacking.toString(values));
+    LOG.debug("Input:  " + TestBitPacking.toString(values));
     return values;
   }
 
   @Test
   public void testPackUnPackAgainstHandWritten() throws IOException {
-    System.out.println();
-    System.out.println("testPackUnPackAgainstHandWritten");
+    LOG.debug("");
+    LOG.debug("testPackUnPackAgainstHandWritten");
     for (int i = 1; i < 8; i++) {
-      System.out.println("Width: " + i);
+      LOG.debug("Width: " + i);
       byte[] packed = new byte[i * 4];
       int[] unpacked = new int[32];
       int[] values = generateValues(i);
@@ -72,7 +74,7 @@
       final BytePacker packer = Packer.BIG_ENDIAN.newBytePacker(i);
       packer.pack32Values(values, 0, packed, 0);
 
-      System.out.println("Generated: " + TestBitPacking.toString(packed));
+      LOG.debug("Generated: " + TestBitPacking.toString(packed));
 
       // pack manual
       final ByteArrayOutputStream manualOut = new ByteArrayOutputStream();
@@ -81,7 +83,7 @@
         writer.write(values[j]);
       }
       final byte[] packedManualAsBytes = manualOut.toByteArray();
-      System.out.println("Manual: " + TestBitPacking.toString(packedManualAsBytes));
+      LOG.debug("Manual: " + TestBitPacking.toString(packedManualAsBytes));
 
       // unpack manual
       final BitPackingReader reader = BitPacking.createBitPackingReader(i, new ByteArrayInputStream(packed), 32);
@@ -89,7 +91,7 @@
         unpacked[j] = reader.read();
       }
 
-      System.out.println("Output: " + TestBitPacking.toString(unpacked));
+      LOG.debug("Output: " + TestBitPacking.toString(unpacked));
       Assert.assertArrayEquals("width " + i, values, unpacked);
     }
   }
@@ -97,10 +99,10 @@
   @Test
   public void testPackUnPackAgainstLemire() throws IOException {
     for (Packer pack: Packer.values()) {
-      System.out.println();
-      System.out.println("testPackUnPackAgainstLemire " + pack.name());
+      LOG.debug("");
+      LOG.debug("testPackUnPackAgainstLemire " + pack.name());
       for (int i = 1; i < 32; i++) {
-        System.out.println("Width: " + i);
+        LOG.debug("Width: " + i);
         int[] packed = new int[i];
         int[] unpacked = new int[32];
         int[] values = generateValues(i);
@@ -127,17 +129,17 @@
           }
         }
         final byte[] packedByLemireAsBytes = lemireOut.toByteArray();
-        System.out.println("Lemire out: " + TestBitPacking.toString(packedByLemireAsBytes));
+        LOG.debug("Lemire out: " + TestBitPacking.toString(packedByLemireAsBytes));
 
         // pack manual
         final BytePacker bytePacker = pack.newBytePacker(i);
         byte[] packedGenerated = new byte[i * 4];
         bytePacker.pack32Values(values, 0, packedGenerated, 0);
-        System.out.println("Gener. out: " + TestBitPacking.toString(packedGenerated));
+        LOG.debug("Gener. out: " + TestBitPacking.toString(packedGenerated));
         Assert.assertEquals(pack.name() + " width " + i, TestBitPacking.toString(packedByLemireAsBytes), TestBitPacking.toString(packedGenerated));
 
         bytePacker.unpack32Values(packedByLemireAsBytes, 0, unpacked, 0);
-        System.out.println("Output: " + TestBitPacking.toString(unpacked));
+        LOG.debug("Output: " + TestBitPacking.toString(unpacked));
 
         Assert.assertArrayEquals("width " + i, values, unpacked);
       }
diff --git a/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestLemireBitPacking.java b/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestLemireBitPacking.java
index 8b41733..65fad49 100644
--- a/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestLemireBitPacking.java
+++ b/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestLemireBitPacking.java
@@ -22,28 +22,30 @@
 import org.junit.Assert;
 import org.junit.Test;
 
+import parquet.Log;
 import parquet.column.values.bitpacking.BitPacking.BitPackingReader;
 import parquet.column.values.bitpacking.BitPacking.BitPackingWriter;
 
 public class TestLemireBitPacking {
+  private static final Log LOG = Log.getLog(TestLemireBitPacking.class);
 
   @Test
   public void testPackUnPack() {
     for (Packer packer : Packer.values()) {
-      System.out.println();
-      System.out.println("testPackUnPack");
+      LOG.debug("");
+      LOG.debug("testPackUnPack");
       for (int i = 1; i < 32; i++) {
-        System.out.println("Width: " + i);
+        LOG.debug("Width: " + i);
         int[] values = generateValues(i);
         int[] unpacked = new int[32];
         {
           packUnpack(packer.newIntPacker(i), values, unpacked);
-          System.out.println("int based Output " + packer.name() + ": " + TestBitPacking.toString(unpacked));
+          LOG.debug("int based Output " + packer.name() + ": " + TestBitPacking.toString(unpacked));
           Assert.assertArrayEquals(packer.name() + " width "+i, values, unpacked);
         }
         {
           packUnpack(packer.newBytePacker(i), values, unpacked);
-          System.out.println("byte based Output " + packer.name() + ": " + TestBitPacking.toString(unpacked));
+          LOG.debug("byte based Output " + packer.name() + ": " + TestBitPacking.toString(unpacked));
           Assert.assertArrayEquals(packer.name() + " width "+i, values, unpacked);
         }
       }
@@ -67,16 +69,16 @@
     for (int j = 0; j < values.length; j++) {
       values[j] = (int)(Math.random() * 100000) % (int)Math.pow(2, bitWidth);
     }
-    System.out.println("Input:  " + TestBitPacking.toString(values));
+    LOG.debug("Input:  " + TestBitPacking.toString(values));
     return values;
   }
 
   @Test
   public void testPackUnPackAgainstHandWritten() throws IOException {
-    System.out.println();
-    System.out.println("testPackUnPackAgainstHandWritten");
+    LOG.debug("");
+    LOG.debug("testPackUnPackAgainstHandWritten");
     for (int i = 1; i < 8; i++) {
-      System.out.println("Width: " + i);
+      LOG.debug("Width: " + i);
       int[] packed = new int[i];
       int[] unpacked = new int[32];
       int[] values = generateValues(i);
@@ -93,7 +95,7 @@
         lemireOut.write((v >>>  0) & 0xFF);
       }
       final byte[] packedByLemireAsBytes = lemireOut.toByteArray();
-      System.out.println("Lemire: " + TestBitPacking.toString(packedByLemireAsBytes));
+      LOG.debug("Lemire: " + TestBitPacking.toString(packedByLemireAsBytes));
 
       // pack manual
       final ByteArrayOutputStream manualOut = new ByteArrayOutputStream();
@@ -102,7 +104,7 @@
         writer.write(values[j]);
       }
       final byte[] packedManualAsBytes = manualOut.toByteArray();
-      System.out.println("Manual: " + TestBitPacking.toString(packedManualAsBytes));
+      LOG.debug("Manual: " + TestBitPacking.toString(packedManualAsBytes));
 
       // unpack manual
       final BitPackingReader reader = BitPacking.createBitPackingReader(i, new ByteArrayInputStream(packedByLemireAsBytes), 32);
@@ -110,7 +112,7 @@
         unpacked[j] = reader.read();
       }
 
-      System.out.println("Output: " + TestBitPacking.toString(unpacked));
+      LOG.debug("Output: " + TestBitPacking.toString(unpacked));
       Assert.assertArrayEquals("width " + i, values, unpacked);
     }
   }
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
index 58d2a8d..b7582ee 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
@@ -346,7 +346,7 @@
       BenchmarkCounter.incrementTotalBytes(mc.getTotalSize());
       ColumnDescriptor columnDescriptor = paths.get(pathKey);
       if (columnDescriptor != null) {
-        long startingPos = getStartingPos(mc);
+        long startingPos = mc.getStartingPos();
         // first chunk or not consecutive => new list
         if (currentChunks == null || currentChunks.endPos() != startingPos) {
           currentChunks = new ConsecutiveChunkList(startingPos);
@@ -366,18 +366,7 @@
     return columnChunkPageReadStore;
   }
 
-  /**
-   * @param mc the metadata for that chunk
-   * @return the offset of the first byte in the chunk
-   */
-  private long getStartingPos(ColumnChunkMetaData mc) {
-    long startingPos = mc.getFirstDataPageOffset();
-    if (mc.getDictionaryPageOffset() > 0 && mc.getDictionaryPageOffset() < startingPos) {
-      // if there's a dictionary and it's before the first data page, start from there
-      startingPos = mc.getDictionaryPageOffset();
-    }
-    return startingPos;
-  }
+
 
   @Override
   public void close() throws IOException {
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
index e45ba06..15133b5 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
@@ -121,13 +121,17 @@
     },
     ENDED;
 
-    STATE start() {throw new IllegalStateException(this.name());}
-    STATE startBlock() {throw new IllegalStateException(this.name());}
-    STATE startColumn() {throw new IllegalStateException(this.name());}
-    STATE write() {throw new IllegalStateException(this.name());}
-    STATE endColumn() {throw new IllegalStateException(this.name());}
-    STATE endBlock() {throw new IllegalStateException(this.name());}
-    STATE end() {throw new IllegalStateException(this.name());}
+    STATE start() throws IOException { return error(); }
+    STATE startBlock() throws IOException { return error(); }
+    STATE startColumn() throws IOException { return error(); }
+    STATE write() throws IOException { return error(); }
+    STATE endColumn() throws IOException { return error(); }
+    STATE endBlock() throws IOException { return error(); }
+    STATE end() throws IOException { return error(); }
+
+    private final STATE error() throws IOException {
+      throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name());
+    }
   }
 
   private STATE state = STATE.NOT_STARTED;
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
index fcb4986..3abb38b 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetInputFormat.java
@@ -48,6 +48,7 @@
 import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.hadoop.util.ConfigurationUtil;
 import parquet.hadoop.util.ContextUtil;
+import parquet.io.ParquetDecodingException;
 import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
 
@@ -153,88 +154,185 @@
     }
   }
 
+  //Wrapper of hdfs blocks, keep track of which HDFS block is being used
+  private static class HDFSBlocks {
+    BlockLocation[] hdfsBlocks;
+    int currentStartHdfsBlockIndex = 0;//the hdfs block index corresponding to the start of a row group
+    int currentMidPointHDFSBlockIndex = 0;// the hdfs block index corresponding to the mid-point of a row group, a split might be created only when the midpoint of the rowgroup enters a new hdfs block
+
+    private HDFSBlocks(BlockLocation[] hdfsBlocks) {
+      this.hdfsBlocks = hdfsBlocks;
+      Comparator<BlockLocation> comparator = new Comparator<BlockLocation>() {
+        @Override
+        public int compare(BlockLocation b1, BlockLocation b2) {
+          return Long.signum(b1.getOffset() - b2.getOffset());
+        }
+      };
+      Arrays.sort(hdfsBlocks, comparator);
+    }
+
+    private long getHDFSBlockEndingPosition(int hdfsBlockIndex) {
+      BlockLocation hdfsBlock = hdfsBlocks[hdfsBlockIndex];
+      return hdfsBlock.getOffset() + hdfsBlock.getLength() - 1;
+    }
+
+    /**
+     * @param rowGroupMetadata
+     * @return true if the mid point of row group is in a new hdfs block, and also move the currentHDFSBlock pointer to the correct index that contains the row group;
+     * return false if the mid point of row group is in the same hdfs block
+     */
+    private boolean checkBelongingToANewHDFSBlock(BlockMetaData rowGroupMetadata) {
+      boolean isNewHdfsBlock = false;
+      long rowGroupMidPoint = rowGroupMetadata.getStartingPos() + (rowGroupMetadata.getCompressedSize() / 2);
+
+      //if mid point is not in the current HDFS block any more, return true
+      while (rowGroupMidPoint > getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex)) {
+        isNewHdfsBlock = true;
+        currentMidPointHDFSBlockIndex++;
+        if (currentMidPointHDFSBlockIndex >= hdfsBlocks.length)
+          throw new ParquetDecodingException("the row group is not in hdfs blocks in the file: midpoint of row groups is "
+                  + rowGroupMidPoint
+                  + ", the end of the hdfs block is "
+                  + getHDFSBlockEndingPosition(currentMidPointHDFSBlockIndex - 1));
+      }
+
+      while (rowGroupMetadata.getStartingPos() > getHDFSBlockEndingPosition(currentStartHdfsBlockIndex)) {
+        currentStartHdfsBlockIndex++;
+        if (currentStartHdfsBlockIndex >= hdfsBlocks.length)
+          throw new ParquetDecodingException("The row group does not start in this file: row group offset is "
+                  + rowGroupMetadata.getStartingPos()
+                  + " but the end of hdfs blocks of file is "
+                  + getHDFSBlockEndingPosition(currentStartHdfsBlockIndex));
+      }
+      return isNewHdfsBlock;
+    }
+
+    public BlockLocation get(int hdfsBlockIndex) {
+      return hdfsBlocks[hdfsBlockIndex];
+    }
+
+    public BlockLocation getCurrentBlock() {
+      return hdfsBlocks[currentStartHdfsBlockIndex];
+    }
+  }
+
+  private static class SplitInfo {
+    List<BlockMetaData> rowGroups = new ArrayList<BlockMetaData>();
+    BlockLocation hdfsBlock;
+    long compressedByteSize = 0L;
+
+    public SplitInfo(BlockLocation currentBlock) {
+      this.hdfsBlock = currentBlock;
+    }
+
+    private void addRowGroup(BlockMetaData rowGroup) {
+      this.rowGroups.add(rowGroup);
+      this.compressedByteSize += rowGroup.getCompressedSize();
+    }
+
+    public long getCompressedByteSize() {
+      return compressedByteSize;
+    }
+
+    public List<BlockMetaData> getRowGroups() {
+      return rowGroups;
+    }
+
+    int getRowGroupCount() {
+      return rowGroups.size();
+    }
+
+    public ParquetInputSplit getParquetInputSplit(FileStatus fileStatus, FileMetaData fileMetaData, String requestedSchema, Map<String, String> readSupportMetadata, String fileSchema) throws IOException {
+      MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
+      long length = 0;
+
+      for (BlockMetaData block : this.getRowGroups()) {
+        List<ColumnChunkMetaData> columns = block.getColumns();
+        for (ColumnChunkMetaData column : columns) {
+          if (requested.containsPath(column.getPath().toArray())) {
+            length += column.getTotalSize();
+          }
+        }
+      }
+      return new ParquetInputSplit(
+              fileStatus.getPath(),
+              hdfsBlock.getOffset(),
+              length,
+              hdfsBlock.getHosts(),
+              this.getRowGroups(),
+              requestedSchema,
+              fileSchema,
+              fileMetaData.getKeyValueMetaData(),
+              readSupportMetadata
+      );
+    }
+  }
+
   /**
    * groups together all the data blocks for the same HDFS block
-   * @param blocks data blocks (row groups)
-   * @param hdfsBlocks hdfs blocks
-   * @param fileStatus the containing file
-   * @param fileMetaData file level meta data
-   * @param readSupportClass the class used to materialize records
-   * @param requestedSchema the schema requested by the user
+   *
+   * @param rowGroupBlocks      data blocks (row groups)
+   * @param hdfsBlocksArray     hdfs blocks
+   * @param fileStatus          the containing file
+   * @param fileMetaData        file level meta data
+   * @param requestedSchema     the schema requested by the user
    * @param readSupportMetadata the metadata provided by the readSupport implementation in init
+   * @param minSplitSize        the mapred.min.split.size
+   * @param maxSplitSize        the mapred.max.split.size
    * @return the splits (one per HDFS block)
    * @throws IOException If hosts can't be retrieved for the HDFS block
    */
   static <T> List<ParquetInputSplit> generateSplits(
-      List<BlockMetaData> blocks,
-      BlockLocation[] hdfsBlocks,
-      FileStatus fileStatus,
-      FileMetaData fileMetaData,
-      Class<?> readSupportClass,
-      String requestedSchema,
-      Map<String, String> readSupportMetadata) throws IOException {
+          List<BlockMetaData> rowGroupBlocks,
+          BlockLocation[] hdfsBlocksArray,
+          FileStatus fileStatus,
+          FileMetaData fileMetaData,
+          String requestedSchema,
+          Map<String, String> readSupportMetadata, long minSplitSize, long maxSplitSize) throws IOException {
+    if (maxSplitSize < minSplitSize || maxSplitSize < 0 || minSplitSize < 0) {
+      throw new ParquetDecodingException("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = " + maxSplitSize + "; minSplitSize is " + minSplitSize);
+    }
     String fileSchema = fileMetaData.getSchema().toString().intern();
-    Comparator<BlockLocation> comparator = new Comparator<BlockLocation>() {
-      @Override
-      public int compare(BlockLocation b1, BlockLocation b2) {
-        return Long.signum(b1.getOffset() - b2.getOffset());
+    HDFSBlocks hdfsBlocks = new HDFSBlocks(hdfsBlocksArray);
+    hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupBlocks.get(0));
+    SplitInfo currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
+
+    //assign rowGroups to splits
+    List<SplitInfo> splitRowGroups = new ArrayList<SplitInfo>();
+    checkSorted(rowGroupBlocks);//assert row groups are sorted
+    for (BlockMetaData rowGroupMetadata : rowGroupBlocks) {
+      if ((hdfsBlocks.checkBelongingToANewHDFSBlock(rowGroupMetadata)
+             && currentSplit.getCompressedByteSize() >= minSplitSize
+             && currentSplit.getCompressedByteSize() > 0)
+           || currentSplit.getCompressedByteSize() >= maxSplitSize) {
+        //create a new split
+        splitRowGroups.add(currentSplit);//finish previous split
+        currentSplit = new SplitInfo(hdfsBlocks.getCurrentBlock());
       }
-    };
-    Arrays.sort(hdfsBlocks, comparator);
-    List<List<BlockMetaData>> splitGroups = new ArrayList<List<BlockMetaData>>(hdfsBlocks.length);
-    for (int i = 0; i < hdfsBlocks.length; i++) {
-      splitGroups.add(new ArrayList<BlockMetaData>());
+      currentSplit.addRowGroup(rowGroupMetadata);
     }
-    for (BlockMetaData block : blocks) {
-      final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
-      int index = Arrays.binarySearch(hdfsBlocks, new BlockLocation() {@Override
-        public long getOffset() {
-        return firstDataPage;
-      }}, comparator);
-      if (index >= 0) {
-        splitGroups.get(index).add(block);
-      } else {
-        int insertionPoint = - index - 1;
-        if (insertionPoint == 0) {
-          // really, there should always be a block in 0
-          LOG.warn("row group before the first HDFS block:  " + block);
-          splitGroups.get(0).add(block);
-        } else {
-          splitGroups.get(insertionPoint - 1).add(block);
-        }
+
+    if (currentSplit.getRowGroupCount() > 0) {
+      splitRowGroups.add(currentSplit);
+    }
+
+    //generate splits from rowGroups of each split
+    List<ParquetInputSplit> resultSplits = new ArrayList<ParquetInputSplit>();
+    for (SplitInfo splitInfo : splitRowGroups) {
+      ParquetInputSplit split = splitInfo.getParquetInputSplit(fileStatus, fileMetaData, requestedSchema, readSupportMetadata, fileSchema);
+      resultSplits.add(split);
+    }
+    return resultSplits;
+  }
+
+  private static void checkSorted(List<BlockMetaData> rowGroupBlocks) {
+    long previousOffset = 0L;
+    for(BlockMetaData rowGroup: rowGroupBlocks) {
+      long currentOffset = rowGroup.getStartingPos();
+      if (currentOffset < previousOffset) {
+        throw new ParquetDecodingException("row groups are not sorted: previous row groups starts at " + previousOffset + ", current row group starts at " + currentOffset);
       }
     }
-    List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
-    for (int i = 0; i < hdfsBlocks.length; i++) {
-      BlockLocation hdfsBlock = hdfsBlocks[i];
-      List<BlockMetaData> blocksForCurrentSplit = splitGroups.get(i);
-      if (blocksForCurrentSplit.size() == 0) {
-        LOG.debug("HDFS block without row group: " + hdfsBlocks[i]);
-      } else {
-        long length = 0;
-        for (BlockMetaData block : blocksForCurrentSplit) {
-          MessageType requested = MessageTypeParser.parseMessageType(requestedSchema);
-          List<ColumnChunkMetaData> columns = block.getColumns();
-          for (ColumnChunkMetaData column : columns) {
-            if (requested.containsPath(column.getPath().toArray())) {
-              length += column.getTotalSize();
-            }
-          }
-        }
-        splits.add(new ParquetInputSplit(
-          fileStatus.getPath(),
-          hdfsBlock.getOffset(),
-          length,
-          hdfsBlock.getHosts(),
-          blocksForCurrentSplit,
-          requestedSchema,
-          fileSchema,
-          fileMetaData.getKeyValueMetaData(),
-          readSupportMetadata
-          ));
-      }
-    }
-    return splits;
   }
 
   /**
@@ -254,6 +352,11 @@
    * @throws IOException
    */
   public List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> footers) throws IOException {
+    final long maxSplitSize = configuration.getLong("mapred.max.split.size", Long.MAX_VALUE);
+    final long minSplitSize = Math.max(getFormatMinSplitSize(), configuration.getLong("mapred.min.split.size", 0L));
+    if (maxSplitSize < 0 || minSplitSize < 0) {
+      throw new ParquetDecodingException("maxSplitSize or minSplitSie should not be negative: maxSplitSize = " + maxSplitSize + "; minSplitSize = " + minSplitSize);
+    }
     List<ParquetInputSplit> splits = new ArrayList<ParquetInputSplit>();
     GlobalMetaData globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
     ReadContext readContext = getReadSupport(configuration).init(new InitContext(
@@ -274,9 +377,10 @@
               fileBlockLocations,
               fileStatus,
               parquetMetaData.getFileMetaData(),
-              readSupportClass,
               readContext.getRequestedSchema().toString(),
-              readContext.getReadSupportMetadata())
+              readContext.getReadSupportMetadata(),
+              minSplitSize,
+              maxSplitSize)
           );
     }
     return splits;
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/BlockMetaData.java b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/BlockMetaData.java
index 27c25ef..58659bc 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/BlockMetaData.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/BlockMetaData.java
@@ -95,9 +95,26 @@
     return Collections.unmodifiableList(columns);
   }
 
+  /**
+   *
+   * @return the starting pos of first column
+   */
+  public long getStartingPos() {
+    return getColumns().get(0).getStartingPos();
+  }
   @Override
   public String toString() {
     return "BlockMetaData{" + rowCount + ", " + totalByteSize + " " + columns + "}";
   }
 
+  /**
+   * @return the compressed size of all columns
+   */
+  public long getCompressedSize() {
+    long totalSize = 0;
+    for (ColumnChunkMetaData col : getColumns()) {
+      totalSize += col.getTotalSize();
+    }
+    return totalSize;
+  }
 }
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkMetaData.java
index 4939e47..3631fee 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -58,6 +58,19 @@
   }
 
   /**
+   * @return the offset of the first byte in the chunk
+   */
+  public long getStartingPos() {
+    long dictionaryPageOffset = getDictionaryPageOffset();
+    long firstDataPageOffset = getFirstDataPageOffset();
+    if (dictionaryPageOffset > 0 && dictionaryPageOffset < firstDataPageOffset) {
+      // if there's a dictionary and it's before the first data page, start from there
+      return dictionaryPageOffset;
+    }
+    return firstDataPageOffset;
+  }
+
+  /**
    * checks that a positive long value fits in an int.
    * (reindexed on Integer.MIN_VALUE)
    * @param value
@@ -224,7 +237,7 @@
 }
 class LongColumnChunkMetaData extends ColumnChunkMetaData {
 
-  private final long firstDataPage;
+  private final long firstDataPageOffset;
   private final long dictionaryPageOffset;
   private final long valueCount;
   private final long totalSize;
@@ -235,7 +248,7 @@
    * @param type type of the column
    * @param codec
    * @param encodings
-   * @param firstDataPage
+   * @param firstDataPageOffset
    * @param dictionaryPageOffset
    * @param valueCount
    * @param totalSize
@@ -243,13 +256,13 @@
    */
   LongColumnChunkMetaData(
       ColumnPath path, PrimitiveTypeName type, CompressionCodecName codec, Set<Encoding> encodings,
-      long firstDataPage,
+      long firstDataPageOffset,
       long dictionaryPageOffset,
       long valueCount,
       long totalSize,
       long totalUncompressedSize) {
     super(ColumnChunkProperties.get(path, type, codec, encodings));
-    this.firstDataPage = firstDataPage;
+    this.firstDataPageOffset = firstDataPageOffset;
     this.dictionaryPageOffset = dictionaryPageOffset;
     this.valueCount = valueCount;
     this.totalSize = totalSize;
@@ -260,7 +273,7 @@
    * @return start of the column data offset
    */
   public long getFirstDataPageOffset() {
-    return firstDataPage;
+    return firstDataPageOffset;
   }
 
   /**
diff --git a/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java b/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
index af9ce45..ee19fed 100644
--- a/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
+++ b/parquet-hadoop/src/test/java/parquet/hadoop/TestInputFormat.java
@@ -15,66 +15,280 @@
  */
 package parquet.hadoop;
 
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.junit.Before;
 import org.junit.Test;
-
 import parquet.column.Encoding;
-import parquet.hadoop.api.ReadSupport;
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.ColumnPath;
 import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.hadoop.metadata.FileMetaData;
+import parquet.io.ParquetDecodingException;
 import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
 import parquet.schema.PrimitiveType.PrimitiveTypeName;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 public class TestInputFormat {
 
-  @Test
-  public void testBlocksToSplits() throws IOException, InterruptedException {
-    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+  List<BlockMetaData> blocks;
+  BlockLocation[] hdfsBlocks;
+  FileStatus fileStatus;
+  MessageType schema;
+  FileMetaData fileMetaData;
+
+  /*
+    The test File contains 2-3 hdfs blocks based on the setting of each test, when hdfsBlock size is set to 50: [0-49][50-99]
+    each row group is of size 10, so the rowGroups layout on hdfs is like:
+    xxxxx xxxxx
+    each x is a row group, each groups of x's is a hdfsBlock
+   */
+  @Before
+  public void setUp() {
+    blocks = new ArrayList<BlockMetaData>();
     for (int i = 0; i < 10; i++) {
-      blocks.add(newBlock(i * 10));
+      blocks.add(newBlock(i * 10, 10));
     }
-    BlockLocation[] hdfsBlocks = new BlockLocation[] {
-        new BlockLocation(new String[0], new String[] { "foo0.datanode", "bar0.datanode"}, 0, 50),
-        new BlockLocation(new String[0], new String[] { "foo1.datanode", "bar1.datanode"}, 50, 50)
-    };
-    FileStatus fileStatus = new FileStatus(100, false, 2, 50, 0, new Path("hdfs://foo.namenode:1234/bar"));
-    MessageType schema = MessageTypeParser.parseMessageType("message doc { required binary foo; }");
-    FileMetaData fileMetaData = new FileMetaData(schema, new HashMap<String, String>(), "parquet-mr");
-    @SuppressWarnings("serial")
-    List<ParquetInputSplit> splits = ParquetInputFormat.generateSplits(
-        blocks, hdfsBlocks, fileStatus, fileMetaData, ReadSupport.class, schema.toString(), new HashMap<String, String>() {{put("specific", "foo");}});
-    assertEquals(splits.toString().replaceAll("([{])", "$0\n").replaceAll("([}])", "\n$0"), 2, splits.size());
-    for (int i = 0; i < splits.size(); i++) {
-      ParquetInputSplit parquetInputSplit = splits.get(i);
-      assertEquals(5, parquetInputSplit.getBlocks().size());
-      assertEquals(2, parquetInputSplit.getLocations().length);
-      assertEquals("[foo" + i + ".datanode, bar" + i + ".datanode]", Arrays.toString(parquetInputSplit.getLocations()));
-      assertEquals(10, parquetInputSplit.getLength());
-      assertEquals("foo", parquetInputSplit.getReadSupportMetadata().get("specific"));
+    fileStatus = new FileStatus(100, false, 2, 50, 0, new Path("hdfs://foo.namenode:1234/bar"));
+    schema = MessageTypeParser.parseMessageType("message doc { required binary foo; }");
+    fileMetaData = new FileMetaData(schema, new HashMap<String, String>(), "parquet-mr");
+  }
+
+  @Test
+  public void testThrowExceptionWhenMaxSplitSizeIsSmallerThanMinSplitSize() throws IOException {
+    try {
+      generateSplitByMinMaxSize(50, 49);
+      fail("should throw exception when max split size is smaller than the min split size");
+    } catch (ParquetDecodingException e) {
+      assertEquals("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = 49; minSplitSize is 50"
+              , e.getMessage());
     }
   }
 
-  private BlockMetaData newBlock(long start) {
+  @Test
+  public void testThrowExceptionWhenMaxSplitSizeIsNegative() throws IOException {
+    try {
+      generateSplitByMinMaxSize(-100, -50);
+      fail("should throw exception when max split size is negative");
+    } catch (ParquetDecodingException e) {
+      assertEquals("maxSplitSize and minSplitSize should be positive and max should be greater or equal to the minSplitSize: maxSplitSize = -50; minSplitSize is -100"
+              , e.getMessage());
+    }
+  }
+
+  /*
+    aaaaa bbbbb
+   */
+  @Test
+  public void testGenerateSplitsAlignedWithHDFSBlock() throws IOException {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateSplitByMinMaxSize(50, 50);
+    shouldSplitBlockSizeBe(splits, 5, 5);
+    shouldSplitLocationBe(splits, 0, 1);
+    shouldSplitLengthBe(splits, 50, 50);
+
+    splits = generateSplitByMinMaxSize(0, Long.MAX_VALUE);
+    shouldSplitBlockSizeBe(splits, 5, 5);
+    shouldSplitLocationBe(splits, 0, 1);
+    shouldSplitLengthBe(splits, 50, 50);
+
+  }
+
+  @Test
+  public void testRowGroupNotAlignToHDFSBlock() throws IOException {
+    //Test HDFS blocks size(51) is not multiple of row group size(10)
+    withHDFSBlockSize(51, 51);
+    List<ParquetInputSplit> splits = generateSplitByMinMaxSize(50, 50);
+    shouldSplitBlockSizeBe(splits, 5, 5);
+    shouldSplitLocationBe(splits, 0, 0);//for the second split, the first byte will still be in the first hdfs block, therefore the locations are both 0
+    shouldSplitLengthBe(splits, 50, 50);
+
+    //Test a rowgroup is greater than the hdfsBlock boundary
+    withHDFSBlockSize(49, 49);
+    splits = generateSplitByMinMaxSize(50, 50);
+    shouldSplitBlockSizeBe(splits, 5, 5);
+    shouldSplitLocationBe(splits, 0, 1);
+    shouldSplitLengthBe(splits, 50, 50);
+
+    /*
+    aaaa bbbbb c
+    for the 5th row group, the midpoint is 45, but the end of first hdfsBlock is 44, therefore a new split(b) will be created
+    for 9th group, the mid point is 85, the end of second block is 88, so it's considered mainly in the 2nd hdfs block, and therefore inserted as
+    a row group of split b
+     */
+    withHDFSBlockSize(44,44,44);
+    splits = generateSplitByMinMaxSize(40, 50);
+    shouldSplitBlockSizeBe(splits, 4, 5, 1);
+    shouldSplitLocationBe(splits, 0, 0, 2);
+    shouldSplitLengthBe(splits, 40, 50, 10);
+  }
+
+  /*
+    when min size is 55, max size is 56, the first split will be generated with 6 row groups(size of 10 each), which satisfies split.size>min.size, but not split.size<max.size
+    aaaaa abbbb
+   */
+  @Test
+  public void testGenerateSplitsNotAlignedWithHDFSBlock() throws IOException, InterruptedException {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateSplitByMinMaxSize(55, 56);
+    shouldSplitBlockSizeBe(splits, 6, 4);
+    shouldSplitLocationBe(splits, 0, 1);
+    shouldSplitLengthBe(splits, 60, 40);
+
+    withHDFSBlockSize(51, 51);
+    splits = generateSplitByMinMaxSize(55, 56);
+    shouldSplitBlockSizeBe(splits, 6, 4);
+    shouldSplitLocationBe(splits, 0, 1);//since a whole row group of split a is added to the second hdfs block, so the location of split b is still 1
+    shouldSplitLengthBe(splits, 60, 40);
+
+    withHDFSBlockSize(49, 49, 49);
+    splits = generateSplitByMinMaxSize(55, 56);
+    shouldSplitBlockSizeBe(splits, 6, 4);
+    shouldSplitLocationBe(splits, 0, 1);
+    shouldSplitLengthBe(splits, 60, 40);
+
+  }
+
+  /*
+    when the max size is set to be 30, first split will be of size 30,
+    and when creating second split, it will try to align it to second hdfsBlock, and therefore generates a split of size 20
+    aaabb cccdd
+   */
+  @Test
+  public void testGenerateSplitsSmallerThanMaxSizeAndAlignToHDFS() throws Exception {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateSplitByMinMaxSize(18, 30);
+    shouldSplitBlockSizeBe(splits, 3, 2, 3, 2);
+    shouldSplitLocationBe(splits, 0, 0, 1, 1);
+    shouldSplitLengthBe(splits, 30, 20, 30, 20);
+
+    /*
+    aaabb cccdd
+         */
+    withHDFSBlockSize(51, 51);
+    splits = generateSplitByMinMaxSize(18, 30);
+    shouldSplitBlockSizeBe(splits, 3, 2, 3, 2);
+    shouldSplitLocationBe(splits, 0, 0, 0, 1);//the first byte of split c is in the first hdfs block
+    shouldSplitLengthBe(splits, 30, 20, 30, 20);
+
+    /*
+    aaabb cccdd
+     */
+    withHDFSBlockSize(49, 49, 49);
+    splits = generateSplitByMinMaxSize(18, 30);
+    shouldSplitBlockSizeBe(splits, 3, 2, 3, 2);
+    shouldSplitLocationBe(splits, 0, 0, 1, 1);
+    shouldSplitLengthBe(splits, 30, 20, 30, 20);
+  }
+
+  /*
+    when the min size is set to be 25, so the second split can not be aligned with the boundary of hdfs block, there for split of size 30 will be created as the 3rd split.
+    aaabb bcccd
+   */
+  @Test
+  public void testGenerateSplitsCrossHDFSBlockBoundaryToSatisfyMinSize() throws Exception {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateSplitByMinMaxSize(25, 30);
+    shouldSplitBlockSizeBe(splits, 3, 3, 3, 1);
+    shouldSplitLocationBe(splits, 0, 0, 1, 1);
+    shouldSplitLengthBe(splits, 30, 30, 30, 10);
+  }
+
+  /*
+    when rowGroups size is 10, but min split size is 10, max split size is 18, it will create splits of size 20 and of size 10 and align with hdfsBlocks
+    aabbc ddeef
+   */
+  @Test
+  public void testMultipleRowGroupsInABlockToAlignHDFSBlock() throws Exception {
+    withHDFSBlockSize(50, 50);
+    List<ParquetInputSplit> splits = generateSplitByMinMaxSize(10, 18);
+    shouldSplitBlockSizeBe(splits, 2, 2, 1, 2, 2, 1);
+    shouldSplitLocationBe(splits, 0, 0, 0, 1, 1, 1);
+    shouldSplitLengthBe(splits, 20, 20, 10, 20, 20, 10);
+
+    /*
+    aabbc ddeef
+    notice the first byte of split d is in the first hdfs block:
+    when adding the 6th row group, although the first byte of it is in the first hdfs block
+    , but the mid point of the row group is in the second hdfs block, there for a new split(d) is created including that row group
+     */
+    withHDFSBlockSize(51, 51);
+    splits = generateSplitByMinMaxSize(10, 18);
+    shouldSplitBlockSizeBe(splits, 2, 2, 1, 2, 2, 1);
+    shouldSplitLocationBe(splits, 0, 0, 0, 0, 1, 1);// location of split d should be 0, since the first byte is in the first hdfs block
+    shouldSplitLengthBe(splits, 20, 20, 10, 20, 20, 10);
+
+    /*
+    aabbc ddeef
+    same as the case where block sizes are 50 50
+     */
+    withHDFSBlockSize(49, 49);
+    splits = generateSplitByMinMaxSize(10, 18);
+    shouldSplitBlockSizeBe(splits, 2, 2, 1, 2, 2, 1);
+    shouldSplitLocationBe(splits, 0, 0, 0, 1, 1, 1);
+    shouldSplitLengthBe(splits, 20, 20, 10, 20, 20, 10);
+  }
+
+  private List<ParquetInputSplit> generateSplitByMinMaxSize(long min, long max) throws IOException {
+    return ParquetInputFormat.generateSplits(
+            blocks, hdfsBlocks, fileStatus, fileMetaData, schema.toString(), new HashMap<String, String>() {{
+              put("specific", "foo");
+            }}, min, max
+    );
+  }
+
+  private void shouldSplitBlockSizeBe(List<ParquetInputSplit> splits, int... sizes) {
+    assertEquals(sizes.length, splits.size());
+    for (int i = 0; i < sizes.length; i++) {
+      assertEquals(sizes[i], splits.get(i).getBlocks().size());
+      assertEquals("foo", splits.get(i).getReadSupportMetadata().get("specific"));
+    }
+  }
+
+  private void shouldSplitLocationBe(List<ParquetInputSplit> splits, int... locations) throws IOException {
+    assertEquals(locations.length, splits.size());
+    for (int i = 0; i < locations.length; i++) {
+      assertEquals("[foo" + locations[i] + ".datanode, bar" + locations[i] + ".datanode]", Arrays.toString(splits.get(i).getLocations()));
+    }
+  }
+
+  private void shouldSplitLengthBe(List<ParquetInputSplit> splits, int... lengths) {
+    assertEquals(lengths.length, splits.size());
+    for (int i = 0; i < lengths.length; i++) {
+      assertEquals(lengths[i], splits.get(i).getLength());
+    }
+  }
+
+  private void withHDFSBlockSize(long... blockSizes) {
+    hdfsBlocks = new BlockLocation[blockSizes.length];
+    long offset = 0;
+    for (int i = 0; i < blockSizes.length; i++) {
+      long blockSize = blockSizes[i];
+      hdfsBlocks[i] = new BlockLocation(new String[0], new String[]{"foo" + i + ".datanode", "bar" + i + ".datanode"}, offset, blockSize);
+      offset += blockSize;
+    }
+  }
+
+  private BlockMetaData newBlock(long start, long compressedBlockSize) {
     BlockMetaData blockMetaData = new BlockMetaData();
+    long uncompressedSize = compressedBlockSize * 2;//assuming the compression ratio is 2
     ColumnChunkMetaData column = ColumnChunkMetaData.get(
-        ColumnPath.get("foo"), PrimitiveTypeName.BINARY, CompressionCodecName.GZIP, new HashSet<Encoding>(Arrays.asList(Encoding.PLAIN)),
-        start, 0l, 0l, 2l, 0l);
+            ColumnPath.get("foo"), PrimitiveTypeName.BINARY, CompressionCodecName.GZIP, new HashSet<Encoding>(Arrays.asList(Encoding.PLAIN)),
+            start, 0l, 0l, compressedBlockSize, uncompressedSize);
     blockMetaData.addColumn(column);
+    blockMetaData.setTotalByteSize(uncompressedSize);
     return blockMetaData;
   }
 }
diff --git a/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java b/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java
index b5e9c8b..d96d5bc 100644
--- a/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java
+++ b/parquet-hive/parquet-hive-storage-handler/src/main/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveSchemaConverter.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
+import parquet.schema.ConversionPatterns;
 import parquet.schema.GroupType;
 import parquet.schema.MessageType;
 import parquet.schema.OriginalType;
@@ -118,8 +119,7 @@
         typeInfo.getMapKeyTypeInfo(), Repetition.REQUIRED);
     final Type valueType = convertType(ParquetHiveSerDe.MAP_VALUE.toString(),
         typeInfo.getMapValueTypeInfo());
-    return listWrapper(name, OriginalType.MAP_KEY_VALUE,
-        new GroupType(Repetition.REPEATED, ParquetHiveSerDe.MAP.toString(), keyType, valueType));
+    return ConversionPatterns.mapType(Repetition.OPTIONAL, name, keyType, valueType);
   }
 
   private static GroupType listWrapper(final String name, final OriginalType originalType,
diff --git a/parquet-hive/parquet-hive-storage-handler/src/test/java/org/apache/hadoop/hive/ql/io/parquet/TestHiveSchemaConverter.java b/parquet-hive/parquet-hive-storage-handler/src/test/java/org/apache/hadoop/hive/ql/io/parquet/TestHiveSchemaConverter.java
index 0b25f6e..89d95df 100644
--- a/parquet-hive/parquet-hive-storage-handler/src/test/java/org/apache/hadoop/hive/ql/io/parquet/TestHiveSchemaConverter.java
+++ b/parquet-hive/parquet-hive-storage-handler/src/test/java/org/apache/hadoop/hive/ql/io/parquet/TestHiveSchemaConverter.java
@@ -26,6 +26,8 @@
 
 import parquet.schema.MessageType;
 import parquet.schema.MessageTypeParser;
+import parquet.schema.OriginalType;
+import parquet.schema.Type.Repetition;
 
 public class TestHiveSchemaConverter {
 
@@ -111,4 +113,26 @@
             + "  }\n"
             + "}\n");
   }
+
+  @Test
+  public void testMapOriginalType() throws Exception {
+    final String hiveColumnTypes = "map<string,string>";
+    final String hiveColumnNames = "mapCol";
+    final List<String> columnNames = createHiveColumnsFrom(hiveColumnNames);
+    final List<TypeInfo> columnTypes = createHiveTypeInfoFrom(hiveColumnTypes);
+    final MessageType messageTypeFound = HiveSchemaConverter.convert(columnNames, columnTypes);
+    // this messageType only has one optional field, whose name is mapCol, original Type is MAP
+    assertEquals(1, messageTypeFound.getFieldCount());
+    parquet.schema.Type topLevel = messageTypeFound.getFields().get(0);
+    assertEquals("mapCol",topLevel.getName());
+    assertEquals(OriginalType.MAP, topLevel.getOriginalType());
+    assertEquals(Repetition.OPTIONAL, topLevel.getRepetition());
+
+    assertEquals(1, topLevel.asGroupType().getFieldCount());
+    parquet.schema.Type secondLevel = topLevel.asGroupType().getFields().get(0);
+    //there is one repeated field for mapCol, the field name is "map" and its original Type is MAP_KEY_VALUE;
+    assertEquals("map", secondLevel.getName());
+    assertEquals(OriginalType.MAP_KEY_VALUE, secondLevel.getOriginalType());
+    assertEquals(Repetition.REPEATED, secondLevel.getRepetition());
+  }
 }
diff --git a/parquet-scrooge/pom.xml b/parquet-scrooge/pom.xml
index 7f0ff6d..76464b5 100644
--- a/parquet-scrooge/pom.xml
+++ b/parquet-scrooge/pom.xml
@@ -74,7 +74,7 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>scrooge-core_2.9.2</artifactId>
-      <version>3.9.0</version>
+      <version>3.12.1</version>
     </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
diff --git a/parquet-scrooge/src/main/java/parquet/scrooge/ScroogeStructConverter.java b/parquet-scrooge/src/main/java/parquet/scrooge/ScroogeStructConverter.java
index 3dd84f0..a13c741 100644
--- a/parquet-scrooge/src/main/java/parquet/scrooge/ScroogeStructConverter.java
+++ b/parquet-scrooge/src/main/java/parquet/scrooge/ScroogeStructConverter.java
@@ -263,13 +263,19 @@
 
   public ThriftType convertEnumTypeField(ThriftStructField f) {
     List<ThriftType.EnumValue> enumValues = new ArrayList<ThriftType.EnumValue>();
-    String enumName = f.method().getReturnType().getName();
+
+    String enumName;
+    if (isOptional(f)) {
+      enumName = ((Class)extractClassFromOption(f.method().getGenericReturnType())).getName();
+    } else {
+      enumName = f.method().getReturnType().getName();
+    }
     try {
       List enumCollection = getEnumList(enumName);
       for (Object enumObj : enumCollection) {
         ScroogeEnumDesc enumDesc = ScroogeEnumDesc.getEnumDesc(enumObj);
         //be compatible with thrift generated enum which have capitalized name
-        enumValues.add(new ThriftType.EnumValue(enumDesc.id, enumDesc.name.toUpperCase()));
+        enumValues.add(new ThriftType.EnumValue(enumDesc.id, enumDesc.name.replaceAll("([a-z])([A-Z])","$1_$2").toUpperCase()));
       }
       return new ThriftType.EnumType(enumValues);
     } catch (Exception e) {
diff --git a/parquet-scrooge/src/test/java/parquet/scrooge/ScroogeStructConverterTest.java b/parquet-scrooge/src/test/java/parquet/scrooge/ScroogeStructConverterTest.java
index 5d77931..2b351b5 100644
--- a/parquet-scrooge/src/test/java/parquet/scrooge/ScroogeStructConverterTest.java
+++ b/parquet-scrooge/src/test/java/parquet/scrooge/ScroogeStructConverterTest.java
@@ -27,7 +27,6 @@
 import parquet.scrooge.test.TestSetPrimitive;
 import parquet.thrift.ThriftSchemaConverter;
 import parquet.thrift.struct.ThriftType;
-
 import static org.junit.Assert.assertEquals;
 
 /**
diff --git a/parquet-scrooge/src/test/thrift/test.thrift b/parquet-scrooge/src/test/thrift/test.thrift
index 329e73b..bd2c360 100644
--- a/parquet-scrooge/src/test/thrift/test.thrift
+++ b/parquet-scrooge/src/test/thrift/test.thrift
@@ -142,5 +142,6 @@
 
 struct TestFieldOfEnum{
  1: required Operation op
+ 2: optional Operation op2
 }
 
diff --git a/parquet-thrift/pom.xml b/parquet-thrift/pom.xml
index 26b1d9c..575dce0 100644
--- a/parquet-thrift/pom.xml
+++ b/parquet-thrift/pom.xml
@@ -90,6 +90,13 @@
       <version>1.6</version>
       <scope>provided</scope>
     </dependency>
+    <dependency> <!-- for pig runtime in tests -->
+      <groupId>org.antlr</groupId>
+      <artifactId>antlr-runtime</artifactId>
+      <version>3.4</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftReadSupport.java b/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftReadSupport.java
index d1d9d07..abf38ff 100644
--- a/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftReadSupport.java
+++ b/parquet-thrift/src/main/java/parquet/hadoop/thrift/ThriftReadSupport.java
@@ -92,16 +92,16 @@
     final MessageType fileMessageType = context.getFileSchema();
     MessageType requestedProjection = fileMessageType;
     String partialSchemaString = configuration.get(ReadSupport.PARQUET_READ_SCHEMA);
-    String projectionSchemaStr = configuration.get(THRIFT_COLUMN_FILTER_KEY);
+    String projectionFilterString = configuration.get(THRIFT_COLUMN_FILTER_KEY);
 
-    if (partialSchemaString != null && projectionSchemaStr != null)
+    if (partialSchemaString != null && projectionFilterString != null)
       throw new ThriftProjectionException("PARQUET_READ_SCHEMA and THRIFT_COLUMN_FILTER_KEY are both specified, should use only one.");
 
+    //set requestedProjections only when it's specified
     if (partialSchemaString != null) {
       requestedProjection = getSchemaForRead(fileMessageType, partialSchemaString);
-    } else {
-      FieldProjectionFilter fieldProjectionFilter = new FieldProjectionFilter(projectionSchemaStr);
-      context.getKeyValueMetadata();
+    } else if (projectionFilterString != null && !projectionFilterString.isEmpty()) {
+      FieldProjectionFilter fieldProjectionFilter = new FieldProjectionFilter(projectionFilterString);
       try {
         initThriftClassFromMultipleFiles(context.getKeyValueMetadata(), configuration);
         requestedProjection =  getProjectedSchema(fieldProjectionFilter);
diff --git a/parquet-thrift/src/main/java/parquet/thrift/pig/ParquetThriftStorer.java b/parquet-thrift/src/main/java/parquet/thrift/pig/ParquetThriftStorer.java
new file mode 100644
index 0000000..d28b523
--- /dev/null
+++ b/parquet-thrift/src/main/java/parquet/thrift/pig/ParquetThriftStorer.java
@@ -0,0 +1,90 @@
+/**
+ * Copyright 2014 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.thrift.pig;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.data.Tuple;
+
+import parquet.hadoop.ParquetOutputFormat;
+import parquet.io.ParquetEncodingException;
+
+/**
+ * To store in Pig using a thrift class
+ * usage:
+ * STORE 'foo' USING parquet.thrift.pig.ParquetThriftStorer('my.thrift.Class');
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class ParquetThriftStorer extends StoreFunc {
+
+  private RecordWriter<Void, Tuple> recordWriter;
+
+  private String className;
+
+  public ParquetThriftStorer(String[] params) {
+    if (params == null || params.length != 1) {
+      throw new IllegalArgumentException("required the thrift class name in parameter. Got " + Arrays.toString(params) + " instead");
+    }
+    className = params[0];
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public OutputFormat<Void, Tuple> getOutputFormat() throws IOException {
+    return new ParquetOutputFormat<Tuple>(new TupleToThriftWriteSupport(className));
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" }) // that's how the base class is defined
+  @Override
+  public void prepareToWrite(RecordWriter recordWriter) throws IOException {
+    this.recordWriter = recordWriter;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void putNext(Tuple tuple) throws IOException {
+    try {
+      this.recordWriter.write(null, tuple);
+    } catch (InterruptedException e) {
+      throw new ParquetEncodingException("Interrupted while writing", e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void setStoreLocation(String location, Job job) throws IOException {
+    FileOutputFormat.setOutputPath(job, new Path(location));
+  }
+
+}
diff --git a/parquet-thrift/src/main/java/parquet/thrift/pig/TupleToThriftWriteSupport.java b/parquet-thrift/src/main/java/parquet/thrift/pig/TupleToThriftWriteSupport.java
new file mode 100644
index 0000000..781d897
--- /dev/null
+++ b/parquet-thrift/src/main/java/parquet/thrift/pig/TupleToThriftWriteSupport.java
@@ -0,0 +1,74 @@
+/**
+ * Copyright 2014 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.thrift.pig;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.data.Tuple;
+import org.apache.thrift.TBase;
+
+import parquet.hadoop.BadConfigurationException;
+import parquet.hadoop.api.WriteSupport;
+import parquet.hadoop.thrift.ThriftWriteSupport;
+import parquet.io.api.RecordConsumer;
+
+import com.twitter.elephantbird.pig.util.PigToThrift;
+
+/**
+ * Stores Pig tuples as Thrift objects
+ *
+ * @author Julien Le Dem
+ *
+ */
+public class TupleToThriftWriteSupport extends WriteSupport<Tuple> {
+
+  private final String className;
+  private ThriftWriteSupport<TBase<?,?>> thriftWriteSupport;
+  private PigToThrift<TBase<?,?>> pigToThrift;
+
+  /**
+   * @param className the thrift class name
+   */
+  public TupleToThriftWriteSupport(String className) {
+    super();
+    this.className = className;
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Override
+  public WriteContext init(Configuration configuration) {
+    try {
+      Class<?> clazz = configuration.getClassByName(className).asSubclass(TBase.class);
+      thriftWriteSupport = new ThriftWriteSupport(clazz);
+      pigToThrift = new PigToThrift(clazz);
+      return thriftWriteSupport.init(configuration);
+    } catch (ClassNotFoundException e) {
+      throw new BadConfigurationException("The thrift class name was not found: " + className, e);
+    } catch (ClassCastException e) {
+      throw new BadConfigurationException("The thrift class name should extend TBase: " + className, e);
+    }
+  }
+
+  @Override
+  public void prepareForWrite(RecordConsumer recordConsumer) {
+    thriftWriteSupport.prepareForWrite(recordConsumer);
+  }
+
+  @Override
+  public void write(Tuple t) {
+    thriftWriteSupport.write(pigToThrift.getThriftObject(t));
+  }
+
+}
diff --git a/parquet-thrift/src/test/java/parquet/thrift/pig/TestParquetThriftStorer.java b/parquet-thrift/src/test/java/parquet/thrift/pig/TestParquetThriftStorer.java
new file mode 100644
index 0000000..15c1a04
--- /dev/null
+++ b/parquet-thrift/src/test/java/parquet/thrift/pig/TestParquetThriftStorer.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2014 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.thrift.pig;
+
+import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+
+import parquet.pig.ParquetLoader;
+import parquet.thrift.test.Name;
+
+public class TestParquetThriftStorer {
+  @Test
+  public void testStorer() throws ExecException, Exception {
+    String out = "target/out";
+    int rows = 1000;
+    Properties props = new Properties();
+    props.setProperty("parquet.compression", "uncompressed");
+    props.setProperty("parquet.page.size", "1000");
+    PigServer pigServer = new PigServer(ExecType.LOCAL, props);
+    Data data = Storage.resetData(pigServer);
+    Collection<Tuple> list = new ArrayList<Tuple>();
+    for (int i = 0; i < rows; i++) {
+      list.add(tuple("bob", "roberts" + i));
+    }
+    data.set("in", "fn:chararray, ln:chararray", list );
+    pigServer.deleteFile(out);
+    pigServer.setBatchOn();
+    pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
+    pigServer.registerQuery("Store A into '"+out+"' using "+ParquetThriftStorer.class.getName()+"('" + Name.class.getName() + "');");
+    execBatch(pigServer);
+
+    pigServer.registerQuery("B = LOAD '"+out+"' USING "+ParquetLoader.class.getName()+"();");
+    pigServer.registerQuery("Store B into 'out' using mock.Storage();");
+    execBatch(pigServer);
+
+    List<Tuple> result = data.get("out");
+
+    assertEquals(rows, result.size());
+    int i = 0;
+    for (Tuple tuple : result) {
+      assertEquals(tuple("bob", "roberts" + i), tuple);
+      ++i;
+    }
+  }
+
+  private void execBatch(PigServer pigServer) throws IOException {
+    if (pigServer.executeBatch().get(0).getStatus() != JOB_STATUS.COMPLETED) {
+      throw new RuntimeException("Job failed", pigServer.executeBatch().get(0).getException());
+    }
+  }
+}
diff --git a/parquet-tools/src/main/java/parquet/tools/command/DumpCommand.java b/parquet-tools/src/main/java/parquet/tools/command/DumpCommand.java
index 21cf399..5c9c6c3 100644
--- a/parquet-tools/src/main/java/parquet/tools/command/DumpCommand.java
+++ b/parquet-tools/src/main/java/parquet/tools/command/DumpCommand.java
@@ -263,7 +263,7 @@
         out.format("*** row group %d of %d, values %d to %d ***%n", page, total, offset, offset + creader.getTotalValueCount() - 1);
 
         for (long i = 0, e = creader.getTotalValueCount(); i < e; ++i) {
-            int rlvl = creader.getCurrentDefinitionLevel();
+            int rlvl = creader.getCurrentRepetitionLevel();
             int dlvl = creader.getCurrentDefinitionLevel();
 
             out.format("value %d: R:%d D:%d V:", offset+i, rlvl, dlvl);