blob: eca63322d15064a7e9d811d5141d5125642a60cb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.filter2.dictionarylevel;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
import org.apache.parquet.filter2.predicate.Operators.IntColumn;
import org.apache.parquet.filter2.predicate.Operators.LongColumn;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_1_0;
import static org.apache.parquet.filter2.dictionarylevel.DictionaryFilter.canDrop;
import static org.apache.parquet.filter2.predicate.FilterApi.*;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verifyZeroInteractions;
/**
 * Tests for {@link DictionaryFilter#canDrop}: verifies that row groups can be
 * dropped (or must be kept) based solely on the dictionary pages of
 * dictionary-encoded columns.
 *
 * <p>A single test file is written once per class with a small dictionary page
 * size so that {@code fallback_binary_field} (half distinct UUIDs) falls back
 * to plain encoding mid-column, while the low-cardinality columns stay fully
 * dictionary encoded.
 */
public class DictionaryFilterTest {

  private static final int nElements = 1000;
  private static final Configuration conf = new Configuration();
  // Never reassigned, so it can be final like the other class constants.
  private static final Path file = new Path("target/test/TestDictionaryFilter/testParquetFile");
  private static final MessageType schema = parseMessageType(
      "message test { "
          + "required binary binary_field; "
          + "required binary single_value_field; "
          + "required int32 int32_field; "
          + "required int64 int64_field; "
          + "required double double_field; "
          + "required float float_field; "
          + "required int32 plain_int32_field; "
          + "required binary fallback_binary_field; "
          + "} ");

  private static final String ALPHABET = "abcdefghijklmnopqrstuvwxyz";

  // Small, fixed value sets so every column (except the plain/fallback ones)
  // has a dictionary containing exactly these distinct values.
  private static final int[] intValues = new int[] {
      -100, 302, 3333333, 7654321, 1234567, -2000, -77775, 0, 75, 22223,
      77, 22221, -444443, 205, 12, 44444, 889, 66665, -777889, -7,
      52, 33, -257, 1111, 775, 26};
  private static final long[] longValues = new long[] {
      -100L, 302L, 3333333L, 7654321L, 1234567L, -2000L, -77775L, 0L,
      75L, 22223L, 77L, 22221L, -444443L, 205L, 12L, 44444L, 889L, 66665L,
      -777889L, -7L, 52L, 33L, -257L, 1111L, 775L, 26L};

  /**
   * Writes {@code nElements} rows cycling through the fixed value sets.
   * The first half of {@code fallback_binary_field} repeats alphabet letters
   * (dictionary-friendly); the second half is random UUIDs, forcing the
   * writer to fall back to plain encoding once the dictionary page overflows.
   */
  private static void writeData(SimpleGroupFactory f, ParquetWriter<Group> writer)
      throws IOException {
    for (int i = 0; i < nElements; i++) {
      int index = i % ALPHABET.length();
      Group group = f.newGroup()
          .append("binary_field", ALPHABET.substring(index, index + 1))
          .append("single_value_field", "sharp")
          .append("int32_field", intValues[i % intValues.length])
          .append("int64_field", longValues[i % longValues.length])
          .append("double_field", toDouble(intValues[i % intValues.length]))
          .append("float_field", toFloat(intValues[i % intValues.length]))
          .append("plain_int32_field", i)
          .append("fallback_binary_field", i < (nElements / 2)
              ? ALPHABET.substring(index, index + 1)
              : UUID.randomUUID().toString());
      writer.write(group);
    }
    writer.close();
  }

  /** Creates the shared test file once; deliberately tiny page/dictionary sizes. */
  @BeforeClass
  public static void prepareFile() throws IOException {
    cleanup();  // remove any leftovers from a previous run
    GroupWriteSupport.setSchema(schema, conf);
    SimpleGroupFactory f = new SimpleGroupFactory(schema);
    ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
        .withWriterVersion(PARQUET_1_0)
        .withCompressionCodec(GZIP)
        .withRowGroupSize(1024 * 1024)
        .withPageSize(1024)  // small pages => several pages per column chunk
        .enableDictionaryEncoding()
        .withDictionaryPageSize(2 * 1024)  // small enough to force fallback
        .withConf(conf)
        .build();
    writeData(f, writer);
  }

  @AfterClass
  public static void cleanup() throws IOException {
    FileSystem fs = file.getFileSystem(conf);
    if (fs.exists(file)) {
      fs.delete(file, true);
    }
  }

