blob: e4cc55014e24475795d3e19adb705d7c0100e340 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.crypto.ColumnEncryptionProperties;
import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.TestColumnIndexEncryption.StringKeyIdRetriever;
import org.apache.parquet.io.api.Binary;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.parquet.filter2.predicate.FilterApi.*;
import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
import static org.junit.Assert.*;
@RunWith(Parameterized.class)
public class TestBloomEncryption {
private static final Path FILE_V1 = createTempFile();
private static final Path FILE_V2 = createTempFile();
private static final Logger LOGGER = LoggerFactory.getLogger(TestBloomEncryption.class);
private static final Random RANDOM = new Random(42);
private static final String[] PHONE_KINDS = { null, "mobile", "home", "work" };
private static final List<PhoneBookWriter.User> DATA = Collections.unmodifiableList(generateData(10000));
private static final byte[] FOOTER_ENCRYPTION_KEY = new String("0123456789012345").getBytes();
private static final byte[] COLUMN_ENCRYPTION_KEY1 = new String("1234567890123450").getBytes();
private static final byte[] COLUMN_ENCRYPTION_KEY2 = new String("1234567890123451").getBytes();
private final Path file;
public TestBloomEncryption(Path file) {
this.file = file;
}
private static Path createTempFile() {
try {
return new Path(Files.createTempFile("test-bloom-filter_", ".parquet").toAbsolutePath().toString());
} catch (IOException e) {
throw new AssertionError("Unable to create temporary file", e);
}
}
@Parameterized.Parameters
public static Collection<Object[]> params() {
return Arrays.asList(new Object[] { FILE_V1 }, new Object[] { FILE_V2 });
}
private static List<PhoneBookWriter.User> generateData(int rowCount) {
List<PhoneBookWriter.User> users = new ArrayList<>();
List<String> names = generateNames(rowCount);
for (int i = 0; i < rowCount; ++i) {
users.add(new PhoneBookWriter.User(i, names.get(i), generatePhoneNumbers(), generateLocation(i, rowCount)));
}
return users;
}
private static List<String> generateNames(int rowCount) {
List<String> list = new ArrayList<>();
// Adding fix values for filtering
list.add("anderson");
list.add("anderson");
list.add("miller");
list.add("miller");
list.add("miller");
list.add("thomas");
list.add("thomas");
list.add("williams");
int nullCount = rowCount / 100;
String alphabet = "aabcdeefghiijklmnoopqrstuuvwxyz";
int maxLength = 8;
for (int i = rowCount - list.size() - nullCount; i >= 0; --i) {
int l = RANDOM.nextInt(maxLength);
StringBuilder builder = new StringBuilder(l);
for (int j = 0; j < l; ++j) {
builder.append(alphabet.charAt(RANDOM.nextInt(alphabet.length())));
}
list.add(builder.toString());
}
list.sort((str1, str2) -> -str1.compareTo(str2));
// Adding nulls to random places
for (int i = 0; i < nullCount; ++i) {
list.add(RANDOM.nextInt(list.size()), null);
}
return list;
}
private static List<PhoneBookWriter.PhoneNumber> generatePhoneNumbers() {
int length = RANDOM.nextInt(5) - 1;
if (length < 0) {
return null;
}
List<PhoneBookWriter.PhoneNumber> phoneNumbers = new ArrayList<>(length);
for (int i = 0; i < length; ++i) {
// 6 digits numbers
long number = Math.abs(RANDOM.nextLong() % 900000) + 100000;
phoneNumbers.add(new PhoneBookWriter.PhoneNumber(number, PHONE_KINDS[RANDOM.nextInt(PHONE_KINDS.length)]));
}
return phoneNumbers;
}
private static PhoneBookWriter.Location generateLocation(int id, int rowCount) {
if (RANDOM.nextDouble() < 0.01) {
return null;
}
if (RANDOM.nextDouble() < 0.001) {
return new PhoneBookWriter.Location(99.9, 99.9);
}
double lat = RANDOM.nextDouble() * 90.0 - (id < rowCount / 2 ? 90.0 : 0.0);
double lon = RANDOM.nextDouble() * 90.0 - (id < rowCount / 4 || id >= 3 * rowCount / 4 ? 90.0 : 0.0);
return new PhoneBookWriter.Location(RANDOM.nextDouble() < 0.01 ? null : lat, RANDOM.nextDouble() < 0.01 ? null : lon);
}
private List<PhoneBookWriter.User> readUsers(FilterPredicate filter, boolean useOtherFiltering,
boolean useBloomFilter) throws IOException {
/*
byte[] keyBytes = {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15};
FileDecryptionProperties fileDecryptionProperties = FileDecryptionProperties.builder()
.withFooterKey(keyBytes)
.build();
*/
StringKeyIdRetriever kr1 = new StringKeyIdRetriever();
kr1.putKey("kf", FOOTER_ENCRYPTION_KEY);
kr1.putKey("kc1", COLUMN_ENCRYPTION_KEY1);
kr1.putKey("kc2", COLUMN_ENCRYPTION_KEY2);
FileDecryptionProperties fileDecryptionProperties = FileDecryptionProperties.builder()
.withKeyRetriever(kr1)
.build();
return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
.withFilter(FilterCompat.get(filter))
.withDecryption(fileDecryptionProperties)
.useDictionaryFilter(useOtherFiltering)
.useStatsFilter(useOtherFiltering)
.useRecordFilter(useOtherFiltering)
.useBloomFilter(useBloomFilter)
.useColumnIndexFilter(useOtherFiltering));
}
// Assumes that both lists are in the same order
private static void assertContains(Stream<PhoneBookWriter.User> expected, List<PhoneBookWriter.User> actual) {
Iterator<PhoneBookWriter.User> expIt = expected.iterator();
if (!expIt.hasNext()) {
return;
}
PhoneBookWriter.User exp = expIt.next();
for (PhoneBookWriter.User act : actual) {
if (act.equals(exp)) {
if (!expIt.hasNext()) {
break;
}
exp = expIt.next();
}
}
assertFalse("Not all expected elements are in the actual list. E.g.: " + exp, expIt.hasNext());
}
private void assertCorrectFiltering(Predicate<PhoneBookWriter.User> expectedFilter, FilterPredicate actualFilter)
throws IOException {
// Check with only bloom filter based filtering
List<PhoneBookWriter.User> result = readUsers(actualFilter, false, true);
assertTrue("Bloom filtering should drop some row groups", result.size() < DATA.size());
LOGGER.info("{}/{} records read; filtering ratio: {}%", result.size(), DATA.size(),
100 * result.size() / DATA.size());
// Asserts that all the required records are in the result
assertContains(DATA.stream().filter(expectedFilter), result);
// Asserts that all the retrieved records are in the file (validating non-matching records)
assertContains(result.stream(), DATA);
// Check with all the filtering filtering to ensure the result contains exactly the required values
result = readUsers(actualFilter, true, false);
assertEquals(DATA.stream().filter(expectedFilter).collect(Collectors.toList()), result);
}
@BeforeClass
public static void createFile() throws IOException {
int pageSize = DATA.size() / 100; // Ensure that several pages will be created
int rowGroupSize = pageSize * 4; // Ensure that there are more row-groups created
/*
byte[] keyBytes = {0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15};
FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(keyBytes)
.build();
*/
// Encryption configuration 2: Encrypt two columns and the footer, with different keys.
ColumnEncryptionProperties columnProperties1 = ColumnEncryptionProperties
.builder("id")
.withKey(COLUMN_ENCRYPTION_KEY1)
.withKeyID("kc1")
.build();
ColumnEncryptionProperties columnProperties2 = ColumnEncryptionProperties
.builder("name")
.withKey(COLUMN_ENCRYPTION_KEY2)
.withKeyID("kc2")
.build();
Map<ColumnPath, ColumnEncryptionProperties> columnPropertiesMap = new HashMap<>();
columnPropertiesMap.put(columnProperties1.getPath(), columnProperties1);
columnPropertiesMap.put(columnProperties2.getPath(), columnProperties2);
FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(FOOTER_ENCRYPTION_KEY)
.withFooterKeyID("kf")
.withEncryptedColumns(columnPropertiesMap)
.build();
PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V1)
.withWriteMode(OVERWRITE)
.withRowGroupSize(rowGroupSize)
.withPageSize(pageSize)
.withBloomFilterNDV("location.lat", 10000L)
.withBloomFilterNDV("name", 10000L)
.withBloomFilterNDV("id", 10000L)
.withEncryption(encryptionProperties)
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0),
DATA);
PhoneBookWriter.write(ExampleParquetWriter.builder(FILE_V2)
.withWriteMode(OVERWRITE)
.withRowGroupSize(rowGroupSize)
.withPageSize(pageSize)
.withBloomFilterNDV("location.lat", 10000L)
.withBloomFilterNDV("name", 10000L)
.withBloomFilterNDV("id", 10000L)
.withEncryption(encryptionProperties)
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0),
DATA);
}
@AfterClass
public static void deleteFile() throws IOException {
FILE_V1.getFileSystem(new Configuration()).delete(FILE_V1, false);
FILE_V2.getFileSystem(new Configuration()).delete(FILE_V2, false);
}
@Test
public void testSimpleFiltering() throws IOException {
assertCorrectFiltering(
record -> record.getId() == 1234L,
eq(longColumn("id"), 1234L));
assertCorrectFiltering(
record -> "miller".equals(record.getName()),
eq(binaryColumn("name"), Binary.fromString("miller")));
}
@Test
public void testNestedFiltering() throws IOException {
assertCorrectFiltering(
record -> {
PhoneBookWriter.Location location = record.getLocation();
return location != null && location.getLat() != null && location.getLat() == 99.9;
},
eq(doubleColumn("location.lat"), 99.9));
}
}