blob: a66533d0f962d0445d3434163da5369fb2547e5e [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.parquet.hadoop;
import static java.util.Collections.emptyList;
import static;
import static org.apache.parquet.filter2.predicate.FilterApi.and;
import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
import static org.apache.parquet.filter2.predicate.FilterApi.eq;
import static org.apache.parquet.filter2.predicate.FilterApi.gtEq;
import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
import static;
import static org.apache.parquet.filter2.predicate.FilterApi.ltEq;
import static org.apache.parquet.filter2.predicate.FilterApi.not;
import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
import static org.apache.parquet.filter2.predicate.FilterApi.or;
import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.parquet.schema.Types.optional;
import static org.apache.parquet.schema.Types.required;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Predicate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.crypto.ColumnEncryptionProperties;
import org.apache.parquet.crypto.DecryptionKeyRetrieverMock;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Statistics;
import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.Location;
import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.PhoneNumber;
import org.apache.parquet.filter2.recordlevel.PhoneBookWriter.User;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Unit tests for high-level column-index-based filtering.
 */
public class TestColumnIndexFiltering {
private static final Logger LOGGER = LoggerFactory.getLogger(TestColumnIndexFiltering.class);
// Fixed seed so that the generated test data is identical on every run.
// RANDOM and PHONE_KINDS must stay declared before DATA: static fields are initialized in
// textual order, and generateData() (via generatePhoneNumbers/generateLocation) reads them.
private static final Random RANDOM = new Random(42);
// Phone kinds to pick from; null is included so that some phone numbers have no kind.
private static final String[] PHONE_KINDS = { null, "mobile", "home", "work" };
// 10000 generated users; read-only so that no test can alter the shared expectation.
private static final List<User> DATA = Collections.unmodifiableList(generateData(10000));
// Temporary file paths for the parameterized runs. createFiles() writes the plain files with
// writer versions 1.0 and 2.0, and the "_E" files with the same versions plus encryption.
private static final Path FILE_V1 = createTempFile(false);
private static final Path FILE_V2 = createTempFile(false);
private static final Path FILE_V1_E = createTempFile(true);
private static final Path FILE_V2_E = createTempFile(true);
// Read (projection) schema used by testFilteringWithProjection; per that test it does not
// contain the 'name' column — NOTE(review): confirm against the full builder chain.
private static final MessageType SCHEMA_WITHOUT_NAME = Types.buildMessage()
// 16-byte keys built from ASCII digit strings.
// NOTE(review): getBytes() uses the platform default charset; harmless for ASCII digits,
// but an explicit charset would make this independent of the platform.
private static final byte[] FOOTER_ENCRYPTION_KEY = "0123456789012345".getBytes();
private static final byte[] COLUMN_ENCRYPTION_KEY1 = "1234567890123450".getBytes();
private static final byte[] COLUMN_ENCRYPTION_KEY2 = "1234567890123451".getBytes();
// Key identifiers; presumably attached to the encryption properties on write and resolved
// back to the keys above by DecryptionKeyRetrieverMock on read — confirm.
private static final String FOOTER_ENCRYPTION_KEY_ID = "kf";
private static final String COLUMN_ENCRYPTION_KEY1_ID = "kc1";
private static final String COLUMN_ENCRYPTION_KEY2_ID = "kc2";
@Parameters(name = "Run {index}: isEncrypted={1}")
public static Collection<Object[]> params() {
return Arrays.asList(
new Object[] { FILE_V1, false /*isEncrypted*/ },
new Object[] { FILE_V2, false /*isEncrypted*/ },
new Object[] { FILE_V1_E, true /*isEncrypted*/ },
new Object[] { FILE_V2_E, true /*isEncrypted*/ });
// The Parquet file read by this run, and whether it was written encrypted.
private final Path file;
private final boolean isEncrypted;

/**
 * Creates one parameterized run.
 *
 * @param file the Parquet file to read in this run
 * @param isEncrypted whether {@code file} is encrypted, i.e. whether reads need decryption properties
 */
public TestColumnIndexFiltering(Path file, boolean isEncrypted) {
  this.isEncrypted = isEncrypted;
  this.file = file;
}
private static List<User> generateData(int rowCount) {
List<User> users = new ArrayList<>();
List<String> names = generateNames(rowCount);
for (int i = 0; i < rowCount; ++i) {
users.add(new User(i, names.get(i), generatePhoneNumbers(), generateLocation(i, rowCount)));
return users;
/**
 * Generates the user names: some fixed values used by the filter tests, random names, and
 * {@code rowCount / 100} nulls at random positions.
 *
 * <p>Random names are 0-7 characters long (so empty names occur); the alphabet lists the vowels
 * twice, presumably to make vowel-initial names (see NameStartsWithVowel) more frequent.
 * The random-name loop bound is inclusive ({@code i >= 0}), which presumably yields
 * {@code rowCount + 1} names in total, so every index below {@code rowCount} is valid — confirm.
 *
 * @param rowCount the number of rows the names are generated for
 * @return the names in descending order, with nulls interspersed
 */
private static List<String> generateNames(int rowCount) {
List<String> list = new ArrayList<>();
// Add the fixed values that the filter tests look up
int nullCount = rowCount / 100;
String alphabet = "aabcdeefghiijklmnoopqrstuuvwxyz";
int maxLength = 8;
for (int i = rowCount - list.size() - nullCount; i >= 0; --i) {
int l = RANDOM.nextInt(maxLength);
StringBuilder builder = new StringBuilder(l);
for (int j = 0; j < l; ++j) {
// Sort in descending order before the nulls are added: the comparator calls compareTo and
// would throw on a null element.
Collections.sort(list, (str1, str2) -> -str1.compareTo(str2));
// Adding nulls to random places
for (int i = 0; i < nullCount; ++i) {
list.add(RANDOM.nextInt(list.size()), null);
return list;
private static List<PhoneNumber> generatePhoneNumbers() {
int length = RANDOM.nextInt(5) - 1;
if (length < 0) {
return null;
List<PhoneNumber> phoneNumbers = new ArrayList<>(length);
for (int i = 0; i < length; ++i) {
// 6 digits numbers
long number = Math.abs(RANDOM.nextLong() % 900000) + 100000;
phoneNumbers.add(new PhoneNumber(number, PHONE_KINDS[RANDOM.nextInt(PHONE_KINDS.length)]));
return phoneNumbers;
private static Location generateLocation(int id, int rowCount) {
if (RANDOM.nextDouble() < 0.01) {
return null;
double lat = RANDOM.nextDouble() * 90.0 - (id < rowCount / 2 ? 90.0 : 0.0);
double lon = RANDOM.nextDouble() * 90.0 - (id < rowCount / 4 || id >= 3 * rowCount / 4 ? 90.0 : 0.0);
return new Location(RANDOM.nextDouble() < 0.01 ? null : lat, RANDOM.nextDouble() < 0.01 ? null : lon);
private static Path createTempFile(boolean encrypted) {
String suffix = encrypted ? ".parquet.encrypted" : ".parquet";
try {
return new Path(Files.createTempFile("test-ci_", suffix).toAbsolutePath().toString());
} catch (IOException e) {
throw new AssertionError("Unable to create temporary file", e);
private List<User> readUsers(FilterPredicate filter, boolean useOtherFiltering) throws IOException {
return readUsers(FilterCompat.get(filter), useOtherFiltering, true);
private List<User> readUsers(FilterPredicate filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
throws IOException {
return readUsers(FilterCompat.get(filter), useOtherFiltering, useColumnIndexFilter);
private List<User> readUsers(Filter filter, boolean useOtherFiltering) throws IOException {
return readUsers(filter, useOtherFiltering, true);
/**
 * Reads all users from this run's {@code file} through a {@link ParquetReader} built with
 * {@link GroupReadSupport}.
 *
 * <p>NOTE(review): the rest of the builder chain presumably wires in {@code filter}, the two
 * filtering switches, and {@code decryptionProperties}, which is {@code null} for plain runs — confirm.
 *
 * @param filter the filter to apply (may be {@code null} or {@link FilterCompat#NOOP})
 * @param useOtherFiltering whether the non-column-index filtering mechanisms are enabled; with these
 *     on, testNoFiltering expects exactly the matching records
 * @param useColumnIndexFilter whether column-index based page filtering is enabled
 * @return the users read
 * @throws IOException if reading fails
 */
private List<User> readUsers(Filter filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
throws IOException {
FileDecryptionProperties decryptionProperties = getFileDecryptionProperties();
return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
/**
 * Like {@code readUsers(Filter, boolean, boolean)}, but also sets {@code schema} as the requested
 * read schema ({@link ReadSupport#PARQUET_READ_SCHEMA}), so only its columns are read.
 *
 * @param filter the filter to apply
 * @param schema the projection (read) schema
 * @param useOtherFiltering whether the non-column-index filtering mechanisms are enabled
 * @param useColumnIndexFilter whether column-index based page filtering is enabled
 * @return the users read
 * @throws IOException if reading fails
 */
private List<User> readUsersWithProjection(Filter filter, MessageType schema, boolean useOtherFiltering,
boolean useColumnIndexFilter) throws IOException {
FileDecryptionProperties decryptionProperties = getFileDecryptionProperties();
return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
.set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString()));
/**
 * Returns the decryption properties for this run's file.
 *
 * <p>For encrypted runs the properties are built around a {@link DecryptionKeyRetrieverMock},
 * presumably registered with the footer and column key ids/keys of this class — confirm.
 *
 * @return {@code null} when the run is not encrypted, otherwise the decryption properties
 */
private FileDecryptionProperties getFileDecryptionProperties() {
FileDecryptionProperties decryptionProperties = null;
if (isEncrypted) {
DecryptionKeyRetrieverMock decryptionKeyRetrieverMock = new DecryptionKeyRetrieverMock()
decryptionProperties = FileDecryptionProperties.builder()
return decryptionProperties;
// Assumes that both lists are in the same order
private static void assertContains(Stream<User> expected, List<User> actual) {
Iterator<User> expIt = expected.iterator();
if (!expIt.hasNext()) {
User exp =;
for (User act : actual) {
if (act.equals(exp)) {
if (!expIt.hasNext()) {
exp =;
assertFalse("Not all expected elements are in the actual list. E.g.: " + exp, expIt.hasNext());
/**
 * Verifies the Parquet filter {@code actualFilter} against the in-memory expectation
 * {@code expectedFilter} on this run's file.
 *
 * <p>First reads with column-index filtering only. That result may still contain non-matching
 * records from pages that could not be dropped. It must therefore be smaller than {@code DATA},
 * contain all required records, and consist only of records from the file. Then reads with all
 * filtering enabled; that result must equal the required records exactly.
 *
 * @param expectedFilter Java-side predicate defining which users should match
 * @param actualFilter the equivalent Parquet filter predicate under test
 * @throws IOException if reading the file fails
 */
private void assertCorrectFiltering(Predicate<User> expectedFilter, FilterPredicate actualFilter)
throws IOException {
// Check with only column index based filtering
List<User> result = readUsers(actualFilter, false);
// Logs the percentage of records read after the page-dropping check.
assertTrue("Column-index filtering should drop some pages", result.size() < DATA.size());"{}/{} records read; filtering ratio: {}%", result.size(), DATA.size(),
100 * result.size() / DATA.size());
// Asserts that all the required records are in the result
assertContains(, result);
// Asserts that all the retrieved records are in the file (validating non-matching records)
assertContains(, DATA);
// Check with all filtering mechanisms enabled to ensure the result contains exactly the required values
result = readUsers(actualFilter, true);
assertEquals(, result);
public static void createFiles() throws IOException {
writePhoneBookToFile(FILE_V1, WriterVersion.PARQUET_1_0, null);
writePhoneBookToFile(FILE_V2, WriterVersion.PARQUET_2_0, null);
FileEncryptionProperties encryptionProperties = getFileEncryptionProperties();
writePhoneBookToFile(FILE_V1_E, ParquetProperties.WriterVersion.PARQUET_1_0, encryptionProperties);
writePhoneBookToFile(FILE_V2_E, ParquetProperties.WriterVersion.PARQUET_2_0, encryptionProperties);
/**
 * Writes the phone-book test data to {@code file}.
 *
 * <p>Small page and row-group sizes are chosen relative to {@code DATA.size()} so that the file has
 * several pages and row groups for the column indexes to work on. The row-group size is 30 times
 * the page size. NOTE(review): both values are presumably byte limits passed to the writer builder
 * — confirm.
 *
 * @param file the target file (overwritten)
 * @param parquetVersion the writer version to use
 * @param encryptionProperties encryption to apply, or {@code null} for a plain file (see createFiles)
 * @throws IOException if writing fails
 */
private static void writePhoneBookToFile(Path file, WriterVersion parquetVersion,
FileEncryptionProperties encryptionProperties) throws IOException {
int pageSize = DATA.size() / 10; // Ensure that several pages will be created
int rowGroupSize = pageSize * 6 * 5; // Ensure that there are more row-groups created
/**
 * Builds the file encryption properties shared by the encrypted test files.
 *
 * <p>Uses {@code FOOTER_ENCRYPTION_KEY} as the footer key, plus two column-specific encryption
 * settings keyed by their column path. NOTE(review): which columns, and the exact key/key-id
 * wiring, is presumably in the builder chains — confirm.
 *
 * @return the encryption properties
 */
private static FileEncryptionProperties getFileEncryptionProperties() {
ColumnEncryptionProperties columnProperties1 = ColumnEncryptionProperties
ColumnEncryptionProperties columnProperties2 = ColumnEncryptionProperties
// Column path -> its encryption settings
Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new HashMap<>();
columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
columnPropertiesMap.put(columnProperties2.getPath(), columnProperties2);
FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
return encryptionProperties;
private static void deleteFile(Path file) throws IOException {
file.getFileSystem(new Configuration()).delete(file, false);
/**
 * Cleans up the temporary test files; presumably calls deleteFile for each FILE_* path — confirm.
 *
 * @throws IOException if a deletion fails
 */
public static void deleteFiles() throws IOException {
/**
 * Single-column equality filters. Each case pairs a Java predicate over {@link User} with the
 * equivalent Parquet {@link FilterPredicate}, in the argument order of assertCorrectFiltering.
 */
public void testSimpleFiltering() throws IOException {
// Equality on a long column
record -> record.getId() == 1234,
eq(longColumn("id"), 1234l));
// Equality on a binary (string) column
record -> "miller".equals(record.getName()),
eq(binaryColumn("name"), Binary.fromString("miller")));
// Null check, expressed in the filter API as eq(column, null)
record -> record.getName() == null,
eq(binaryColumn("name"), null));
/**
 * Checks the results when filtering is a no-op or partially/fully disabled.
 */
public void testNoFiltering() throws IOException {
// Column index filtering with no-op filter
assertEquals(DATA, readUsers(FilterCompat.NOOP, false));
assertEquals(DATA, readUsers(FilterCompat.NOOP, true));
// Column index filtering with null filter
assertEquals(DATA, readUsers((Filter) null, false));
assertEquals(DATA, readUsers((Filter) null, true));
// Column index filtering turned off: the other filtering mechanisms alone must still return
// exactly the matching records
assertEquals( -> user.getId() == 1234).collect(Collectors.toList()),
readUsers(eq(longColumn("id"), 1234l), true, false));
assertEquals( -> "miller".equals(user.getName())).collect(Collectors.toList()),
readUsers(eq(binaryColumn("name"), Binary.fromString("miller")), true, false));
assertEquals( -> user.getName() == null).collect(Collectors.toList()),
readUsers(eq(binaryColumn("name"), null), true, false));
// Every filtering mechanism turned off: the filter is ignored and all records are returned
assertEquals(DATA, readUsers(eq(longColumn("id"), 1234l), false, false));
assertEquals(DATA, readUsers(eq(binaryColumn("name"), Binary.fromString("miller")), false, false));
assertEquals(DATA, readUsers(eq(binaryColumn("name"), null), false, false));
/**
 * Filters combining several predicates with {@code and}, including nested columns and null checks.
 * Each case pairs a Java predicate with the equivalent Parquet filter predicate.
 */
public void testComplexFiltering() throws IOException {
// Latitude within [37, 70] and longitude within [-21, 35], all bounds inclusive;
// a missing location or coordinate does not match
record -> {
Location loc = record.getLocation();
Double lat = loc == null ? null : loc.getLat();
Double lon = loc == null ? null : loc.getLon();
return lat != null && lon != null && 37 <= lat && lat <= 70 && -21 <= lon && lon <= 35;
and(and(gtEq(doubleColumn(""), 37.0), ltEq(doubleColumn(""), 70.0)),
and(gtEq(doubleColumn("location.lon"), -21.0), ltEq(doubleColumn("location.lon"), 35.0))));
// Both coordinates null; a missing location counts as both coordinates being null
record -> {
Location loc = record.getLocation();
return loc == null || (loc.getLat() == null && loc.getLon() == null);
and(eq(doubleColumn(""), null), eq(doubleColumn("location.lon"), null)));
// Name before "thomas" (nulls excluded) and id within the first three quarters of the rows
record -> {
String name = record.getName();
return name != null && name.compareTo("thomas") < 0 && record.getId() <= 3 * DATA.size() / 4;
and(lt(binaryColumn("name"), Binary.fromString("thomas")), ltEq(longColumn("id"), 3l * DATA.size() / 4)));
/**
 * User defined predicate keeping the names whose first character is a lower-case ASCII vowel
 * (a, e, i, o, u). Nulls and empty strings are not kept.
 */
public static class NameStartsWithVowel extends UserDefinedPredicate<Binary> {
// Bounds of the single-letter prefix ranges used for statistics-based dropping: e.g. every
// value starting with 'a' lies in [A, B), and every value starting with 'b'..'d' in [B, E).
private static final Binary A = Binary.fromString("a");
private static final Binary B = Binary.fromString("b");
private static final Binary E = Binary.fromString("e");
private static final Binary F = Binary.fromString("f");
private static final Binary I = Binary.fromString("i");
private static final Binary J = Binary.fromString("j");
private static final Binary O = Binary.fromString("o");
private static final Binary P = Binary.fromString("p");
private static final Binary U = Binary.fromString("u");
private static final Binary V = Binary.fromString("v");
// Also used by the tests to compute the expected records in memory.
private static boolean isStartingWithVowel(String str) {
if (str == null || str.isEmpty()) {
return false;
switch (str.charAt(0)) {
case 'a':
case 'e':
case 'i':
case 'o':
case 'u':
return true;
return false;
// Record-level check on the decoded UTF-8 string.
public boolean keep(Binary value) {
return value != null && isStartingWithVowel(value.toStringUsingUTF8());
// Intended to drop a page/row group when the whole [min, max] range lies in a vowel-free prefix
// range: below "a", within ["b", "e"), ["f", "i"), ["j", "o"), ["p", "u"), or at/above "v".
public boolean canDrop(Statistics<Binary> statistics) {
Comparator<Binary> cmp = statistics.getComparator();
Binary min = statistics.getMin();
Binary max = statistics.getMax();
return, A) < 0
|| (, B) >= 0 &&, E) < 0)
|| (, F) >= 0 &&, I) < 0)
|| (, J) >= 0 &&, O) < 0)
|| (, P) >= 0 &&, U) < 0)
||, V) >= 0;
// Intended to drop (for the negated predicate) when the whole [min, max] range lies within a single
// vowel prefix range, i.e. every value starts with the same vowel.
public boolean inverseCanDrop(Statistics<Binary> statistics) {
Comparator<Binary> cmp = statistics.getComparator();
Binary min = statistics.getMin();
Binary max = statistics.getMax();
return (, A) >= 0 &&, B) < 0)
|| (, E) >= 0 &&, F) < 0)
|| (, I) >= 0 &&, J) < 0)
|| (, O) >= 0 &&, P) < 0)
|| (, U) >= 0 &&, V) < 0);
public static class IsDivisibleBy extends UserDefinedPredicate<Long> implements Serializable {
private long divisor;
IsDivisibleBy(long divisor) {
this.divisor = divisor;
public boolean keep(Long value) {
// Deliberately not checking for null to verify the handling of NPE
// Implementors shall always checks the value for null and return accordingly
return value % divisor == 0;
public boolean canDrop(Statistics<Long> statistics) {
long min = statistics.getMin();
long max = statistics.getMax();
return min % divisor != 0 && max % divisor != 0 && min / divisor == max / divisor;
public boolean inverseCanDrop(Statistics<Long> statistics) {
long min = statistics.getMin();
long max = statistics.getMax();
return min == max && min % divisor == 0;
/**
 * Filtering with user defined predicates, both plain and negated. NameStartsWithVowel is passed
 * by class and IsDivisibleBy by instance. The negated case presumably exercises the predicates'
 * inverseCanDrop — confirm.
 */
public void testUDF() throws IOException {
record -> NameStartsWithVowel.isStartingWithVowel(record.getName()) || record.getId() % 234 == 0,
or(userDefined(binaryColumn("name"), NameStartsWithVowel.class),
userDefined(longColumn("id"), new IsDivisibleBy(234))));
record -> !(NameStartsWithVowel.isStartingWithVowel(record.getName()) || record.getId() % 234 == 0),
not(or(userDefined(binaryColumn("name"), NameStartsWithVowel.class),
userDefined(longColumn("id"), new IsDivisibleBy(234)))));
/**
 * Filtering on columns that do not exist in the file. Such a predicate evaluates as if every value
 * of the column were null: e.g. {@code eq(missing, null)} always matches and {@code lt(missing, 0)}
 * never does.
 */
public void testFilteringWithMissingColumns() throws IOException {
// Missing column filter is always true (notEq non-null, eq null, inverted UDF whose keep(null) is false)
assertEquals(DATA, readUsers(notEq(binaryColumn("not-existing-binary"), Binary.EMPTY), true));
record -> record.getId() == 1234,
and(eq(longColumn("id"), 1234l),
eq(longColumn("not-existing-long"), null)));
record -> "miller".equals(record.getName()),
and(eq(binaryColumn("name"), Binary.fromString("miller")),
invert(userDefined(binaryColumn("not-existing-binary"), NameStartsWithVowel.class))));
// Missing column filter is always false (comparisons against null, and a UDF whose keep(null)
// deliberately throws, see IsDivisibleBy)
assertEquals(emptyList(), readUsers(lt(longColumn("not-existing-long"), 0l), true));
record -> "miller".equals(record.getName()),
or(eq(binaryColumn("name"), Binary.fromString("miller")),
gtEq(binaryColumn("not-existing-binary"), Binary.EMPTY)));
record -> record.getId() == 1234,
or(eq(longColumn("id"), 1234l),
userDefined(longColumn("not-existing-long"), new IsDivisibleBy(1))));
/**
 * Filtering on a column that exists in the file but is projected out by the read schema
 * ({@code SCHEMA_WITHOUT_NAME}).
 */
public void testFilteringWithProjection() throws IOException {
// All rows shall be retrieved because all values in column 'name' shall be handled as null values
assertEquals( -> user.cloneWithName(null)).collect(toList()),
readUsersWithProjection(FilterCompat.get(eq(binaryColumn("name"), null)), SCHEMA_WITHOUT_NAME, true, true));
// Column index filter shall drop all pages because all values in column 'name' shall be handled as null values
readUsersWithProjection(FilterCompat.get(notEq(binaryColumn("name"), null)), SCHEMA_WITHOUT_NAME, false, true));
// Same for a UDF on the projected-out column: NameStartsWithVowel.keep(null) is false
readUsersWithProjection(FilterCompat.get(userDefined(binaryColumn("name"), NameStartsWithVowel.class)),
SCHEMA_WITHOUT_NAME, false, true));