PARQUET-801: Allow UserDefinedPredicates in DictionaryFilter

Author: Patrick Woody <pwoody@palantir.com>
Author: Patrick Woody <patrick.woody1@gmail.com>

Closes #394 from pwoody/pw/dictionaryUdp and squashes the following commits:

d8499a0 [Patrick Woody] short circuiting and style changes
4cb9f0c [Patrick Woody] more missing imports
1ec0d39 [Patrick Woody] fix missing import
3ee4489 [Patrick Woody] PARQUET-801: Allow UserDefinedPredicates in DictionaryFilter
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
index 91f3007..19604ec 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilter.java
@@ -212,8 +212,8 @@
         return BLOCK_MIGHT_MATCH;
       }
 
-      for(T entry : dictSet) {
-        if(value.compareTo(entry) > 0) {
+      for (T entry : dictSet) {
+        if (value.compareTo(entry) > 0) {
           return BLOCK_MIGHT_MATCH;
         }
       }
@@ -253,8 +253,8 @@
         return BLOCK_MIGHT_MATCH;
       }
 
-      for(T entry : dictSet) {
-        if(value.compareTo(entry) >= 0) {
+      for (T entry : dictSet) {
+        if (value.compareTo(entry) >= 0) {
           return BLOCK_MIGHT_MATCH;
         }
       }
@@ -292,8 +292,8 @@
         return BLOCK_MIGHT_MATCH;
       }
 
-      for(T entry : dictSet) {
-        if(value.compareTo(entry) < 0) {
+      for (T entry : dictSet) {
+        if (value.compareTo(entry) < 0) {
           return BLOCK_MIGHT_MATCH;
         }
       }
@@ -333,8 +333,8 @@
         return BLOCK_MIGHT_MATCH;
       }
 
-      for(T entry : dictSet) {
-        if(value.compareTo(entry) <= 0) {
+      for (T entry : dictSet) {
+        if (value.compareTo(entry) <= 0) {
           return BLOCK_MIGHT_MATCH;
         }
       }
@@ -363,14 +363,50 @@
         "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
   }
 
+  private <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> ud, boolean inverted) {
+    Column<T> filterColumn = ud.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+    U udp = ud.getUserDefinedPredicate();
+
+    // The column is missing, thus all null. Check if the predicate keeps null.
+    if (meta == null) {
+      if (inverted) {
+        return udp.keep(null);
+      } else {
+        return !udp.keep(null);
+      }
+    }
+
+    if (hasNonDictionaryPages(meta)) {
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    try {
+      Set<T> dictSet = expandDictionary(meta);
+      if (dictSet == null) {
+        return BLOCK_MIGHT_MATCH;
+      }
+
+      for (T entry : dictSet) {
+        boolean keep = udp.keep(entry);
+        if ((keep && !inverted) || (!keep && inverted)) return BLOCK_MIGHT_MATCH;
+      }
+      return BLOCK_CANNOT_MATCH;
+    } catch (IOException e) {
+      LOG.warn("Failed to process dictionary for filter evaluation.", e);
+    }
+
+    return BLOCK_MIGHT_MATCH;
+  }
+
   @Override
   public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(UserDefined<T, U> udp) {
-    throw new UnsupportedOperationException("UDP not supported with dictionary evaluation.");
+    return visit(udp, false);
   }
 
   @Override
   public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(LogicalNotUserDefined<T, U> udp) {
-    throw new UnsupportedOperationException("UDP not supported with dictionary evaluation.");
+    return visit(udp.getUserDefined(), true);
   }
 
   @SuppressWarnings("deprecation")
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
index eca6332..3883d87 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/dictionarylevel/DictionaryFilterTest.java
@@ -19,6 +19,9 @@
 
 package org.apache.parquet.filter2.dictionarylevel;
 
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -27,11 +30,14 @@
 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.simple.SimpleGroupFactory;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;
 import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
 import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
 import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
 import org.apache.parquet.filter2.predicate.Operators.IntColumn;
 import org.apache.parquet.filter2.predicate.Operators.LongColumn;
+import org.apache.parquet.filter2.predicate.Statistics;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
@@ -47,6 +53,7 @@
 import org.junit.Test;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -331,6 +338,43 @@
         canDrop(or(x, y), ccmd, dictionaries));
   }
 
+
+  @Test
+  public void testUdp() throws Exception {
+    InInt32UDP dropabble = new InInt32UDP(ImmutableSet.of(42));
+    InInt32UDP undroppable = new InInt32UDP(ImmutableSet.of(205));
+
+    assertTrue("Should drop block for non-matching UDP",
+      canDrop(userDefined(intColumn("int32_field"), dropabble), ccmd, dictionaries));
+
+    assertFalse("Should not drop block for matching UDP",
+      canDrop(userDefined(intColumn("int32_field"), undroppable), ccmd, dictionaries));
+  }
+
+  @Test
+  public void testInverseUdp() throws Exception {
+    InInt32UDP droppable = new InInt32UDP(ImmutableSet.of(42));
+    InInt32UDP undroppable = new InInt32UDP(ImmutableSet.of(205));
+    Set<Integer> allValues = ImmutableSet.copyOf(Arrays.asList(ArrayUtils.toObject(intValues)));
+    InInt32UDP completeMatch = new InInt32UDP(allValues);
+
+    FilterPredicate inverse =
+      LogicalInverseRewriter.rewrite(not(userDefined(intColumn("int32_field"), droppable)));
+    FilterPredicate inverse1 =
+      LogicalInverseRewriter.rewrite(not(userDefined(intColumn("int32_field"), undroppable)));
+    FilterPredicate inverse2 =
+      LogicalInverseRewriter.rewrite(not(userDefined(intColumn("int32_field"), completeMatch)));
+
+    assertFalse("Should not drop block for inverse of non-matching UDP",
+      canDrop(inverse, ccmd, dictionaries));
+
+    assertFalse("Should not drop block for inverse of UDP with some matches",
+      canDrop(inverse1, ccmd, dictionaries));
+
+    assertTrue("Should drop block for inverse of UDP with all matches",
+      canDrop(inverse2, ccmd, dictionaries));
+  }
+
   @Test
   public void testColumnWithoutDictionary() throws Exception {
     IntColumn plain = intColumn("plain_int32_field");
@@ -437,6 +481,56 @@
         canDrop(gtEq(b, Binary.fromString("any")), ccmd, dictionaries));
   }
 
+  @Test
+  public void testUdpMissingColumn() throws Exception {
+    InInt32UDP nullRejecting = new InInt32UDP(ImmutableSet.of(42));
+    InInt32UDP nullAccepting = new InInt32UDP(Sets.newHashSet((Integer) null));
+    IntColumn fake = intColumn("missing_column");
+
+    assertTrue("Should drop block for null rejecting udp",
+      canDrop(userDefined(fake, nullRejecting), ccmd, dictionaries));
+    assertFalse("Should not drop block for null accepting udp",
+      canDrop(userDefined(fake, nullAccepting), ccmd, dictionaries));
+  }
+
+
+  @Test
+  public void testInverseUdpMissingColumn() throws Exception {
+    InInt32UDP nullRejecting = new InInt32UDP(ImmutableSet.of(42));
+    InInt32UDP nullAccepting = new InInt32UDP(Sets.newHashSet((Integer) null));
+    IntColumn fake = intColumn("missing_column");
+
+    assertTrue("Should drop block for null accepting udp",
+      canDrop(LogicalInverseRewriter.rewrite(not(userDefined(fake, nullAccepting))), ccmd, dictionaries));
+    assertFalse("Should not drop block for null rejecting udp",
+      canDrop(LogicalInverseRewriter.rewrite(not(userDefined(fake, nullRejecting))), ccmd, dictionaries));
+  }
+
+
+  private static final class InInt32UDP extends UserDefinedPredicate<Integer> implements Serializable {
+
+    private final Set<Integer> ints;
+
+    InInt32UDP(Set<Integer> ints) {
+      this.ints = ints;
+    }
+
+    @Override
+    public boolean keep(Integer value) {
+      return ints.contains(value);
+    }
+
+    @Override
+    public boolean canDrop(Statistics<Integer> statistics) {
+      return false;
+    }
+
+    @Override
+    public boolean inverseCanDrop(Statistics<Integer> statistics) {
+      return false;
+    }
+  }
+
   private static double toDouble(int value) {
     return (value * 1.0);
   }