PARQUET-1964: Properly handle missing/null filter (#856)

diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/compat/FilterCompat.java b/parquet-column/src/main/java/org/apache/parquet/filter2/compat/FilterCompat.java
index 463a54c..7437e79 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/compat/FilterCompat.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/compat/FilterCompat.java
@@ -121,6 +121,17 @@
     return NOOP;
   }
 
+  /**
+   * Returns whether filtering is required based on the specified filter. It can be used to avoid the potentially
+   * significant steps of preparing for filtering when {@link #NOOP} is used.
+   *
+   * @param filter the filter to be checked
+   * @return {@code false} if the filter is {@code null} or is a no-op filter, {@code true} otherwise.
+   */
+  public static boolean isFilteringRequired(Filter filter) {
+    return filter != null && !(filter instanceof NoOpFilter);
+  }
+
   // wraps a FilterPredicate
   public static final class FilterPredicateCompat implements Filter {
     private final FilterPredicate filterPredicate;
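
For reviewers, a minimal standalone sketch of what the new helper guarantees (`FilterCheckDemo` is a hypothetical class name; the `FilterApi` builders are the same ones used in the tests further down):

```java
import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;

import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;

public class FilterCheckDemo {
  public static void main(String[] args) {
    // null and NOOP both mean "no filtering work is needed"
    System.out.println(FilterCompat.isFilteringRequired(null));              // false
    System.out.println(FilterCompat.isFilteringRequired(FilterCompat.NOOP)); // false

    // a real predicate does require filtering
    Filter filter = FilterCompat.get(eq(longColumn("id"), 1234L));
    System.out.println(FilterCompat.isFilteringRequired(filter));            // true
  }
}
```
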
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index ff34838..8ffe19f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -86,7 +86,7 @@
    */
   public InternalParquetRecordReader(ReadSupport<T> readSupport, Filter filter) {
     this.readSupport = readSupport;
-    this.filter = Objects.requireNonNull(filter, "filter cannot be null");
+    this.filter = filter == null ? FilterCompat.NOOP : filter;
   }
 
   /**
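
This constructor change is the heart of the fix: a missing filter now degrades to the no-op filter instead of failing fast. A sketch of the difference, assuming some `readSupport` instance (before this patch the second call threw `NullPointerException: filter cannot be null`):

```java
// An unfiltered reader, spelled two ways; after this patch both are equivalent:
new InternalParquetRecordReader<>(readSupport, FilterCompat.NOOP);
new InternalParquetRecordReader<>(readSupport, null); // previously: NPE
```
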
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 8d09b87..b13d336 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -527,7 +527,7 @@
     return readFooter(file, options, f, converter);
   }
 
-  private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options, 
+  private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options,
       SeekableInputStream f, ParquetMetadataConverter converter) throws IOException {
 
     long fileLen = file.getLength();
@@ -579,7 +579,7 @@
 
     // Regular file, or encrypted file with plaintext footer
     if (!encryptedFooterMode) {
-      return converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter(), fileDecryptor, false, 
+      return converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter(), fileDecryptor, false,
           fileMetadataLength);
     }
 
@@ -588,10 +588,10 @@
       throw new ParquetCryptoRuntimeException("Trying to read file with encrypted footer. No keys available");
     }
     FileCryptoMetaData fileCryptoMetaData = readFileCryptoMetaData(footerBytesStream);
-    fileDecryptor.setFileCryptoMetaData(fileCryptoMetaData.getEncryption_algorithm(), 
+    fileDecryptor.setFileCryptoMetaData(fileCryptoMetaData.getEncryption_algorithm(),
         true, fileCryptoMetaData.getKey_metadata());
     // footer length is required only for signed plaintext footers
+    return converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter(), fileDecryptor, true, 0);
+    return  converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter(), fileDecryptor, true, 0);
   }
 
   /**
@@ -827,7 +827,7 @@
   }
 
   public long getFilteredRecordCount() {
-    if (!options.useColumnIndexFilter()) {
+    if (!options.useColumnIndexFilter() || !FilterCompat.isFilteringRequired(options.getRecordFilter())) {
       return getRecordCount();
     }
     long total = 0;
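
With the extra condition, `getFilteredRecordCount()` short-circuits to the plain record count whenever column-index filtering is disabled or no real filter is present, instead of computing row ranges with no predicate. A usage sketch (`inputFile` and `options` are assumed to be set up by the caller):

```java
import java.io.IOException;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.io.InputFile;

static long countRows(InputFile inputFile, ParquetReadOptions options) throws IOException {
  try (ParquetFileReader reader = ParquetFileReader.open(inputFile, options)) {
    // With a null or NOOP record filter this now simply equals getRecordCount().
    return reader.getFilteredRecordCount();
  }
}
```
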
@@ -867,7 +867,7 @@
     }
 
     FilterCompat.Filter recordFilter = options.getRecordFilter();
-    if (recordFilter != null) {
+    if (FilterCompat.isFilteringRequired(recordFilter)) {
       return RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this);
     }
 
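
This widens the old `!= null` check: a NOOP filter used to flow into `RowGroupFilter.filterRowGroups(...)`, whose no-op visitor hands the block list back unchanged, so the result is identical but the per-block visit is now skipped. Condensed, the method's effective logic becomes (a sketch, with names as in the hunk):

```java
FilterCompat.Filter recordFilter = options.getRecordFilter();
return FilterCompat.isFilteringRequired(recordFilter)
    ? RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this) // real predicate
    : blocks; // null or NOOP: keep every row group
```
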
@@ -953,7 +953,7 @@
     if (currentBlock == blocks.size()) {
       return null;
     }
-    if (!options.useColumnIndexFilter()) {
+    if (!options.useColumnIndexFilter() || !FilterCompat.isFilteringRequired(options.getRecordFilter())) {
       return readNextRowGroup();
     }
     BlockMetaData block = blocks.get(currentBlock);
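
Same pattern on the read path: with no effective filter, `readNextFilteredRowGroup()` now delegates to `readNextRowGroup()` up front rather than evaluating row ranges for each block. A consumer-side sketch (`reader` is an open `ParquetFileReader`; the loop shape is the conventional one):

```java
import org.apache.parquet.column.page.PageReadStore;

PageReadStore rowGroup;
while ((rowGroup = reader.readNextFilteredRowGroup()) != null) {
  // For a null or NOOP filter this now yields exactly the unfiltered row groups.
  long rows = rowGroup.getRowCount();
  // ... hand the pages to a record materializer ...
}
```
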
@@ -1030,8 +1030,8 @@
     if (!columnDecryptionSetup.isEncrypted()) { // plaintext column
       currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
     }  else { // encrypted column
-      currentRowGroup.addColumn(chunk.descriptor.col, 
-          chunk.readAllPages(columnDecryptionSetup.getMetaDataDecryptor(), columnDecryptionSetup.getDataDecryptor(), 
+      currentRowGroup.addColumn(chunk.descriptor.col,
+          chunk.readAllPages(columnDecryptionSetup.getMetaDataDecryptor(), columnDecryptionSetup.getDataDecryptor(),
               fileDecryptor.getFileAAD(), block.getOrdinal(), columnDecryptionSetup.getOrdinal()));
     }
   }
@@ -1046,6 +1046,8 @@
   }
 
   private RowRanges getRowRanges(int blockIndex) {
+    assert FilterCompat.isFilteringRequired(options.getRecordFilter())
+        : "Should not be invoked if filter is null or NOOP";
     RowRanges rowRanges = blockRowRanges.get(blockIndex);
     if (rowRanges == null) {
       rowRanges = ColumnIndexFilter.calculateRowRanges(options.getRecordFilter(), getColumnIndexStore(blockIndex),
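
The assert records the invariant established by the two guards above: `getRowRanges` is only reachable when a real predicate exists, since `ColumnIndexFilter.calculateRowRanges` has no meaningful answer for a missing filter. Being an `assert`, it only fires when the JVM runs with `-ea`; callers are expected to check first, along these lines:

```java
// Caller-side contract (sketch): guard before asking for row ranges.
if (FilterCompat.isFilteringRequired(options.getRecordFilter())) {
  RowRanges ranges = getRowRanges(blockIndex); // safe: filter is non-null and not NOOP
}
```
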
@@ -1123,10 +1125,10 @@
     if (!encryptedColumn) {
       pageHeader = Util.readPageHeader(f);
     } else {
-      byte[] dictionaryPageHeaderAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.DictionaryPageHeader, 
+      byte[] dictionaryPageHeaderAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.DictionaryPageHeader,
           meta.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
       pageHeader = Util.readPageHeader(f, columnDecryptionSetup.getMetaDataDecryptor(), dictionaryPageHeaderAAD);
-      dictionaryPageAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.DictionaryPage, 
+      dictionaryPageAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.DictionaryPage,
           meta.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
       pageDecryptor = columnDecryptionSetup.getDataDecryptor();
     }
@@ -1191,9 +1193,9 @@
       InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(meta.getPath());
       if (columnDecryptionSetup.isEncrypted()) {
         bloomFilterDecryptor = columnDecryptionSetup.getMetaDataDecryptor();
-        bloomFilterHeaderAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.BloomFilterHeader, 
+        bloomFilterHeaderAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.BloomFilterHeader,
             meta.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
-        bloomFilterBitsetAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.BloomFilterBitset, 
+        bloomFilterBitsetAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.BloomFilterBitset,
             meta.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
       }
     }
@@ -1255,11 +1257,11 @@
       InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(column.getPath());
       if (columnDecryptionSetup.isEncrypted()) {
         columnIndexDecryptor = columnDecryptionSetup.getMetaDataDecryptor();
-        columnIndexAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.ColumnIndex, 
+        columnIndexAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.ColumnIndex,
             column.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
       }
     }
-    return ParquetMetadataConverter.fromParquetColumnIndex(column.getPrimitiveType(), 
+    return ParquetMetadataConverter.fromParquetColumnIndex(column.getPrimitiveType(),
         Util.readColumnIndex(f, columnIndexDecryptor, columnIndexAAD));
   }
 
@@ -1284,7 +1286,7 @@
       InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(column.getPath());
       if (columnDecryptionSetup.isEncrypted()) {
         offsetIndexDecryptor = columnDecryptionSetup.getMetaDataDecryptor();
-        offsetIndexAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.OffsetIndex, 
+        offsetIndexAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.OffsetIndex,
             column.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
       }
     }
@@ -1401,7 +1403,7 @@
       return readAllPages(null, null, null, -1, -1);
     }
 
-    public ColumnChunkPageReader readAllPages(BlockCipher.Decryptor headerBlockDecryptor, BlockCipher.Decryptor pageBlockDecryptor, 
+    public ColumnChunkPageReader readAllPages(BlockCipher.Decryptor headerBlockDecryptor, BlockCipher.Decryptor pageBlockDecryptor,
         byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal) throws IOException {
       List<DataPage> pagesInChunk = new ArrayList<DataPage>();
       DictionaryPage dictionaryPage = null;
@@ -1411,7 +1413,7 @@
       int dataPageCountReadSoFar = 0;
       byte[] dataPageHeaderAAD = null;
       if (null != headerBlockDecryptor) {
-        dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader, rowGroupOrdinal, 
+        dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader, rowGroupOrdinal,
             columnOrdinal, getPageOrdinal(dataPageCountReadSoFar));
       }
       while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
index d2a5395..a66533d 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
@@ -391,6 +391,10 @@
     assertEquals(DATA, readUsers(FilterCompat.NOOP, false));
     assertEquals(DATA, readUsers(FilterCompat.NOOP, true));
 
+    // Column index filtering with null filter
+    assertEquals(DATA, readUsers((Filter) null, false));
+    assertEquals(DATA, readUsers((Filter) null, true));
+
     // Column index filtering turned off
     assertEquals(DATA.stream().filter(user -> user.getId() == 1234).collect(Collectors.toList()),
         readUsers(eq(longColumn("id"), 1234l), true, false));
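
One note on the new assertions: the `(Filter) null` cast is presumably there to disambiguate overloads of the test-local `readUsers` helper (the `eq(...)` call just above takes a `FilterPredicate` rather than a `Filter`), since a bare `null` would otherwise be ambiguous:

```java
// Overloads inferred from the call sites above (signatures assumed, not verified):
//   List<User> readUsers(FilterCompat.Filter filter, boolean useOtherFiltering)
//   List<User> readUsers(FilterPredicate filter, boolean useOtherFiltering)
readUsers((Filter) null, false); // the cast selects the Filter overload
```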