  // Per-test state: metadata and dictionaries of the file's first row group.
  List<ColumnChunkMetaData> ccmd;
  ParquetFileReader reader;
  DictionaryPageReadStore dictionaries;

  @Before
  public void setUp() throws Exception {
    reader = ParquetFileReader.open(conf, file);
    ParquetMetadata meta = reader.getFooter();
    ccmd = meta.getBlocks().get(0).getColumns();
    dictionaries = reader.getDictionaryReader(meta.getBlocks().get(0));
  }

  @After
  public void tearDown() throws Exception {
    reader.close();
  }

  /**
   * Sanity check of the fixture itself: which columns ended up dictionary
   * encoded, plain encoded, or (for the fallback column) both.
   */
  @Test
  @SuppressWarnings("deprecation")
  public void testDictionaryEncodedColumns() throws Exception {
    Set<String> dictionaryEncodedColumns = new HashSet<String>(Arrays.asList(
        "binary_field", "single_value_field", "int32_field", "int64_field",
        "double_field", "float_field"));
    for (ColumnChunkMetaData column : ccmd) {
      String name = column.getPath().toDotString();
      if (dictionaryEncodedColumns.contains(name)) {
        assertTrue("Column should be dictionary encoded: " + name,
            column.getEncodings().contains(Encoding.PLAIN_DICTIONARY));
        // was missing the ": " separator before the column name
        assertFalse("Column should not have plain data pages: " + name,
            column.getEncodings().contains(Encoding.PLAIN));
      } else {
        assertTrue("Column should have plain encoding: " + name,
            column.getEncodings().contains(Encoding.PLAIN));
        if (name.startsWith("fallback")) {
          // fallback column starts dictionary encoded, then falls back to plain
          assertTrue("Column should have some dictionary encoding: " + name,
              column.getEncodings().contains(Encoding.PLAIN_DICTIONARY));
        } else {
          assertFalse("Column should have no dictionary encoding: " + name,
              column.getEncodings().contains(Encoding.PLAIN_DICTIONARY));
        }
      }
    }
  }

  @Test
  public void testEqBinary() throws Exception {
    BinaryColumn b = binaryColumn("binary_field");
    FilterPredicate pred = eq(b, Binary.fromString("c"));
    assertFalse("Should not drop block for lower case letters",
        canDrop(pred, ccmd, dictionaries));
    assertTrue("Should drop block for upper case letters",
        canDrop(eq(b, Binary.fromString("A")), ccmd, dictionaries));
    // null presence cannot be decided from a dictionary (it holds values only)
    assertFalse("Should not drop block for null",
        canDrop(eq(b, null), ccmd, dictionaries));
  }

  @Test
  public void testNotEqBinary() throws Exception {
    BinaryColumn sharp = binaryColumn("single_value_field");
    BinaryColumn b = binaryColumn("binary_field");
    assertTrue("Should drop block with only the excluded value",
        canDrop(notEq(sharp, Binary.fromString("sharp")), ccmd, dictionaries));
    assertFalse("Should not drop block with any other value",
        canDrop(notEq(sharp, Binary.fromString("applause")), ccmd, dictionaries));
    assertFalse("Should not drop block with a known value",
        canDrop(notEq(b, Binary.fromString("x")), ccmd, dictionaries));
    assertFalse("Should not drop block with a known value",
        canDrop(notEq(b, Binary.fromString("B")), ccmd, dictionaries));
    assertFalse("Should not drop block for null",
        canDrop(notEq(b, null), ccmd, dictionaries));
  }

  @Test
  public void testLtInt() throws Exception {
    IntColumn i32 = intColumn("int32_field");
    int lowest = Integer.MAX_VALUE;
    for (int value : intValues) {
      lowest = Math.min(lowest, value);
    }
    assertTrue("Should drop: < lowest value",
        canDrop(lt(i32, lowest), ccmd, dictionaries));
    assertFalse("Should not drop: < (lowest value + 1)",
        canDrop(lt(i32, lowest + 1), ccmd, dictionaries));
    assertFalse("Should not drop: contains matching values",
        canDrop(lt(i32, Integer.MAX_VALUE), ccmd, dictionaries));
  }

