PIG-4092: Predicate pushdown for Parquet (nkollar via rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1829112 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 651fb7c..ac5eb44 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
  
 IMPROVEMENTS
 
+PIG-4092: Predicate pushdown for Parquet (nkollar via rohini)
+
 PIG-5317: Upgrade old dependencies: commons-lang, hsqldb, commons-logging (nkollar via rohini)
 
 PIG-5322: ConstantCalculator optimizer is not applied for split (rohini)
diff --git a/ivy.xml b/ivy.xml
index 3ac6751..a84322e 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -390,7 +390,7 @@
 
     <dependency org="org.mockito" name="mockito-all" rev="${mockito.version}" conf="test->default"/>
 
-    <dependency org="com.twitter" name="parquet-pig-bundle" rev="${parquet-pig-bundle.version}" conf="compile->master"/>
+    <dependency org="org.apache.parquet" name="parquet-pig-bundle" rev="${parquet-pig-bundle.version}" conf="compile->master"/>
 
     <!-- for Spark 1.x integration -->
     <dependency org="org.apache.spark" name="spark-core_2.11" rev="${spark1.version}" conf="spark1->default">
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index 800b75e..9eb4a7d 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -87,7 +87,7 @@
 asm.version=3.3.1
 snappy-java.version=1.1.1.3
 tez.version=0.7.0
-parquet-pig-bundle.version=1.2.3
+parquet-pig-bundle.version=1.9.0
 snappy.version=0.2
 leveldbjni.version=1.8
 curator.version=2.6.0
diff --git a/src/org/apache/pig/builtin/ParquetLoader.java b/src/org/apache/pig/builtin/ParquetLoader.java
index 76516e3..c3c45ba 100644
--- a/src/org/apache/pig/builtin/ParquetLoader.java
+++ b/src/org/apache/pig/builtin/ParquetLoader.java
@@ -20,8 +20,10 @@
 import java.util.List;
 
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.Expression;
 import org.apache.pig.LoadFuncMetadataWrapper;
 import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPredicatePushdown;
 import org.apache.pig.LoadPushDown;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.JarManager;
@@ -29,7 +31,7 @@
 /**
  * Wrapper class which will delegate calls to parquet.pig.ParquetLoader
  */
-public class ParquetLoader extends LoadFuncMetadataWrapper implements LoadPushDown {
+public class ParquetLoader extends LoadFuncMetadataWrapper implements LoadPushDown, LoadPredicatePushdown {
 
     public ParquetLoader() throws FrontendException {
         this(null);
@@ -37,12 +39,12 @@
     
     public ParquetLoader(String requestedSchemaStr) throws FrontendException {
         try {
-            init(new parquet.pig.ParquetLoader(requestedSchemaStr));
+            init(new org.apache.parquet.pig.ParquetLoader(requestedSchemaStr));
         }
         // if compile time dependency not found at runtime
         catch (NoClassDefFoundError e) {
             throw new FrontendException(String.format("Cannot instantiate class %s (%s)",
-                    getClass().getName(), "parquet.pig.ParquetLoader"), 2259, e);
+                    getClass().getName(), "org.apache.parquet.pig.ParquetLoader"), 2259, e);
         }
     }
     
@@ -52,7 +54,7 @@
     
     @Override
     public void setLocation(String location, Job job) throws IOException {
-        JarManager.addDependencyJars(job, parquet.Version.class);
+        JarManager.addDependencyJars(job, org.apache.parquet.Version.class);
         super.setLocation(location, job);
     }
 
@@ -66,5 +68,19 @@
             throws FrontendException {
         return ((LoadPushDown)super.loadFunc()).pushProjection(requiredFieldList);
     }
-    
+
+    @Override
+    public List<String> getPredicateFields(String location, Job job) throws IOException {
+        return ((LoadPredicatePushdown)super.loadFunc()).getPredicateFields(location, job);
+    }
+
+    @Override
+    public List<Expression.OpType> getSupportedExpressionTypes() {
+        return ((LoadPredicatePushdown)super.loadFunc()).getSupportedExpressionTypes();
+    }
+
+    @Override
+    public void setPushdownPredicate(Expression predicate) throws IOException {
+        ((LoadPredicatePushdown)super.loadFunc()).setPushdownPredicate(predicate);
+    }
 }
diff --git a/src/org/apache/pig/builtin/ParquetStorer.java b/src/org/apache/pig/builtin/ParquetStorer.java
index 2052236..6ac3b98 100644
--- a/src/org/apache/pig/builtin/ParquetStorer.java
+++ b/src/org/apache/pig/builtin/ParquetStorer.java
@@ -31,12 +31,12 @@
 
     public ParquetStorer() throws FrontendException {
         try {
-            init(new parquet.pig.ParquetStorer());
+            init(new org.apache.parquet.pig.ParquetStorer());
         }
         // if compile time dependency not found at runtime
         catch (NoClassDefFoundError e) {
             throw new FrontendException(String.format("Cannot instantiate class %s (%s)",
-                    getClass().getName(), "parquet.pig.ParquetStorer"), 2259, e);
+                    getClass().getName(), "org.apache.parquet.pig.ParquetStorer"), 2259, e);
         }
     }
     
@@ -49,7 +49,7 @@
      */
     @Override
     public void setStoreLocation(String location, Job job) throws IOException {
-        JarManager.addDependencyJars(job, parquet.Version.class);
+        JarManager.addDependencyJars(job, org.apache.parquet.Version.class);
         super.setStoreLocation(location, job);
     }
     
diff --git a/test/org/apache/pig/test/TestSplitCombine.java b/test/org/apache/pig/test/TestSplitCombine.java
index cdde871..ad35fdf 100644
--- a/test/org/apache/pig/test/TestSplitCombine.java
+++ b/test/org/apache/pig/test/TestSplitCombine.java
@@ -41,8 +41,8 @@
 import org.junit.Before;
 import org.junit.Test;
 
-import parquet.hadoop.ParquetInputSplit;
-import parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
 
 public class TestSplitCombine {
     private Configuration conf;
@@ -527,7 +527,7 @@
         // first split is parquetinputsplit
         rawSplits.add(new ParquetInputSplit(new Path("path1"), 0, 100,
                 new String[] { "l1", "l2", "l3" },
-                new ArrayList<BlockMetaData>(), "", "",
+                new ArrayList<BlockMetaData>(), "message dummy {}", "",
                 new HashMap<String, String>(), new HashMap<String, String>()));
         // second split is file split
         rawSplits.add(new FileSplit(new Path("path2"), 0, 400, new String[] {
@@ -559,7 +559,7 @@
             Assert.assertEquals(500, anotherSplit.getLength());
 
             Assert.assertEquals(2, anotherSplit.getNumPaths());
-            Assert.assertEquals("parquet.hadoop.ParquetInputSplit",
+            Assert.assertEquals("org.apache.parquet.hadoop.ParquetInputSplit",
                     (anotherSplit.getWrappedSplit(0).getClass().getName()));
             Assert.assertEquals(
                     "org.apache.hadoop.mapreduce.lib.input.FileSplit",