PARQUET-1964: Properly handle missing/null filter (#856)
diff --git a/parquet-column/src/main/java/org/apache/parquet/filter2/compat/FilterCompat.java b/parquet-column/src/main/java/org/apache/parquet/filter2/compat/FilterCompat.java
index 463a54c..7437e79 100644
--- a/parquet-column/src/main/java/org/apache/parquet/filter2/compat/FilterCompat.java
+++ b/parquet-column/src/main/java/org/apache/parquet/filter2/compat/FilterCompat.java
@@ -121,6 +121,17 @@
return NOOP;
}
+ /**
+ * Returns whether filtering is required based on the specified filter. It may be used to skip expensive
+ * preparation steps for filtering when the filter is {@code null} or {@link #NOOP}.
+ *
+ * @param filter the filter to be checked
+ * @return {@code false} if the filter is {@code null} or is a no-op filter, {@code true} otherwise.
+ */
+ public static boolean isFilteringRequired(Filter filter) {
+ return filter != null && !(filter instanceof NoOpFilter);
+ }
+
// wraps a FilterPredicate
public static final class FilterPredicateCompat implements Filter {
private final FilterPredicate filterPredicate;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index ff34838..8ffe19f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -86,7 +86,7 @@
*/
public InternalParquetRecordReader(ReadSupport<T> readSupport, Filter filter) {
this.readSupport = readSupport;
- this.filter = Objects.requireNonNull(filter, "filter cannot be null");
+ this.filter = filter == null ? FilterCompat.NOOP : filter;
}
/**
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 8d09b87..b13d336 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -527,7 +527,7 @@
return readFooter(file, options, f, converter);
}
- private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options,
+ private static final ParquetMetadata readFooter(InputFile file, ParquetReadOptions options,
SeekableInputStream f, ParquetMetadataConverter converter) throws IOException {
long fileLen = file.getLength();
@@ -579,7 +579,7 @@
// Regular file, or encrypted file with plaintext footer
if (!encryptedFooterMode) {
- return converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter(), fileDecryptor, false,
+ return converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter(), fileDecryptor, false,
fileMetadataLength);
}
@@ -588,10 +588,10 @@
throw new ParquetCryptoRuntimeException("Trying to read file with encrypted footer. No keys available");
}
FileCryptoMetaData fileCryptoMetaData = readFileCryptoMetaData(footerBytesStream);
- fileDecryptor.setFileCryptoMetaData(fileCryptoMetaData.getEncryption_algorithm(),
+ fileDecryptor.setFileCryptoMetaData(fileCryptoMetaData.getEncryption_algorithm(),
true, fileCryptoMetaData.getKey_metadata());
// footer length is required only for signed plaintext footers
- return converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter(), fileDecryptor, true, 0);
+ return converter.readParquetMetadata(footerBytesStream, options.getMetadataFilter(), fileDecryptor, true, 0);
}
/**
@@ -827,7 +827,7 @@
}
public long getFilteredRecordCount() {
- if (!options.useColumnIndexFilter()) {
+ if (!options.useColumnIndexFilter() || !FilterCompat.isFilteringRequired(options.getRecordFilter())) {
return getRecordCount();
}
long total = 0;
@@ -867,7 +867,7 @@
}
FilterCompat.Filter recordFilter = options.getRecordFilter();
- if (recordFilter != null) {
+ if (FilterCompat.isFilteringRequired(recordFilter)) {
return RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this);
}
@@ -953,7 +953,7 @@
if (currentBlock == blocks.size()) {
return null;
}
- if (!options.useColumnIndexFilter()) {
+ if (!options.useColumnIndexFilter() || !FilterCompat.isFilteringRequired(options.getRecordFilter())) {
return readNextRowGroup();
}
BlockMetaData block = blocks.get(currentBlock);
@@ -1030,8 +1030,8 @@
if (!columnDecryptionSetup.isEncrypted()) { // plaintext column
currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages());
} else { // encrypted column
- currentRowGroup.addColumn(chunk.descriptor.col,
- chunk.readAllPages(columnDecryptionSetup.getMetaDataDecryptor(), columnDecryptionSetup.getDataDecryptor(),
+ currentRowGroup.addColumn(chunk.descriptor.col,
+ chunk.readAllPages(columnDecryptionSetup.getMetaDataDecryptor(), columnDecryptionSetup.getDataDecryptor(),
fileDecryptor.getFileAAD(), block.getOrdinal(), columnDecryptionSetup.getOrdinal()));
}
}
@@ -1046,6 +1046,8 @@
}
private RowRanges getRowRanges(int blockIndex) {
+ assert FilterCompat
+ .isFilteringRequired(options.getRecordFilter()) : "Should not be invoked if filter is null or NOOP";
RowRanges rowRanges = blockRowRanges.get(blockIndex);
if (rowRanges == null) {
rowRanges = ColumnIndexFilter.calculateRowRanges(options.getRecordFilter(), getColumnIndexStore(blockIndex),
@@ -1123,10 +1125,10 @@
if (!encryptedColumn) {
pageHeader = Util.readPageHeader(f);
} else {
- byte[] dictionaryPageHeaderAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.DictionaryPageHeader,
+ byte[] dictionaryPageHeaderAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.DictionaryPageHeader,
meta.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
pageHeader = Util.readPageHeader(f, columnDecryptionSetup.getMetaDataDecryptor(), dictionaryPageHeaderAAD);
- dictionaryPageAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.DictionaryPage,
+ dictionaryPageAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.DictionaryPage,
meta.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
pageDecryptor = columnDecryptionSetup.getDataDecryptor();
}
@@ -1191,9 +1193,9 @@
InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(meta.getPath());
if (columnDecryptionSetup.isEncrypted()) {
bloomFilterDecryptor = columnDecryptionSetup.getMetaDataDecryptor();
- bloomFilterHeaderAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.BloomFilterHeader,
+ bloomFilterHeaderAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.BloomFilterHeader,
meta.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
- bloomFilterBitsetAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.BloomFilterBitset,
+ bloomFilterBitsetAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.BloomFilterBitset,
meta.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
}
}
@@ -1255,11 +1257,11 @@
InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(column.getPath());
if (columnDecryptionSetup.isEncrypted()) {
columnIndexDecryptor = columnDecryptionSetup.getMetaDataDecryptor();
- columnIndexAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.ColumnIndex,
+ columnIndexAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.ColumnIndex,
column.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
}
}
- return ParquetMetadataConverter.fromParquetColumnIndex(column.getPrimitiveType(),
+ return ParquetMetadataConverter.fromParquetColumnIndex(column.getPrimitiveType(),
Util.readColumnIndex(f, columnIndexDecryptor, columnIndexAAD));
}
@@ -1284,7 +1286,7 @@
InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(column.getPath());
if (columnDecryptionSetup.isEncrypted()) {
offsetIndexDecryptor = columnDecryptionSetup.getMetaDataDecryptor();
- offsetIndexAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.OffsetIndex,
+ offsetIndexAAD = AesCipher.createModuleAAD(fileDecryptor.getFileAAD(), ModuleType.OffsetIndex,
column.getRowGroupOrdinal(), columnDecryptionSetup.getOrdinal(), -1);
}
}
@@ -1401,7 +1403,7 @@
return readAllPages(null, null, null, -1, -1);
}
- public ColumnChunkPageReader readAllPages(BlockCipher.Decryptor headerBlockDecryptor, BlockCipher.Decryptor pageBlockDecryptor,
+ public ColumnChunkPageReader readAllPages(BlockCipher.Decryptor headerBlockDecryptor, BlockCipher.Decryptor pageBlockDecryptor,
byte[] aadPrefix, int rowGroupOrdinal, int columnOrdinal) throws IOException {
List<DataPage> pagesInChunk = new ArrayList<DataPage>();
DictionaryPage dictionaryPage = null;
@@ -1411,7 +1413,7 @@
int dataPageCountReadSoFar = 0;
byte[] dataPageHeaderAAD = null;
if (null != headerBlockDecryptor) {
- dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader, rowGroupOrdinal,
+ dataPageHeaderAAD = AesCipher.createModuleAAD(aadPrefix, ModuleType.DataPageHeader, rowGroupOrdinal,
columnOrdinal, getPageOrdinal(dataPageCountReadSoFar));
}
while (hasMorePages(valuesCountReadSoFar, dataPageCountReadSoFar)) {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
index d2a5395..a66533d 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
@@ -391,6 +391,10 @@
assertEquals(DATA, readUsers(FilterCompat.NOOP, false));
assertEquals(DATA, readUsers(FilterCompat.NOOP, true));
+ // Column index filtering with null filter
+ assertEquals(DATA, readUsers((Filter) null, false));
+ assertEquals(DATA, readUsers((Filter) null, true));
+
// Column index filtering turned off
assertEquals(DATA.stream().filter(user -> user.getId() == 1234).collect(Collectors.toList()),
readUsers(eq(longColumn("id"), 1234l), true, false));