PARQUET-801: Allow UserDefinedPredicates in DictionaryFilter
Author: Patrick Woody <pwoody@palantir.com>
Author: Patrick Woody <patrick.woody1@gmail.com>
Closes #394 from pwoody/pw/dictionaryUdp and squashes the following commits:
d8499a0 [Patrick Woody] short circuiting and style changes
4cb9f0c [Patrick Woody] more missing imports
1ec0d39 [Patrick Woody] fix missing import
3ee4489 [Patrick Woody] PARQUET-801: Allow UserDefinedPredicates in DictionaryFilter
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
index 91f3007..19604ec 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
@@ -212,8 +212,8 @@
return BLOCK_MIGHT_MATCH;
}
- for(T entry : dictSet) {
- if(value.compareTo(entry) > 0) {
+ for (T entry : dictSet) {
+ if (value.compareTo(entry) > 0) {
return BLOCK_MIGHT_MATCH;
}
}
@@ -253,8 +253,8 @@
return BLOCK_MIGHT_MATCH;
}
- for(T entry : dictSet) {
- if(value.compareTo(entry) >= 0) {
+ for (T entry : dictSet) {
+ if (value.compareTo(entry) >= 0) {
return BLOCK_MIGHT_MATCH;
}
}
@@ -292,8 +292,8 @@
return BLOCK_MIGHT_MATCH;
}
- for(T entry : dictSet) {
- if(value.compareTo(entry) < 0) {
+ for (T entry : dictSet) {
+ if (value.compareTo(entry) < 0) {
return BLOCK_MIGHT_MATCH;
}
}
@@ -333,8 +333,8 @@
return BLOCK_MIGHT_MATCH;
}
- for(T entry : dictSet) {
- if(value.compareTo(entry) <= 0) {
+ for (T entry : dictSet) {
+ if (value.compareTo(entry) <= 0) {
return BLOCK_MIGHT_MATCH;
}
}
@@ -363,14 +363,50 @@
"This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
}
+ private <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> ud, boolean inverted) {
+ Column<T> filterColumn = ud.getColumn();
+ ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+ U udp = ud.getUserDefinedPredicate();
+
+ // The column is missing, thus all null. Check if the predicate keeps null.
+ if (meta == null) {
+ if (inverted) {
+ return udp.keep(null);
+ } else {
+ return !udp.keep(null);
+ }
+ }
+
+ if (hasNonDictionaryPages(meta)) {
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ try {
+ Set<T> dictSet = expandDictionary(meta);
+ if (dictSet == null) {
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ for (T entry : dictSet) {
+ boolean keep = udp.keep(entry);
+ if ((keep && !inverted) || (!keep && inverted)) return BLOCK_MIGHT_MATCH;
+ }
+ return BLOCK_CANNOT_MATCH;
+ } catch (IOException e) {
+ LOG.warn("Failed to process dictionary for filter evaluation.", e);
+ }
+
+ return BLOCK_MIGHT_MATCH;
+ }
+
@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> udp) {
- throw new UnsupportedOperationException("UDP not supported with dictionary evaluation.");
+ return visit(udp, false);
}
@Override
public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(LogicalNotUserDefined<T, U> udp) {
- throw new UnsupportedOperationException("UDP not supported with dictionary evaluation.");
+ return visit(udp.getUserDefined(), true);
}
@SuppressWarnings("deprecation")
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
index eca6332..3883d87 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
@@ -19,6 +19,9 @@
package org.apache.parquet.filter2.dictionarylevel;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -27,11 +30,14 @@
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;
import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
import org.apache.parquet.filter2.predicate.Operators.IntColumn;
import org.apache.parquet.filter2.predicate.Operators.LongColumn;
+import org.apache.parquet.filter2.predicate.Statistics;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
@@ -47,6 +53,7 @@
import org.junit.Test;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -331,6 +338,43 @@
canDrop(or(x, y), ccmd, dictionaries));
}
+
+ @Test
+ public void testUdp() throws Exception {
+ InInt32UDP dropabble = new InInt32UDP(ImmutableSet.of(42));
+ InInt32UDP undroppable = new InInt32UDP(ImmutableSet.of(205));
+
+ assertTrue("Should drop block for non-matching UDP",
+ canDrop(userDefined(intColumn("int32_field"), dropabble), ccmd, dictionaries));
+
+ assertFalse("Should not drop block for matching UDP",
+ canDrop(userDefined(intColumn("int32_field"), undroppable), ccmd, dictionaries));
+ }
+
+ @Test
+ public void testInverseUdp() throws Exception {
+ InInt32UDP droppable = new InInt32UDP(ImmutableSet.of(42));
+ InInt32UDP undroppable = new InInt32UDP(ImmutableSet.of(205));
+ Set<Integer> allValues = ImmutableSet.copyOf(Arrays.asList(ArrayUtils.toObject(intValues)));
+ InInt32UDP completeMatch = new InInt32UDP(allValues);
+
+ FilterPredicate inverse =
+ LogicalInverseRewriter.rewrite(not(userDefined(intColumn("int32_field"), droppable)));
+ FilterPredicate inverse1 =
+ LogicalInverseRewriter.rewrite(not(userDefined(intColumn("int32_field"), undroppable)));
+ FilterPredicate inverse2 =
+ LogicalInverseRewriter.rewrite(not(userDefined(intColumn("int32_field"), completeMatch)));
+
+ assertFalse("Should not drop block for inverse of non-matching UDP",
+ canDrop(inverse, ccmd, dictionaries));
+
+ assertFalse("Should not drop block for inverse of UDP with some matches",
+ canDrop(inverse1, ccmd, dictionaries));
+
+ assertTrue("Should drop block for inverse of UDP with all matches",
+ canDrop(inverse2, ccmd, dictionaries));
+ }
+
@Test
public void testColumnWithoutDictionary() throws Exception {
IntColumn plain = intColumn("plain_int32_field");
@@ -437,6 +481,56 @@
canDrop(gtEq(b, Binary.fromString("any")), ccmd, dictionaries));
}
+ @Test
+ public void testUdpMissingColumn() throws Exception {
+ InInt32UDP nullRejecting = new InInt32UDP(ImmutableSet.of(42));
+ InInt32UDP nullAccepting = new InInt32UDP(Sets.newHashSet((Integer) null));
+ IntColumn fake = intColumn("missing_column");
+
+ assertTrue("Should drop block for null rejecting udp",
+ canDrop(userDefined(fake, nullRejecting), ccmd, dictionaries));
+ assertFalse("Should not drop block for null accepting udp",
+ canDrop(userDefined(fake, nullAccepting), ccmd, dictionaries));
+ }
+
+
+ @Test
+ public void testInverseUdpMissingColumn() throws Exception {
+ InInt32UDP nullRejecting = new InInt32UDP(ImmutableSet.of(42));
+ InInt32UDP nullAccepting = new InInt32UDP(Sets.newHashSet((Integer) null));
+ IntColumn fake = intColumn("missing_column");
+
+ assertTrue("Should drop block for null accepting udp",
+ canDrop(LogicalInverseRewriter.rewrite(not(userDefined(fake, nullAccepting))), ccmd, dictionaries));
+ assertFalse("Should not drop block for null rejecting udp",
+ canDrop(LogicalInverseRewriter.rewrite(not(userDefined(fake, nullRejecting))), ccmd, dictionaries));
+ }
+
+
+ private static final class InInt32UDP extends UserDefinedPredicate<Integer> implements Serializable {
+
+ private final Set<Integer> ints;
+
+ InInt32UDP(Set<Integer> ints) {
+ this.ints = ints;
+ }
+
+ @Override
+ public boolean keep(Integer value) {
+ return ints.contains(value);
+ }
+
+ @Override
+ public boolean canDrop(Statistics<Integer> statistics) {
+ return false;
+ }
+
+ @Override
+ public boolean inverseCanDrop(Statistics<Integer> statistics) {
+ return false;
+ }
+ }
+
private static double toDouble(int value) {
return (value * 1.0);
}