  @Test
  public void testLtEqLong() throws Exception {
    LongColumn i64 = longColumn("int64_field");
    long lowest = Long.MAX_VALUE;
    for (long value : longValues) {
      lowest = Math.min(lowest, value);
    }
    assertTrue("Should drop: <= lowest - 1",
        canDrop(ltEq(i64, lowest - 1), ccmd, dictionaries));
    assertFalse("Should not drop: <= lowest",
        canDrop(ltEq(i64, lowest), ccmd, dictionaries));
    assertFalse("Should not drop: contains matching values",
        canDrop(ltEq(i64, Long.MAX_VALUE), ccmd, dictionaries));
  }

  @Test
  public void testGtFloat() throws Exception {
    FloatColumn f = floatColumn("float_field");
    // NEGATIVE_INFINITY, not Float.MIN_VALUE: MIN_VALUE is the smallest
    // *positive* float, which would be a wrong seed for a max-accumulator
    // if all data values were negative.
    float highest = Float.NEGATIVE_INFINITY;
    for (int value : intValues) {
      highest = Math.max(highest, toFloat(value));
    }
    assertTrue("Should drop: > highest value",
        canDrop(gt(f, highest), ccmd, dictionaries));
    assertFalse("Should not drop: > (highest value - 1.0)",
        canDrop(gt(f, highest - 1.0f), ccmd, dictionaries));
    assertFalse("Should not drop: contains matching values",
        canDrop(gt(f, Float.MIN_VALUE), ccmd, dictionaries));
  }

  @Test
  public void testGtEqDouble() throws Exception {
    DoubleColumn d = doubleColumn("double_field");
    // NEGATIVE_INFINITY, not Double.MIN_VALUE (smallest positive double).
    double highest = Double.NEGATIVE_INFINITY;
    for (int value : intValues) {
      highest = Math.max(highest, toDouble(value));
    }
    assertTrue("Should drop: >= highest + 0.00000001",
        canDrop(gtEq(d, highest + 0.00000001), ccmd, dictionaries));
    assertFalse("Should not drop: >= highest",
        canDrop(gtEq(d, highest), ccmd, dictionaries));
    assertFalse("Should not drop: contains matching values",
        canDrop(gtEq(d, Double.MIN_VALUE), ccmd, dictionaries));
  }

  @Test
  public void testAnd() throws Exception {
    BinaryColumn col = binaryColumn("binary_field");
    // both evaluate to false (no upper-case letters are in the dictionary)
    FilterPredicate B = eq(col, Binary.fromString("B"));
    FilterPredicate C = eq(col, Binary.fromString("C"));
    // both evaluate to true (all lower-case letters are in the dictionary)
    FilterPredicate x = eq(col, Binary.fromString("x"));
    FilterPredicate y = eq(col, Binary.fromString("y"));
    assertTrue("Should drop when either predicate must be false",
        canDrop(and(B, y), ccmd, dictionaries));
    assertTrue("Should drop when either predicate must be false",
        canDrop(and(x, C), ccmd, dictionaries));
    assertTrue("Should drop when either predicate must be false",
        canDrop(and(B, C), ccmd, dictionaries));
    assertFalse("Should not drop when either predicate could be true",
        canDrop(and(x, y), ccmd, dictionaries));
  }

  @Test
  public void testOr() throws Exception {
    BinaryColumn col = binaryColumn("binary_field");
    // both evaluate to false (no upper-case letters are in the dictionary)
    FilterPredicate B = eq(col, Binary.fromString("B"));
    FilterPredicate C = eq(col, Binary.fromString("C"));
    // both evaluate to true (all lower-case letters are in the dictionary)
    FilterPredicate x = eq(col, Binary.fromString("x"));
    FilterPredicate y = eq(col, Binary.fromString("y"));
    assertFalse("Should not drop when one predicate could be true",
        canDrop(or(B, y), ccmd, dictionaries));
    assertFalse("Should not drop when one predicate could be true",
        canDrop(or(x, C), ccmd, dictionaries));
    assertTrue("Should drop when both predicates must be false",
        canDrop(or(B, C), ccmd, dictionaries));
    assertFalse("Should not drop when one predicate could be true",
        canDrop(or(x, y), ccmd, dictionaries));
  }

