PIG-4092: Predicate pushdown for Parquet (nkollar via rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1829112 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 651fb7c..ac5eb44 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
IMPROVEMENTS
+PIG-4092: Predicate pushdown for Parquet (nkollar via rohini)
+
PIG-5317: Upgrade old dependencies: commons-lang, hsqldb, commons-logging (nkollar via rohini)
PIG-5322: ConstantCalculator optimizer is not applied for split (rohini)
diff --git a/ivy.xml b/ivy.xml
index 3ac6751..a84322e 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -390,7 +390,7 @@
<dependency org="org.mockito" name="mockito-all" rev="${mockito.version}" conf="test->default"/>
- <dependency org="com.twitter" name="parquet-pig-bundle" rev="${parquet-pig-bundle.version}" conf="compile->master"/>
+ <dependency org="org.apache.parquet" name="parquet-pig-bundle" rev="${parquet-pig-bundle.version}" conf="compile->master"/>
<!-- for Spark 1.x integration -->
<dependency org="org.apache.spark" name="spark-core_2.11" rev="${spark1.version}" conf="spark1->default">
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index 800b75e..9eb4a7d 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -87,7 +87,7 @@
asm.version=3.3.1
snappy-java.version=1.1.1.3
tez.version=0.7.0
-parquet-pig-bundle.version=1.2.3
+parquet-pig-bundle.version=1.9.0
snappy.version=0.2
leveldbjni.version=1.8
curator.version=2.6.0
diff --git a/src/org/apache/pig/builtin/ParquetLoader.java b/src/org/apache/pig/builtin/ParquetLoader.java
index 76516e3..c3c45ba 100644
--- a/src/org/apache/pig/builtin/ParquetLoader.java
+++ b/src/org/apache/pig/builtin/ParquetLoader.java
@@ -20,8 +20,10 @@
import java.util.List;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.Expression;
import org.apache.pig.LoadFuncMetadataWrapper;
import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPredicatePushdown;
import org.apache.pig.LoadPushDown;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.JarManager;
@@ -29,7 +31,7 @@
/**
* Wrapper class which will delegate calls to parquet.pig.ParquetLoader
*/
-public class ParquetLoader extends LoadFuncMetadataWrapper implements LoadPushDown {
+public class ParquetLoader extends LoadFuncMetadataWrapper implements LoadPushDown, LoadPredicatePushdown {
public ParquetLoader() throws FrontendException {
this(null);
@@ -37,12 +39,12 @@
public ParquetLoader(String requestedSchemaStr) throws FrontendException {
try {
- init(new parquet.pig.ParquetLoader(requestedSchemaStr));
+ init(new org.apache.parquet.pig.ParquetLoader(requestedSchemaStr));
}
// if compile time dependency not found at runtime
catch (NoClassDefFoundError e) {
throw new FrontendException(String.format("Cannot instantiate class %s (%s)",
- getClass().getName(), "parquet.pig.ParquetLoader"), 2259, e);
+                getClass().getName(), "org.apache.parquet.pig.ParquetLoader"), 2259, e);
}
}
@@ -52,7 +54,7 @@
@Override
public void setLocation(String location, Job job) throws IOException {
- JarManager.addDependencyJars(job, parquet.Version.class);
+ JarManager.addDependencyJars(job, org.apache.parquet.Version.class);
super.setLocation(location, job);
}
@@ -66,5 +68,19 @@
throws FrontendException {
return ((LoadPushDown)super.loadFunc()).pushProjection(requiredFieldList);
}
-
+
+ @Override
+ public List<String> getPredicateFields(String location, Job job) throws IOException {
+ return ((LoadPredicatePushdown)super.loadFunc()).getPredicateFields(location, job);
+ }
+
+ @Override
+ public List<Expression.OpType> getSupportedExpressionTypes() {
+ return ((LoadPredicatePushdown)super.loadFunc()).getSupportedExpressionTypes();
+ }
+
+ @Override
+ public void setPushdownPredicate(Expression predicate) throws IOException {
+ ((LoadPredicatePushdown)super.loadFunc()).setPushdownPredicate(predicate);
+ }
}
diff --git a/src/org/apache/pig/builtin/ParquetStorer.java b/src/org/apache/pig/builtin/ParquetStorer.java
index 2052236..6ac3b98 100644
--- a/src/org/apache/pig/builtin/ParquetStorer.java
+++ b/src/org/apache/pig/builtin/ParquetStorer.java
@@ -31,12 +31,12 @@
public ParquetStorer() throws FrontendException {
try {
- init(new parquet.pig.ParquetStorer());
+ init(new org.apache.parquet.pig.ParquetStorer());
}
// if compile time dependency not found at runtime
catch (NoClassDefFoundError e) {
throw new FrontendException(String.format("Cannot instantiate class %s (%s)",
- getClass().getName(), "parquet.pig.ParquetStorer"), 2259, e);
+ getClass().getName(), "org.apache.parquet.pig.ParquetStorer"), 2259, e);
}
}
@@ -49,7 +49,7 @@
*/
@Override
public void setStoreLocation(String location, Job job) throws IOException {
- JarManager.addDependencyJars(job, parquet.Version.class);
+ JarManager.addDependencyJars(job, org.apache.parquet.Version.class);
super.setStoreLocation(location, job);
}
diff --git a/test/org/apache/pig/test/TestSplitCombine.java b/test/org/apache/pig/test/TestSplitCombine.java
index cdde871..ad35fdf 100644
--- a/test/org/apache/pig/test/TestSplitCombine.java
+++ b/test/org/apache/pig/test/TestSplitCombine.java
@@ -41,8 +41,8 @@
import org.junit.Before;
import org.junit.Test;
-import parquet.hadoop.ParquetInputSplit;
-import parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.ParquetInputSplit;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
public class TestSplitCombine {
private Configuration conf;
@@ -527,7 +527,7 @@
// first split is parquetinputsplit
rawSplits.add(new ParquetInputSplit(new Path("path1"), 0, 100,
new String[] { "l1", "l2", "l3" },
- new ArrayList<BlockMetaData>(), "", "",
+ new ArrayList<BlockMetaData>(), "message dummy {}", "",
new HashMap<String, String>(), new HashMap<String, String>()));
// second split is file split
rawSplits.add(new FileSplit(new Path("path2"), 0, 400, new String[] {
@@ -559,7 +559,7 @@
Assert.assertEquals(500, anotherSplit.getLength());
Assert.assertEquals(2, anotherSplit.getNumPaths());
- Assert.assertEquals("parquet.hadoop.ParquetInputSplit",
+ Assert.assertEquals("org.apache.parquet.hadoop.ParquetInputSplit",
(anotherSplit.getWrappedSplit(0).getClass().getName()));
Assert.assertEquals(
"org.apache.hadoop.mapreduce.lib.input.FileSplit",