  /**
   * A plain-encoded column has no dictionary, so the filter must never drop
   * the block — and must never even touch the dictionary store (verified via
   * the untouched mock).
   */
  @Test
  public void testColumnWithoutDictionary() throws Exception {
    IntColumn plain = intColumn("plain_int32_field");
    DictionaryPageReadStore dictionaryStore = mock(DictionaryPageReadStore.class);
    assertFalse("Should never drop block using plain encoding",
        canDrop(eq(plain, -10), ccmd, dictionaryStore));
    assertFalse("Should never drop block using plain encoding",
        canDrop(lt(plain, -10), ccmd, dictionaryStore));
    assertFalse("Should never drop block using plain encoding",
        canDrop(ltEq(plain, -10), ccmd, dictionaryStore));
    assertFalse("Should never drop block using plain encoding",
        canDrop(gt(plain, nElements + 10), ccmd, dictionaryStore));
    assertFalse("Should never drop block using plain encoding",
        canDrop(gtEq(plain, nElements + 10), ccmd, dictionaryStore));
    assertFalse("Should never drop block using plain encoding",
        canDrop(notEq(plain, nElements + 10), ccmd, dictionaryStore));
    verifyZeroInteractions(dictionaryStore);
  }

  /**
   * A column with both dictionary and plain pages: the dictionary is
   * incomplete, so the filter cannot safely drop anything.
   */
  @Test
  public void testColumnWithDictionaryAndPlainEncodings() throws Exception {
    // NOTE(review): this declares an IntColumn over the *binary*
    // fallback_binary_field — presumably intentional so the predicate never
    // matches the column's type; confirm against DictionaryFilter's
    // type-checking behavior.
    IntColumn plain = intColumn("fallback_binary_field");
    DictionaryPageReadStore dictionaryStore = mock(DictionaryPageReadStore.class);
    assertFalse("Should never drop block using plain encoding",
        canDrop(eq(plain, -10), ccmd, dictionaryStore));
    assertFalse("Should never drop block using plain encoding",
        canDrop(lt(plain, -10), ccmd, dictionaryStore));
    assertFalse("Should never drop block using plain encoding",
        canDrop(ltEq(plain, -10), ccmd, dictionaryStore));
    assertFalse("Should never drop block using plain encoding",
        canDrop(gt(plain, nElements + 10), ccmd, dictionaryStore));
    assertFalse("Should never drop block using plain encoding",
        canDrop(gtEq(plain, nElements + 10), ccmd, dictionaryStore));
    assertFalse("Should never drop block using plain encoding",
        canDrop(notEq(plain, nElements + 10), ccmd, dictionaryStore));
    verifyZeroInteractions(dictionaryStore);
  }

  // A column absent from the schema behaves as all-null for filtering.

  @Test
  public void testEqMissingColumn() throws Exception {
    BinaryColumn b = binaryColumn("missing_column");
    assertTrue("Should drop block for non-null query",
        canDrop(eq(b, Binary.fromString("any")), ccmd, dictionaries));
    assertFalse("Should not drop block for null query",
        canDrop(eq(b, null), ccmd, dictionaries));
  }

  @Test
  public void testNotEqMissingColumn() throws Exception {
    BinaryColumn b = binaryColumn("missing_column");
    assertFalse("Should not drop block for non-null query",
        canDrop(notEq(b, Binary.fromString("any")), ccmd, dictionaries));
    // message fixed: this asserts the block IS dropped (all values are null,
    // so notEq(null) matches nothing)
    assertTrue("Should drop block for null query",
        canDrop(notEq(b, null), ccmd, dictionaries));
  }

  @Test
  public void testLtMissingColumn() throws Exception {
    BinaryColumn b = binaryColumn("missing_column");
    assertTrue("Should drop block for any non-null query",
        canDrop(lt(b, Binary.fromString("any")), ccmd, dictionaries));
  }

  @Test
  public void testLtEqMissingColumn() throws Exception {
    BinaryColumn b = binaryColumn("missing_column");
    assertTrue("Should drop block for any non-null query",
        canDrop(ltEq(b, Binary.fromString("any")), ccmd, dictionaries));
  }

  @Test
  public void testGtMissingColumn() throws Exception {
    BinaryColumn b = binaryColumn("missing_column");
    assertTrue("Should drop block for any non-null query",
        canDrop(gt(b, Binary.fromString("any")), ccmd, dictionaries));
  }

  @Test
  public void testGtEqMissingColumn() throws Exception {
    BinaryColumn b = binaryColumn("missing_column");
    assertTrue("Should drop block for any non-null query",
        canDrop(gtEq(b, Binary.fromString("any")), ccmd, dictionaries));
  }

  /** Maps an int test value to the corresponding double_field value. */
  private static double toDouble(int value) {
    return (value * 1.0);
  }

  /** Maps an int test value to the corresponding float_field value (scaled by 2). */
  private static float toFloat(int value) {
    return (float) (value * 2.0);
  }
}