Merge remote-tracking branch 'apache/master' into bloom-filter
Conflicts:
parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
pom.xml
diff --git a/dev/travis-before_install-bloom-filter.sh b/dev/travis-before_install-bloom-filter.sh
new file mode 100644
index 0000000..84d996a
--- /dev/null
+++ b/dev/travis-before_install-bloom-filter.sh
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+################################################################################
+# This is a branch-specific script that gets invoked at the end of
+# travis-before_install.sh. It is run for the bloom-filter branch only.
+################################################################################
+
+git clone https://github.com/apache/parquet-format.git
+cd parquet-format
+mvn install -DskipTests -q
+cd ..
+rm -rf parquet-format
diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml
index b2369b7..322fa28 100644
--- a/parquet-column/pom.xml
+++ b/parquet-column/pom.xml
@@ -58,7 +58,11 @@
<artifactId>fastutil</artifactId>
<version>${fastutil.version}</version>
</dependency>
-
+ <dependency>
+ <groupId>net.openhft</groupId>
+ <artifactId>zero-allocation-hashing</artifactId>
+ <version>0.9</version>
+ </dependency>
<dependency>
<groupId>com.carrotsearch</groupId>
<artifactId>junit-benchmarks</artifactId>
@@ -87,7 +91,6 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
- <scope>test</scope>
</dependency>
</dependencies>
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index c022b72..ad09223 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,12 +29,18 @@
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.schema.MessageType;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
/**
* This class represents all the configurable Parquet properties.
*/
@@ -50,6 +56,7 @@
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
public static final int DEFAULT_STATISTICS_TRUNCATE_LENGTH = Integer.MAX_VALUE;
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
+ public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024;
public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;
@@ -90,13 +97,20 @@
private final ValuesWriterFactory valuesWriterFactory;
private final int columnIndexTruncateLength;
private final int statisticsTruncateLength;
+
+ // Maps a column name to its expected number of distinct values (NDV) in a row group.
+ private final Map<String, Long> bloomFilterExpectedDistinctNumbers;
+ private final int maxBloomFilterBytes;
+ private final Set<String> bloomFilterColumns;
private final int pageRowCountLimit;
private final boolean pageWriteChecksumEnabled;
private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit,
- boolean pageWriteChecksumEnabled, int statisticsTruncateLength) {
+ boolean pageWriteChecksumEnabled, int statisticsTruncateLength,
+ Map<String, Long> bloomFilterExpectedDistinctNumber, Set<String> bloomFilterColumns,
+ int maxBloomFilterBytes) {
this.pageSizeThreshold = pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -111,6 +125,9 @@
this.valuesWriterFactory = writerFactory;
this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
this.statisticsTruncateLength = statisticsTruncateLength;
+ this.bloomFilterExpectedDistinctNumbers = bloomFilterExpectedDistinctNumber;
+ this.bloomFilterColumns = bloomFilterColumns;
+ this.maxBloomFilterBytes = maxBloomFilterBytes;
this.pageRowCountLimit = pageRowCountLimit;
this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
}
@@ -176,10 +193,23 @@
public ColumnWriteStore newColumnWriteStore(MessageType schema,
PageWriteStore pageStore) {
switch (writerVersion) {
+ case PARQUET_1_0:
+ return new ColumnWriteStoreV1(schema, pageStore, this);
+ case PARQUET_2_0:
+ return new ColumnWriteStoreV2(schema, pageStore, this);
+ default:
+ throw new IllegalArgumentException("unknown version " + writerVersion);
+ }
+ }
+
+ public ColumnWriteStore newColumnWriteStore(MessageType schema,
+ PageWriteStore pageStore,
+ BloomFilterWriteStore bloomFilterWriteStore) {
+ switch (writerVersion) {
case PARQUET_1_0:
- return new ColumnWriteStoreV1(schema, pageStore, this);
+ return new ColumnWriteStoreV1(schema, pageStore, bloomFilterWriteStore, this);
case PARQUET_2_0:
- return new ColumnWriteStoreV2(schema, pageStore, this);
+ return new ColumnWriteStoreV2(schema, pageStore, bloomFilterWriteStore, this);
default:
throw new IllegalArgumentException("unknown version " + writerVersion);
}
@@ -217,6 +247,18 @@
return pageWriteChecksumEnabled;
}
+ public Map<String, Long> getBloomFilterColumnExpectedNDVs() {
+ return bloomFilterExpectedDistinctNumbers;
+ }
+
+ public Set<String> getBloomFilterColumns() {
+ return bloomFilterColumns;
+ }
+
+ public int getMaxBloomFilterBytes() {
+ return maxBloomFilterBytes;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -237,6 +279,9 @@
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
private int statisticsTruncateLength = DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+ private Map<String, Long> bloomFilterColumnExpectedNDVs = new HashMap<>();
+ private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES;
+ private Set<String> bloomFilterColumns = new HashSet<>();
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
@@ -255,6 +300,9 @@
this.allocator = toCopy.allocator;
this.pageRowCountLimit = toCopy.pageRowCountLimit;
this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
+ this.bloomFilterColumnExpectedNDVs = toCopy.bloomFilterExpectedDistinctNumbers;
+ this.bloomFilterColumns = toCopy.bloomFilterColumns;
+ this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
}
/**
@@ -349,6 +397,39 @@
return this;
}
+ /**
+ * Set the maximum size of a column's Bloom filter.
+ *
+ * @param maxBloomFilterBytes the maximum number of bytes of a Bloom filter bitset for a column
+ * @return this builder for method chaining
+ */
+ public Builder withMaxBloomFilterBytes(int maxBloomFilterBytes) {
+ this.maxBloomFilterBytes = maxBloomFilterBytes;
+ return this;
+ }
+
+ /**
+ * Set Bloom filter column names.
+ *
+ * @param columns the columns for which a Bloom filter is enabled.
+ * @return this builder for method chaining
+ */
+ public Builder withBloomFilterColumnNames(Set<String> columns) {
+ this.bloomFilterColumns = columns;
+ return this;
+ }
+
+ /**
+ * Set the expected number of distinct values (NDV) for Bloom filter columns.
+ *
+ * @param columnExpectedNDVs the expected number of distinct values per column in a row group
+ * @return this builder for method chaining
+ */
+ public Builder withBloomFilterColumnNdvs(Map<String, Long> columnExpectedNDVs) {
+ this.bloomFilterColumnExpectedNDVs = columnExpectedNDVs;
+ return this;
+ }
+
public Builder withPageRowCountLimit(int rowCount) {
Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: " + rowCount);
pageRowCountLimit = rowCount;
@@ -365,7 +446,8 @@
new ParquetProperties(writerVersion, pageSize, dictPageSize,
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength,
- pageRowCountLimit, pageWriteChecksumEnabled, statisticsTruncateLength);
+ pageRowCountLimit, pageWriteChecksumEnabled, statisticsTruncateLength,
+ bloomFilterColumnExpectedNDVs, bloomFilterColumns, maxBloomFilterBytes);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
// we'd like to decouple that and won't need to pass an object to properties and then pass the
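For illustration, a minimal sketch of how a writer could enable Bloom filters through the builder hooks added above; the column names and NDV figures are hypothetical:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.parquet.column.ParquetProperties;

public class BloomFilterPropertiesExample {
  public static ParquetProperties bloomFilterProps() {
    Set<String> columns = new HashSet<>();
    columns.add("user_id");
    columns.add("country");

    // Optional: expected distinct values per row group, used to size each filter.
    Map<String, Long> ndvs = new HashMap<>();
    ndvs.put("user_id", 100_000L);

    return ParquetProperties.builder()
        .withBloomFilterColumnNames(columns)
        .withBloomFilterColumnNdvs(ndvs)
        .withMaxBloomFilterBytes(ParquetProperties.DEFAULT_MAX_BLOOM_FILTER_BYTES)
        .build();
  }
}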
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index 2670c31..5a8fe38 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -34,6 +34,8 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.schema.MessageType;
/**
@@ -74,7 +76,7 @@
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
ColumnWriterBase column = columns.get(path);
if (column == null) {
- column = createColumnWriter(path, pageWriteStore.getPageWriter(path), props);
+ column = createColumnWriter(path, pageWriteStore.getPageWriter(path), null, props);
columns.put(path, column);
}
return column;
@@ -91,7 +93,7 @@
Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
for (ColumnDescriptor path : schema.getColumns()) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
- mcolumns.put(path, createColumnWriter(path, pageWriter, props));
+ mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
}
this.columns = unmodifiableMap(mcolumns);
@@ -105,7 +107,38 @@
};
}
- abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props);
+ // The Bloom filter is written to its own bitset rather than to pages, so it needs a separate write store abstraction.
+ ColumnWriteStoreBase(
+ MessageType schema,
+ PageWriteStore pageWriteStore,
+ BloomFilterWriteStore bloomFilterWriteStore,
+ ParquetProperties props) {
+ this.props = props;
+ this.thresholdTolerance = (long) (props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);
+ Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
+ for (ColumnDescriptor path : schema.getColumns()) {
+ PageWriter pageWriter = pageWriteStore.getPageWriter(path);
+ if (props.getBloomFilterColumnExpectedNDVs() != null) {
+ BloomFilterWriter bloomFilterWriter = bloomFilterWriteStore.getBloomFilterWriter(path);
+ mcolumns.put(path, createColumnWriter(path, pageWriter, bloomFilterWriter, props));
+ } else {
+ mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
+ }
+ }
+ this.columns = unmodifiableMap(mcolumns);
+
+ this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
+
+ columnWriterProvider = new ColumnWriterProvider() {
+ @Override
+ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
+ return columns.get(path);
+ }
+ };
+ }
+
+ abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
+ BloomFilterWriter bloomFilterWriter, ParquetProperties props);
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
return columnWriterProvider.getColumnWriter(path);
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
index 7258423..4f4a971 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
@@ -22,10 +22,11 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.schema.MessageType;
public class ColumnWriteStoreV1 extends ColumnWriteStoreBase {
-
public ColumnWriteStoreV1(MessageType schema, PageWriteStore pageWriteStore, ParquetProperties props) {
super(schema, pageWriteStore, props);
}
@@ -36,8 +37,15 @@
super(pageWriteStore, props);
}
+ public ColumnWriteStoreV1(MessageType schema, PageWriteStore pageWriteStore,
+ BloomFilterWriteStore bloomFilterWriteStore,
+ ParquetProperties props) {
+ super(schema, pageWriteStore, bloomFilterWriteStore, props);
+ }
+
@Override
- ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
- return new ColumnWriterV1(path, pageWriter, props);
+ ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
+ BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
+ return new ColumnWriterV1(path, pageWriter, bloomFilterWriter, props);
}
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
index bf1090d..590c3ed 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
@@ -22,6 +22,8 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.schema.MessageType;
public class ColumnWriteStoreV2 extends ColumnWriteStoreBase {
@@ -30,8 +32,15 @@
super(schema, pageWriteStore, props);
}
+ public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore,
+ BloomFilterWriteStore bloomFilterWriteStore,
+ ParquetProperties props) {
+ super(schema, pageWriteStore, bloomFilterWriteStore, props);
+ }
+
@Override
- ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
- return new ColumnWriterV2(path, pageWriter, props);
+ ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
+ BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
+ return new ColumnWriterV2(path, pageWriter, bloomFilterWriter, props);
}
}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index 8fc7d31..b3b799a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -19,6 +19,8 @@
package org.apache.parquet.column.impl;
import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriter;
@@ -27,6 +29,9 @@
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.io.api.Binary;
import org.slf4j.Logger;
@@ -53,6 +58,9 @@
private long rowsWrittenSoFar = 0;
private int pageRowCount;
+ private BloomFilterWriter bloomFilterWriter;
+ private BloomFilter bloomFilter;
+
ColumnWriterBase(
ColumnDescriptor path,
PageWriter pageWriter,
@@ -66,6 +74,42 @@
this.dataColumn = props.newValuesWriter(path);
}
+ ColumnWriterBase(
+ ColumnDescriptor path,
+ PageWriter pageWriter,
+ BloomFilterWriter bloomFilterWriter,
+ ParquetProperties props
+ ) {
+ this(path, pageWriter, props);
+
+ // Bloom filters don't support nested columns yet; see PARQUET-1453.
+ if (path.getPath().length != 1 || bloomFilterWriter == null) {
+ return;
+ }
+ String column = path.getPath()[0];
+
+ this.bloomFilterWriter = bloomFilterWriter;
+ Set<String> bloomFilterColumns = props.getBloomFilterColumns();
+ if (!bloomFilterColumns.contains(column)) {
+ return;
+ }
+ int maxBloomFilterSize = props.getMaxBloomFilterBytes();
+
+ Map<String, Long> bloomFilterColumnExpectedNDVs = props.getBloomFilterColumnExpectedNDVs();
+ if (!bloomFilterColumnExpectedNDVs.isEmpty()) {
+ // If the user specified an expected NDV for this column, size the Bloom filter from it.
+ if (bloomFilterColumnExpectedNDVs.containsKey(column)) {
+ int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(bloomFilterColumnExpectedNDVs.get(column).intValue(),
+ BlockSplitBloomFilter.DEFAULT_FPP);
+
+ this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8, maxBloomFilterSize);
+ }
+ } else {
+ this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize);
+ }
+ }
+
abstract ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path);
abstract ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path);
@@ -122,6 +166,36 @@
+ pageWriter.getMemSize();
}
+ private void updateBloomFilter(int value) {
+ if (bloomFilter != null) {
+ bloomFilter.insertHash(bloomFilter.hash(value));
+ }
+ }
+
+ private void updateBloomFilter(long value) {
+ if (bloomFilter != null) {
+ bloomFilter.insertHash(bloomFilter.hash(value));
+ }
+ }
+
+ private void updateBloomFilter(double value) {
+ if (bloomFilter != null) {
+ bloomFilter.insertHash(bloomFilter.hash(value));
+ }
+ }
+
+ private void updateBloomFilter(float value) {
+ if (bloomFilter != null) {
+ bloomFilter.insertHash(bloomFilter.hash(value));
+ }
+ }
+
+ private void updateBloomFilter(Binary value) {
+ if (bloomFilter != null) {
+ bloomFilter.insertHash(bloomFilter.hash(value));
+ }
+ }
+
/**
* Writes the current value
*
@@ -137,6 +211,7 @@
definitionLevel(definitionLevel);
dataColumn.writeDouble(value);
statistics.updateStats(value);
+ updateBloomFilter(value);
++valueCount;
}
@@ -155,6 +230,7 @@
definitionLevel(definitionLevel);
dataColumn.writeFloat(value);
statistics.updateStats(value);
+ updateBloomFilter(value);
++valueCount;
}
@@ -173,6 +249,7 @@
definitionLevel(definitionLevel);
dataColumn.writeBytes(value);
statistics.updateStats(value);
+ updateBloomFilter(value);
++valueCount;
}
@@ -209,6 +286,7 @@
definitionLevel(definitionLevel);
dataColumn.writeInteger(value);
statistics.updateStats(value);
+ updateBloomFilter(value);
++valueCount;
}
@@ -227,6 +305,7 @@
definitionLevel(definitionLevel);
dataColumn.writeLong(value);
statistics.updateStats(value);
+ updateBloomFilter(value);
++valueCount;
}
@@ -246,6 +325,10 @@
}
dataColumn.resetDictionary();
}
+
+ if (bloomFilterWriter != null && bloomFilter != null) {
+ bloomFilterWriter.writeBloomFilter(bloomFilter);
+ }
}
/**
@@ -265,20 +348,24 @@
* @return the number of bytes of memory used to buffer the current data and the previously written pages
*/
long getTotalBufferedSize() {
+ long bloomBufferSize = bloomFilter == null ? 0 : bloomFilter.getBitsetSize();
return repetitionLevelColumn.getBufferedSize()
+ definitionLevelColumn.getBufferedSize()
+ dataColumn.getBufferedSize()
- + pageWriter.getMemSize();
+ + pageWriter.getMemSize()
+ + bloomBufferSize;
}
/**
* @return actual memory used
*/
long allocatedSize() {
+ long bloomAllocatedSize = bloomFilter == null ? 0 : bloomFilter.getBitsetSize();
return repetitionLevelColumn.getAllocatedSize()
+ definitionLevelColumn.getAllocatedSize()
+ dataColumn.getAllocatedSize()
- + pageWriter.allocatedSize();
+ + pageWriter.allocatedSize()
+ + bloomAllocatedSize;
}
/**
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
index 646e31a..98c1e19 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
@@ -27,16 +27,21 @@
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
/**
* Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
*/
final class ColumnWriterV1 extends ColumnWriterBase {
-
ColumnWriterV1(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
super(path, pageWriter, props);
}
+ public ColumnWriterV1(ColumnDescriptor path, PageWriter pageWriter,
+ BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
+ super(path, pageWriter, bloomFilterWriter, props);
+ }
+
@Override
ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path) {
return props.newRepetitionLevelWriter(path);
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
index e4e8563..cc44e2d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,6 +28,7 @@
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.ValuesWriter;
import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
import org.apache.parquet.io.ParquetEncodingException;
@@ -59,6 +60,11 @@
super(path, pageWriter, props);
}
+ ColumnWriterV2(ColumnDescriptor path, PageWriter pageWriter, BloomFilterWriter bloomFilterWriter,
+ ParquetProperties props) {
+ super(path, pageWriter, bloomFilterWriter, props);
+ }
+
@Override
ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path) {
return path.getMaxRepetitionLevel() == 0 ? NULL_WRITER : new RLEWriterForV2(props.newRepetitionLevelEncoder(path));
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java
index 0aac63e..aa68cc1 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
new file mode 100644
index 0000000..cc9f674
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.io.api.Binary;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+
+/*
+ * This Bloom filter is implemented using the block-based algorithm from Putze et al.'s
+ * "Cache-, Hash- and Space-Efficient Bloom Filters". The basic idea is to hash each item to a
+ * tiny Bloom filter whose size fits in a single cache line or smaller. This implementation sets
+ * 8 bits in each tiny Bloom filter. Each tiny Bloom filter is 32 bytes to take advantage of
+ * 32-byte SIMD instructions.
+ */
+public class BlockSplitBloomFilter implements BloomFilter {
+ // Bytes in a tiny Bloom filter block.
+ private static final int BYTES_PER_BLOCK = 32;
+
+ // Bits in a tiny Bloom filter block.
+ private static final int BITS_PER_BLOCK = 256;
+
+ // Default minimum Bloom filter size, set to the size of a tiny Bloom filter block
+ public static final int DEFAULT_MINIMUM_BYTES = 32;
+
+ // Default maximum Bloom filter size, set to 1MB, which should cover most cases.
+ public static final int DEFAULT_MAXIMUM_BYTES = 1024 * 1024;
+
+ // The number of bits to set in a tiny Bloom filter
+ private static final int BITS_SET_PER_BLOCK = 8;
+
+ // The metadata in the header of a serialized Bloom filter is four four-byte values: the number of
+ // bytes of the bitset, the hash strategy, the filter algorithm, and the compression.
+ public static final int HEADER_SIZE = 16;
+
+ // The default false positive probability value
+ public static final double DEFAULT_FPP = 0.01;
+
+ // Hash strategy used in this Bloom filter.
+ private final HashStrategy hashStrategy;
+
+ // The underlying byte array for Bloom filter bitset.
+ private byte[] bitset;
+
+ // An int view of the underlying bitset, used to set and test bits.
+ private IntBuffer intBuffer;
+
+ // Hash function used to compute hashes of column values.
+ private HashFunction hashFunction;
+
+ private int maximumBytes = DEFAULT_MAXIMUM_BYTES;
+ private int minimumBytes = DEFAULT_MINIMUM_BYTES;
+
+ // The block-based algorithm needs 8 odd SALT values to calculate eight indexes
+ // of bits to set, one per 32-bit word.
+ private static final int[] SALT = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
+ 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};
+
+ /**
+ * Constructor of block-based Bloom filter.
+ *
+ * @param numBytes The number of bytes for the Bloom filter bitset. Values outside
+ * [DEFAULT_MINIMUM_BYTES, DEFAULT_MAXIMUM_BYTES] are rounded up/down to the nearest
+ * bound, and the result is rounded up to a power of 2. XXH64 is used as the default
+ * hash function.
+ */
+ public BlockSplitBloomFilter(int numBytes) {
+ this(numBytes, DEFAULT_MAXIMUM_BYTES, HashStrategy.XXH64);
+ }
+
+ /**
+ * Constructor of block-based Bloom filter.
+ *
+ * @param numBytes The number of bytes for the Bloom filter bitset. Values outside
+ * [DEFAULT_MINIMUM_BYTES, maximumBytes] are rounded up/down to the nearest bound,
+ * and the result is rounded up to a power of 2. XXH64 is used as the default hash
+ * function.
+ * @param maximumBytes The maximum number of bytes for the Bloom filter bitset.
+ */
+ public BlockSplitBloomFilter(int numBytes, int maximumBytes) {
+ this(numBytes, maximumBytes, HashStrategy.XXH64);
+ }
+
+ /**
+ * Constructor of block-based Bloom filter.
+ *
+ * @param numBytes The number of bytes for Bloom filter bitset
+ * @param hashStrategy The hash strategy of Bloom filter.
+ */
+ private BlockSplitBloomFilter(int numBytes, HashStrategy hashStrategy) {
+ this(numBytes, DEFAULT_MAXIMUM_BYTES, hashStrategy);
+ }
+
+ /**
+ * Constructor of block-based Bloom filter.
+ *
+ * @param numBytes The number of bytes for the Bloom filter bitset. Values outside
+ * [DEFAULT_MINIMUM_BYTES, maximumBytes] are rounded up/down to the nearest bound,
+ * and the result is rounded up to a power of 2.
+ * @param maximumBytes The maximum number of bytes for the Bloom filter bitset.
+ * @param hashStrategy The hash strategy of the Bloom filter.
+ */
+ public BlockSplitBloomFilter(int numBytes, int maximumBytes, HashStrategy hashStrategy) {
+ if (maximumBytes > DEFAULT_MINIMUM_BYTES) {
+ this.maximumBytes = maximumBytes;
+ }
+ initBitset(numBytes);
+
+ switch (hashStrategy) {
+ case XXH64:
+ this.hashStrategy = hashStrategy;
+ hashFunction = new XxHash();
+ break;
+ default:
+ throw new RuntimeException("Unsupported hash strategy");
+ }
+ }
+
+
+ /**
+ * Construct the Bloom filter with a given bitset; used when reconstructing a
+ * Bloom filter from a Parquet file. It uses XXH64 as its default hash
+ * function.
+ *
+ * @param bitset The given bitset to construct the Bloom filter from.
+ */
+ public BlockSplitBloomFilter(byte[] bitset) {
+ this(bitset, HashStrategy.XXH64);
+ }
+
+ /**
+ * Construct the Bloom filter with a given bitset; used when reconstructing a
+ * Bloom filter from a Parquet file.
+ *
+ * @param bitset The given bitset to construct the Bloom filter from.
+ * @param hashStrategy The hash strategy the Bloom filter applies.
+ */
+ private BlockSplitBloomFilter(byte[] bitset, HashStrategy hashStrategy) {
+ if (bitset == null) {
+ throw new RuntimeException("Given bitset is null");
+ }
+
+ this.bitset = bitset;
+ this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
+ switch (hashStrategy) {
+ case XXH64:
+ this.hashStrategy = hashStrategy;
+ hashFunction = new XxHash();
+ break;
+ default:
+ throw new RuntimeException("Unsupported hash strategy");
+ }
+ }
+
+ /**
+ * Create a new bitset for Bloom filter.
+ *
+ * @param numBytes The number of bytes for the Bloom filter bitset. Values outside
+ * [minimumBytes, maximumBytes] are rounded up/down to the nearest bound, and the
+ * result is rounded up to a power of 2.
+ */
+ private void initBitset(int numBytes) {
+ if (numBytes < minimumBytes) {
+ numBytes = minimumBytes;
+ }
+ // Get next power of 2 if it is not power of 2.
+ if ((numBytes & (numBytes - 1)) != 0) {
+ numBytes = Integer.highestOneBit(numBytes) << 1;
+ }
+ if (numBytes > maximumBytes || numBytes < 0) {
+ numBytes = maximumBytes;
+ }
+ this.bitset = new byte[numBytes];
+ this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
+ }
+
+ @Override
+ public void writeTo(OutputStream out) throws IOException {
+ // Write number of bytes of bitset.
+ out.write(BytesUtils.intToBytes(bitset.length));
+ // Write hash strategy
+ out.write(BytesUtils.intToBytes(hashStrategy.value));
+ // Write algorithm
+ out.write(BytesUtils.intToBytes(Algorithm.BLOCK.value));
+ // Write compression
+ out.write(BytesUtils.intToBytes(Compression.UNCOMPRESSED.value));
+ // Write bitset
+ out.write(bitset);
+ }
+
+ private int[] setMask(int key) {
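+ // Compute eight one-bit masks, one per 32-bit word of a block: multiply the key by
+ // eight odd SALT constants and use the top five bits of each product as a bit index.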
+ int[] mask = new int[BITS_SET_PER_BLOCK];
+
+ for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+ mask[i] = key * SALT[i];
+ }
+ for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+ mask[i] = mask[i] >>> 27;
+ }
+ for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+ mask[i] = 0x1 << mask[i];
+ }
+
+ return mask;
+ }
+
+ @Override
+ public void insertHash(long hash) {
+ long numBlocks = bitset.length / BYTES_PER_BLOCK;
+ // The upper 32 bits of the hash select the block; the lower 32 bits are the key.
+ long highHash = hash >>> 32;
+ int blockIndex = (int)((highHash * numBlocks) >> 32);
+ int key = (int)hash;
+
+ // Calculate mask for bucket.
+ int[] mask = setMask(key);
+ for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
+ int value = intBuffer.get(blockIndex * (BYTES_PER_BLOCK / 4) + i);
+ value |= mask[i];
+ intBuffer.put(blockIndex * (BYTES_PER_BLOCK / 4) + i, value);
+ }
+ }
+
+ @Override
+ public boolean findHash(long hash) {
+ long numBlocks = bitset.length / BYTES_PER_BLOCK;
+ long highHash = hash >>> 32;
+ int blockIndex = (int)((highHash * numBlocks) >> 32);
+ int key = (int)hash;
+
+ // Calculate mask for the tiny Bloom filter.
+ int[] mask = setMask(key);
+ for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
+ if (0 == (intBuffer.get(blockIndex * (BYTES_PER_BLOCK / 4) + i) & mask[i])) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Calculate optimal size according to the number of distinct values and false positive probability.
+ *
+ * @param n The number of distinct values.
+ * @param p The false positive probability.
+ * @return the optimal number of bits for the given n and p.
+ */
+ public static int optimalNumOfBits(long n, double p) {
+ Preconditions.checkArgument((p > 0.0 && p < 1.0),
+ "FPP should be less than 1.0 and greater than 0.0");
+ final double m = -8 * n / Math.log(1 - Math.pow(p, 1.0 / 8));
+ final double MAX = DEFAULT_MAXIMUM_BYTES << 3;
+ int numBits = (int)m;
+
+ // Handle overflow.
+ if (m > MAX || m < 0) {
+ numBits = (int)MAX;
+ }
+
+ // Round up to a multiple of BITS_PER_BLOCK.
+ numBits = (numBits + BITS_PER_BLOCK - 1) & ~(BITS_PER_BLOCK - 1);
+
+ if (numBits < (DEFAULT_MINIMUM_BYTES << 3)) {
+ numBits = DEFAULT_MINIMUM_BYTES << 3;
+ }
+
+ return numBits;
+ }
+
+ @Override
+ public long getBitsetSize() {
+ return this.bitset.length;
+ }
+
+ @Override
+ public long hash(Object value) {
+ ByteBuffer plain;
+
+ if (value instanceof Binary) {
+ return hashFunction.hashBytes(((Binary) value).getBytes());
+ }
+
+ if (value instanceof Integer) {
+ plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
+ plain.order(ByteOrder.LITTLE_ENDIAN).putInt(((Integer)value));
+ } else if (value instanceof Long) {
+ plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE);
+ plain.order(ByteOrder.LITTLE_ENDIAN).putLong(((Long)value));
+ } else if (value instanceof Float) {
+ plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE);
+ plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(((Float)value));
+ } else if (value instanceof Double) {
+ plain = ByteBuffer.allocate(Double.SIZE/ Byte.SIZE);
+ plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(((Double)value));
+ } else {
+ throw new RuntimeException("Parquet Bloom filter: Not supported type");
+ }
+
+ return hashFunction.hashByteBuffer(plain);
+ }
+
+ @Override
+ public long hash(int value) {
+ ByteBuffer plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
+ plain.order(ByteOrder.LITTLE_ENDIAN).putInt(value);
+ return hashFunction.hashByteBuffer(plain);
+ }
+
+ @Override
+ public long hash(long value) {
+ ByteBuffer plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE);
+ plain.order(ByteOrder.LITTLE_ENDIAN).putLong(value);
+ return hashFunction.hashByteBuffer(plain);
+ }
+
+ @Override
+ public long hash(double value) {
+ ByteBuffer plain = ByteBuffer.allocate(Double.SIZE/Byte.SIZE);
+ plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(value);
+ return hashFunction.hashByteBuffer(plain);
+ }
+
+ @Override
+ public long hash(float value) {
+ ByteBuffer plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE);
+ plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(value);
+ return hashFunction.hashByteBuffer(plain);
+ }
+
+ @Override
+ public long hash(Binary value) {
+ return hashFunction.hashBytes(value.getBytes());
+ }
+}
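A short round-trip sketch of the class above: size the bitset from an expected NDV using optimalNumOfBits (for n = 10,000 and p = 0.01 the formula gives roughly 96,800 bits, about 12 KB), insert a value's hash, then probe for it:

import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.io.api.Binary;

public class BlockSplitBloomFilterExample {
  public static void main(String[] args) {
    // Size for ~10,000 distinct values at the default 1% false positive probability.
    int numBits = BlockSplitBloomFilter.optimalNumOfBits(10_000, BlockSplitBloomFilter.DEFAULT_FPP);
    BloomFilter filter = new BlockSplitBloomFilter(numBits / 8);

    Binary value = Binary.fromString("parquet");
    filter.insertHash(filter.hash(value));

    // A Bloom filter can report false positives but never false negatives.
    System.out.println(filter.findHash(filter.hash(value)));                       // true
    System.out.println(filter.findHash(filter.hash(Binary.fromString("absent")))); // probably false
  }
}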
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
new file mode 100644
index 0000000..8b26c97
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bloomfilter;
+
+import org.apache.parquet.io.api.Binary;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A Bloom filter is a compact structure to indicate whether an item is not in a set or probably
+ * in a set. A Bloom filter usually consists of a bitset that represents a set of elements,
+ * a hash strategy, and a Bloom filter algorithm.
+ */
+public interface BloomFilter {
+ /* Bloom filter Hash strategy.
+ *
+ * xxHash is an extremely fast hash algorithm, running at RAM speed limits. It successfully
+ * completes the SMHasher test suite, which evaluates the collision, dispersion and randomness
+ * qualities of hash functions, and its benchmark results show a clear performance advantage
+ * (see https://github.com/Cyan4973/xxHash).
+ */
+ enum HashStrategy {
+ XXH64(0);
+ HashStrategy(int value) {
+ this.value = value;
+ }
+ int value;
+ }
+
+ // Bloom filter algorithm.
+ enum Algorithm {
+ BLOCK(0);
+ Algorithm(int value) {
+ this.value = value;
+ }
+ int value;
+ }
+
+ // Bloom filter compression.
+ enum Compression {
+ UNCOMPRESSED(0);
+ Compression(int value) {
+ this.value = value;
+ }
+ int value;
+ }
+
+ /**
+ * Write the Bloom filter to an output stream. It writes the Bloom filter header, which includes
+ * the bitset's length in bytes, the hash strategy, the algorithm and the compression, followed by the bitset.
+ *
+ * @param out the output stream to write
+ */
+ void writeTo(OutputStream out) throws IOException;
+
+ /**
+ * Insert an element into the Bloom filter. The element is represented by
+ * the hash of its plain encoding.
+ *
+ * @param hash the hash of the element
+ */
+ void insertHash(long hash);
+
+ /**
+ * Determine whether an element is in the set.
+ *
+ * @param hash the hash of the element's plain encoding
+ * @return false if the element is definitely not in the set; true if it probably is
+ */
+ boolean findHash(long hash);
+
+ /**
+ * Get the number of bytes for bitset in this Bloom filter.
+ *
+ * @return The number of bytes for bitset in this Bloom filter.
+ */
+ long getBitsetSize();
+
+ /**
+ * Compute hash for int value by using its plain encoding result.
+ *
+ * @param value the value to hash
+ * @return hash result
+ */
+ long hash(int value);
+
+ /**
+ * Compute hash for long value by using its plain encoding result.
+ *
+ * @param value the value to hash
+ * @return hash result
+ */
+ long hash(long value);
+
+ /**
+ * Compute hash for double value by using its plain encoding result.
+ *
+ * @param value the value to hash
+ * @return hash result
+ */
+ long hash(double value);
+
+ /**
+ * Compute hash for float value by using its plain encoding result.
+ *
+ * @param value the value to hash
+ * @return hash result
+ */
+ long hash(float value);
+
+ /**
+ * Compute hash for Binary value by using its plain encoding result.
+ *
+ * @param value the value to hash
+ * @return hash result
+ */
+ long hash(Binary value);
+
+ /**
+ * Compute hash for Object value by using its plain encoding result.
+ *
+ * @param value the value to hash
+ * @return hash result
+ */
+ long hash(Object value);
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java
new file mode 100644
index 0000000..f7e28fd
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import org.apache.parquet.column.ColumnDescriptor;
+
+/**
+ * Contains all writers for all columns of a row group
+ */
+public interface BloomFilterWriteStore {
+ /**
+ * Get bloom filter writer of a column
+ *
+ * @param path the descriptor for the column
+ * @return the corresponding Bloom filter writer
+ */
+ BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path);
+}
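A hypothetical in-memory implementation, shown only to illustrate the contract; the real write store is provided by the column chunk writers in parquet-hadoop:

import java.util.HashMap;
import java.util.Map;
import org.apache.parquet.column.ColumnDescriptor;

public class InMemoryBloomFilterWriteStore implements BloomFilterWriteStore {
  private final Map<ColumnDescriptor, BloomFilter> filters = new HashMap<>();

  @Override
  public BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path) {
    // BloomFilterWriter declares a single method, so a lambda implements it.
    return bloomFilter -> filters.put(path, bloomFilter);
  }
}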
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
new file mode 100644
index 0000000..e2504d8
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+public interface BloomFilterWriter {
+ /**
+ * Write a Bloom filter
+ *
+ * @param bloomFilter the Bloom filter to write
+ *
+ */
+ void writeBloomFilter(BloomFilter bloomFilter);
+}
+
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/HashFunction.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/HashFunction.java
new file mode 100644
index 0000000..2043934
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/HashFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bloomfilter;
+
+import java.nio.ByteBuffer;
+
+/**
+ * An interface for the hash functions used by the Bloom filter.
+ */
+public interface HashFunction {
+
+ /**
+ * Compute the hash value for a byte array.
+ * @param input the input byte array
+ * @return the hash as a long value
+ */
+ long hashBytes(byte[] input);
+
+ /**
+ * Compute the hash value for a ByteBuffer.
+ * @param input the input ByteBuffer
+ * @return the hash as a long value
+ */
+ long hashByteBuffer(ByteBuffer input);
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java
new file mode 100644
index 0000000..6c52b3c
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import net.openhft.hashing.LongHashFunction;
+
+import java.nio.ByteBuffer;
+
+/**
+ * An implementation of the HashFunction interface that uses the XXH64 variant of
+ * xxHash with a seed of 0.
+ */
+public class XxHash implements HashFunction {
+ @Override
+ public long hashBytes(byte[] input) {
+ return LongHashFunction.xx(0).hashBytes(input);
+ }
+
+ @Override
+ public long hashByteBuffer(ByteBuffer input) {
+ // hashBytes(ByteBuffer) hashes only the buffer's remaining bytes; the buffers handed in
+ // here have just been written, so their position is at the end of the data. Hash the
+ // whole backing array instead.
+ return LongHashFunction.xx(0).hashBytes(input, 0, input.capacity());
+ }
+}
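A quick sanity check of the wrapper above, assuming zero-allocation-hashing 0.9 (the dependency added to parquet-column/pom.xml earlier in this patch):

import java.nio.charset.StandardCharsets;
import net.openhft.hashing.LongHashFunction;

public class XxHashExample {
  public static void main(String[] args) {
    byte[] data = "parquet".getBytes(StandardCharsets.UTF_8);
    long viaWrapper = new XxHash().hashBytes(data);
    long direct = LongHashFunction.xx(0).hashBytes(data);
    System.out.println(viaWrapper == direct); // true: the wrapper delegates with seed 0
  }
}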
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
new file mode 100644
index 0000000..d75c0e2
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bloomfilter;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.parquet.column.values.RandomStr;
+import org.apache.parquet.io.api.Binary;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBlockSplitBloomFilter {
+ @Test
+ public void testConstructor() {
+ BloomFilter bloomFilter1 = new BlockSplitBloomFilter(0);
+ assertEquals(bloomFilter1.getBitsetSize(), BlockSplitBloomFilter.DEFAULT_MINIMUM_BYTES);
+ BloomFilter bloomFilter2 = new BlockSplitBloomFilter(BlockSplitBloomFilter.DEFAULT_MAXIMUM_BYTES + 1);
+ assertEquals(bloomFilter2.getBitsetSize(), BlockSplitBloomFilter.DEFAULT_MAXIMUM_BYTES);
+ BloomFilter bloomFilter3 = new BlockSplitBloomFilter(1000);
+ assertEquals(bloomFilter3.getBitsetSize(), 1024);
+ }
+
+ @Rule
+ public final TemporaryFolder temp = new TemporaryFolder();
+
+ /*
+ * This test covers the basic operations: inserting, finding, and
+ * serializing/de-serializing.
+ */
+ @Test
+ public void testBasic() throws IOException {
+ final String[] testStrings = {"hello", "parquet", "bloom", "filter"};
+ BloomFilter bloomFilter = new BlockSplitBloomFilter(1024);
+
+ for(int i = 0; i < testStrings.length; i++) {
+ bloomFilter.insertHash(bloomFilter.hash(Binary.fromString(testStrings[i])));
+ }
+
+ File testFile = temp.newFile();
+ FileOutputStream fileOutputStream = new FileOutputStream(testFile);
+ bloomFilter.writeTo(fileOutputStream);
+ fileOutputStream.close();
+ FileInputStream fileInputStream = new FileInputStream(testFile);
+
+ byte[] value = new byte[4];
+ fileInputStream.read(value);
+ int length = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+ assertEquals(length, 1024);
+
+ fileInputStream.read(value);
+ int hash = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+ assertEquals(hash, BloomFilter.HashStrategy.XXH64.ordinal());
+
+ fileInputStream.read(value);
+ int algorithm = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+ assertEquals(algorithm, BloomFilter.Algorithm.BLOCK.ordinal());
+
+ fileInputStream.read(value);
+ int compression = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+ assertEquals(compression, BloomFilter.Compression.UNCOMPRESSED.ordinal());
+
+ byte[] bitset = new byte[length];
+ fileInputStream.read(bitset);
+ bloomFilter = new BlockSplitBloomFilter(bitset);
+ for (String testString : testStrings) {
+ assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(testString))));
+ }
+ }
+
+ @Test
+ public void testFPP() throws IOException {
+ final int totalCount = 100000;
+ final double FPP = 0.01;
+ final long SEED = 104729;
+
+ BloomFilter bloomFilter = new BlockSplitBloomFilter(BlockSplitBloomFilter.optimalNumOfBits(totalCount, FPP));
+ List<String> strings = new ArrayList<>();
+ RandomStr randomStr = new RandomStr(new Random(SEED));
+ for(int i = 0; i < totalCount; i++) {
+ String str = randomStr.get(10);
+ strings.add(str);
+ bloomFilter.insertHash(bloomFilter.hash(Binary.fromString(str)));
+ }
+
+ // exist counts the number of times findHash returns true for strings that were never inserted.
+ int exist = 0;
+ for (int i = 0; i < totalCount; i++) {
+ String str = randomStr.get(8);
+ if (bloomFilter.findHash(bloomFilter.hash(Binary.fromString(str)))) {
+ exist++;
+ }
+ }
+
+ // With FPP = 0.01, exist should stay below totalCount * FPP = 1000 with high probability.
+ assertTrue(exist < totalCount * FPP);
+ }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/BloomFilterLevel/BloomFilterImpl.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/BloomFilterLevel/BloomFilterImpl.java
new file mode 100644
index 0000000..c1e3774
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/BloomFilterLevel/BloomFilterImpl.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.filter2.BloomFilterLevel;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.hadoop.BloomFilterReader;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+public class BloomFilterImpl implements FilterPredicate.Visitor<Boolean> {
+ private static final Logger LOG = LoggerFactory.getLogger(BloomFilterImpl.class);
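+ // canDrop() returns true when the row group cannot contain matching rows, hence
+ // BLOCK_CANNOT_MATCH is true and BLOCK_MIGHT_MATCH is false.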
+ private static final boolean BLOCK_MIGHT_MATCH = false;
+ private static final boolean BLOCK_CANNOT_MATCH = true;
+
+ private final Map<ColumnPath, ColumnChunkMetaData> columns = new HashMap<ColumnPath, ColumnChunkMetaData>();
+
+ public static boolean canDrop(FilterPredicate pred, List<ColumnChunkMetaData> columns, BloomFilterReader bloomFilterReader) {
+ checkNotNull(pred, "pred");
+ checkNotNull(columns, "columns");
+ return pred.accept(new BloomFilterImpl(columns, bloomFilterReader));
+ }
+
+ private final BloomFilterReader bloomFilterReader;
+
+ private BloomFilterImpl(List<ColumnChunkMetaData> columnsList, BloomFilterReader bloomFilterReader) {
+ for (ColumnChunkMetaData chunk : columnsList) {
+ columns.put(chunk.getPath(), chunk);
+ }
+
+ this.bloomFilterReader = bloomFilterReader;
+ }
+
+ private ColumnChunkMetaData getColumnChunk(ColumnPath columnPath) {
+ return columns.get(columnPath);
+ }
+
+ @Override
+ public <T extends Comparable<T>> Boolean visit(Operators.Eq<T> eq) {
+ T value = eq.getValue();
+
+ if (value == null) {
+ // The Bloom filter bitset contains only non-null values, so it is not helpful
+ // here; this could check the column stats, but the StatisticsFilter is responsible for that.
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ Operators.Column<T> filterColumn = eq.getColumn();
+ ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+ if (meta == null) {
+ // the column isn't in this file so all values are null, but the value
+ // must be non-null because of the above check.
+ return BLOCK_CANNOT_MATCH;
+ }
+
+ try {
+ BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(meta);
+ if (bloomFilter != null && !bloomFilter.findHash(bloomFilter.hash(value))) {
+ return BLOCK_CANNOT_MATCH;
+ }
+ } catch (RuntimeException e) {
+ LOG.warn("Failed to read Bloom filter, assuming block might match", e);
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T extends Comparable<T>> Boolean visit(Operators.NotEq<T> notEq) {
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T extends Comparable<T>> Boolean visit(Operators.Lt<T> lt) {
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T extends Comparable<T>> Boolean visit(Operators.LtEq<T> ltEq) {
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T extends Comparable<T>> Boolean visit(Operators.Gt<T> gt) {
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T extends Comparable<T>> Boolean visit(Operators.GtEq<T> gtEq) {
+ return BLOCK_MIGHT_MATCH;
+ }
+
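+ // Note the inverted boolean convention: BLOCK_CANNOT_MATCH is true and
+ // BLOCK_MIGHT_MATCH is false. A conjunction can therefore be dropped when
+ // either side can be dropped (||); a disjunction only when both can (&&).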
+ @Override
+ public Boolean visit(Operators.And and) {
+ return and.getLeft().accept(this) || and.getRight().accept(this);
+ }
+
+ @Override
+ public Boolean visit(Operators.Or or) {
+ return or.getLeft().accept(this) && or.getRight().accept(this);
+ }
+
+ @Override
+ public Boolean visit(Operators.Not not) {
+ throw new IllegalArgumentException(
+ "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
+ }
+
+ private <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(Operators.UserDefined<T, U> ud, boolean inverted) {
+ return BLOCK_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(Operators.UserDefined<T, U> udp) {
+ return visit(udp, false);
+ }
+
+ @Override
+ public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(Operators.LogicalNotUserDefined<T, U> udp) {
+ return visit(udp.getUserDefined(), true);
+ }
+}
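For orientation, a minimal sketch of how this visitor is driven during row-group filtering; pred, blocks and reader are illustrative stand-ins, not part of this patch:

    FilterPredicate pred = FilterApi.eq(FilterApi.binaryColumn("foo"), Binary.fromString("hello"));
    for (BlockMetaData block : blocks) {
      if (BloomFilterImpl.canDrop(pred, block.getColumns(), reader.getBloomFilterDataReader(block))) {
        continue; // the Bloom filter proves "hello" is absent from this row group
      }
      // otherwise the row group might contain matches and must be scanned
    }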
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
index d1d40e9..fe6f637 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.List;
+import org.apache.parquet.filter2.BloomFilterLevel.BloomFilterImpl;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
import org.apache.parquet.filter2.compat.FilterCompat.Visitor;
@@ -32,6 +33,8 @@
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.apache.parquet.Preconditions.checkNotNull;
@@ -45,10 +48,12 @@
private final MessageType schema;
private final List<FilterLevel> levels;
private final ParquetFileReader reader;
+ private static final Logger LOG = LoggerFactory.getLogger(RowGroupFilter.class);
public enum FilterLevel {
STATISTICS,
- DICTIONARY
+ DICTIONARY,
+ BLOOMFILTER
}
/**
@@ -104,6 +109,11 @@
drop = DictionaryFilter.canDrop(filterPredicate, block.getColumns(), reader.getDictionaryReader(block));
}
+ if (!drop && levels.contains(FilterLevel.BLOOMFILTER)) {
+ drop = BloomFilterImpl.canDrop(filterPredicate, block.getColumns(), reader.getBloomFilterDataReader(block));
+ if (drop) LOG.info("Block dropped by Bloom filter");
+ }
+
if(!drop) {
filteredBlocks.add(block);
}
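To enable this level, callers include BLOOMFILTER in the level list; a hedged sketch (the exact filterRowGroups overload may differ on this branch):

    List<BlockMetaData> filtered = RowGroupFilter.filterRowGroups(
        Arrays.asList(FilterLevel.STATISTICS, FilterLevel.DICTIONARY, FilterLevel.BLOOMFILTER),
        FilterCompat.get(pred), blocks, reader);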
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index bfb4aa3..625bbc3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -473,6 +473,7 @@
columnMetaData.getTotalSize(),
columnMetaData.getFirstDataPageOffset());
columnChunk.meta_data.dictionary_page_offset = columnMetaData.getDictionaryPageOffset();
+ columnChunk.meta_data.setBloom_filter_offset(columnMetaData.getBloomFilterOffset());
if (!columnMetaData.getStatistics().isEmpty()) {
columnChunk.meta_data.setStatistics(toParquetStatistics(columnMetaData.getStatistics(), this.statisticsTruncateLength));
}
@@ -1240,6 +1241,7 @@
metaData.total_uncompressed_size);
column.setColumnIndexReference(toColumnIndexReference(columnChunk));
column.setOffsetIndexReference(toOffsetIndexReference(columnChunk));
+ column.setBloomFilterOffset(metaData.bloom_filter_offset);
// TODO
// index_page_offset
// key_value_metadata
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterReader.java
new file mode 100644
index 0000000..3ad91ce
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterReader.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * Bloom filter reader that reads Bloom filter data from an open {@link ParquetFileReader}.
+ */
+public class BloomFilterReader {
+ private final ParquetFileReader reader;
+ private final Map<ColumnPath, ColumnChunkMetaData> columns;
+ private final Map<ColumnPath, BloomFilter> cache = new HashMap<>();
+
+ public BloomFilterReader(ParquetFileReader fileReader, BlockMetaData block) {
+ this.reader = fileReader;
+ this.columns = new HashMap<>();
+ for (ColumnChunkMetaData column : block.getColumns()) {
+ columns.put(column.getPath(), column);
+ }
+ }
+
+ public BloomFilter readBloomFilter(ColumnChunkMetaData meta) {
+ // All cache access happens under the lock; an unsynchronized HashMap read
+ // racing a write from another thread would be undefined behavior.
+ synchronized (cache) {
+ BloomFilter bloomFilter = cache.get(meta.getPath());
+ if (bloomFilter != null) {
+ return bloomFilter;
+ }
+ try {
+ bloomFilter = reader.readBloomFilter(meta);
+ if (bloomFilter == null) {
+ return null;
+ }
+ cache.put(meta.getPath(), bloomFilter);
+ return bloomFilter;
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read Bloom filter data", e);
+ }
+ }
+ }
+
+}
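A small usage sketch; meta stands for any ColumnChunkMetaData of the block, and results are cached per column path so repeated calls are cheap:

    BloomFilterReader bloomFilterReader = fileReader.getBloomFilterDataReader(block);
    BloomFilter bf = bloomFilterReader.readBloomFilter(meta);
    if (bf != null && !bf.findHash(bf.hash(value))) {
      // the value is definitely absent from this column chunk
    }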
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 72f26fc..d2e4c96 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -37,6 +37,9 @@
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
@@ -47,12 +50,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class ColumnChunkPageWriteStore implements PageWriteStore {
+class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore {
private static final Logger LOG = LoggerFactory.getLogger(ColumnChunkPageWriteStore.class);
private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
- private static final class ColumnChunkPageWriter implements PageWriter {
+ private static final class ColumnChunkPageWriter implements PageWriter, BloomFilterWriter {
private final ColumnDescriptor path;
private final BytesCompressor compressor;
@@ -71,6 +74,7 @@
private Set<Encoding> dlEncodings = new HashSet<Encoding>();
private List<Encoding> dataEncodings = new ArrayList<Encoding>();
+ private BloomFilter bloomFilter;
private ColumnIndexBuilder columnIndexBuilder;
private OffsetIndexBuilder offsetIndexBuilder;
private Statistics totalStatistics;
@@ -249,6 +253,7 @@
totalStatistics,
columnIndexBuilder,
offsetIndexBuilder,
+ bloomFilter,
rlEncodings,
dlEncodings,
dataEncodings);
@@ -289,6 +294,10 @@
return buf.memUsageString(prefix + " ColumnChunkPageWriter");
}
+ @Override
+ public void writeBloomFilter(BloomFilter bloomFilter) {
+ this.bloomFilter = bloomFilter;
+ }
}
private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
@@ -313,6 +322,11 @@
return writers.get(path);
}
+ @Override
+ public BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path) {
+ return writers.get(path);
+ }
+
public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
for (ColumnDescriptor path : schema.getColumns()) {
ColumnChunkPageWriter pageWriter = writers.get(path);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index c3da323..18ee788 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -28,6 +28,7 @@
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
@@ -64,6 +65,7 @@
private ColumnWriteStore columnStore;
private ColumnChunkPageWriteStore pageStore;
+ private BloomFilterWriteStore bloomFilterWriteStore;
private RecordConsumer recordConsumer;
/**
@@ -101,9 +103,12 @@
}
private void initStore() {
- pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator(),
- props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled());
- columnStore = props.newColumnWriteStore(schema, pageStore);
+ ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore(compressor,
+ schema, props.getAllocator(), props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled());
+ pageStore = columnChunkPageWriteStore;
+ bloomFilterWriteStore = columnChunkPageWriteStore;
+
+ columnStore = props.newColumnWriteStore(schema, pageStore, bloomFilterWriteStore);
MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
this.recordConsumer = columnIO.getRecordWriter(columnStore);
writeSupport.prepareForWrite(recordConsumer);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 366c429..52cacb0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -34,6 +34,8 @@
import java.io.IOException;
import java.io.SequenceInputStream;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -69,6 +71,8 @@
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.RowGroupFilter;
@@ -1050,6 +1054,51 @@
converter.getEncoding(dictHeader.getEncoding()));
}
+ public BloomFilterReader getBloomFilterDataReader(BlockMetaData block) {
+ return new BloomFilterReader(this, block);
+ }
+
+ /**
+ * Reads Bloom filter data for the given column chunk.
+ *
+ * @param meta a column's ColumnChunkMetaData to read the Bloom filter from
+ * @return a BloomFilter object.
+ * @throws IOException if there is an error while reading the Bloom filter.
+ */
+ public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException {
+ long bloomFilterOffset = meta.getBloomFilterOffset();
+ f.seek(bloomFilterOffset);
+
+ // Read Bloom filter data header.
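+ // The header is four consecutive little-endian int32 values:
+ // numBytes, hash strategy ordinal, algorithm ordinal, compression ordinal.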
+ byte[] bytes = new byte[BlockSplitBloomFilter.HEADER_SIZE];
+ f.readFully(bytes);
+ ByteBuffer bloomHeader = ByteBuffer.wrap(bytes);
+ IntBuffer headerBuffer = bloomHeader.order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
+ int numBytes = headerBuffer.get();
+ if (numBytes <= 0 || numBytes > BlockSplitBloomFilter.DEFAULT_MAXIMUM_BYTES) {
+ return null;
+ }
+
+ BloomFilter.HashStrategy hash = BloomFilter.HashStrategy.values()[headerBuffer.get()];
+ if (hash != BlockSplitBloomFilter.HashStrategy.XXH64) {
+ return null;
+ }
+
+ BloomFilter.Algorithm algorithm = BloomFilter.Algorithm.values()[headerBuffer.get()];
+ if (algorithm != BlockSplitBloomFilter.Algorithm.BLOCK) {
+ return null;
+ }
+
+ BloomFilter.Compression compression = BloomFilter.Compression.values()[headerBuffer.get()];
+ if (compression != BlockSplitBloomFilter.Compression.UNCOMPRESSED) {
+ return null;
+ }
+
+ byte[] bitset = new byte[numBytes];
+ f.readFully(bitset);
+ return new BlockSplitBloomFilter(bitset);
+ }
+
/**
* @param column
* the column chunk which the column index is to be returned for
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 39a75bc..63ef7bc 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -50,6 +50,8 @@
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.example.DummyRecordConverter;
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.format.Util;
@@ -110,6 +112,9 @@
private final List<List<ColumnIndex>> columnIndexes = new ArrayList<>();
private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();
+ // The Bloom filters
+ private final List<List<BloomFilter>> bloomFilters = new ArrayList<>();
+
// row group data
private BlockMetaData currentBlock; // appended to by endColumn
@@ -117,6 +122,9 @@
private List<ColumnIndex> currentColumnIndexes;
private List<OffsetIndex> currentOffsetIndexes;
+ // The Bloom filters for the current block
+ private List<BloomFilter> currentBloomFilters;
+
// row group data set at the start of a row group
private long currentRecordCount; // set in startBlock
@@ -355,6 +363,8 @@
currentColumnIndexes = new ArrayList<>();
currentOffsetIndexes = new ArrayList<>();
+
+ currentBloomFilters = new ArrayList<>();
}
/**
@@ -575,6 +585,14 @@
}
/**
+ * Adds a Bloom filter for the current block; it is written out when the file is ended.
+ *
+ * @param bloomFilter the Bloom filter of column values
+ */
+ void writeBloomFilter(BloomFilter bloomFilter) {
+ currentBloomFilters.add(bloomFilter);
+ }
+
+ /**
* Writes a column chunk at once
* @param descriptor the descriptor of the column
* @param valueCount the value count in this column
@@ -586,6 +604,7 @@
* @param totalStats accumulated statistics for the column chunk
* @param columnIndexBuilder the builder object for the column index
* @param offsetIndexBuilder the builder object for the offset index
+ * @param bloomFilter the bloom filter for this column
* @param rlEncodings the RL encodings used in this column chunk
* @param dlEncodings the DL encodings used in this column chunk
* @param dataEncodings the data encodings used in this column chunk
@@ -601,14 +620,18 @@
Statistics<?> totalStats,
ColumnIndexBuilder columnIndexBuilder,
OffsetIndexBuilder offsetIndexBuilder,
+ BloomFilter bloomFilter,
Set<Encoding> rlEncodings,
Set<Encoding> dlEncodings,
List<Encoding> dataEncodings) throws IOException {
startColumn(descriptor, valueCount, compressionCodecName);
state = state.write();
+
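+ // Note: the Bloom filter is only recorded when the chunk has no dictionary
+ // page; a fully dictionary-encoded chunk lets the DictionaryFilter answer
+ // membership exactly, making the Bloom filter redundant.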
if (dictionaryPage != null) {
writeDictionaryPage(dictionaryPage);
+ } else if (bloomFilter != null) {
+ currentBloomFilters.add(bloomFilter);
}
LOG.debug("{}: write data pages", out.getPos());
long headersSize = bytes.size() - compressedTotalPageSize;
@@ -675,8 +698,10 @@
blocks.add(currentBlock);
columnIndexes.add(currentColumnIndexes);
offsetIndexes.add(currentOffsetIndexes);
+ bloomFilters.add(currentBloomFilters);
currentColumnIndexes = null;
currentOffsetIndexes = null;
+ currentBloomFilters = null;
currentBlock = null;
}
@@ -858,6 +883,7 @@
state = state.end();
serializeColumnIndexes(columnIndexes, blocks, out);
serializeOffsetIndexes(offsetIndexes, blocks, out);
+ serializeBloomFilters(bloomFilters, blocks, out);
LOG.debug("{}: end", out.getPos());
this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
serializeFooter(footer, out);
@@ -907,6 +933,28 @@
}
}
+ private static void serializeBloomFilters(
+ List<List<BloomFilter>> bloomFilters,
+ List<BlockMetaData> blocks,
+ PositionOutputStream out) throws IOException {
+ LOG.debug("{}: bloom filters", out.getPos());
+ for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
+ List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
+ List<BloomFilter> blockBloomFilters = bloomFilters.get(bIndex);
+ if (blockBloomFilters.isEmpty()) continue;
+ for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
+ BloomFilter bloomFilter = blockBloomFilters.get(cIndex);
+ if (bloomFilter == null) {
+ continue;
+ }
+ ColumnChunkMetaData column = columns.get(cIndex);
+ long offset = out.getPos();
+ column.setBloomFilterOffset(offset);
+ bloomFilter.writeTo(out);
+ }
+ }
+ }
+
private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException {
long footerIndex = out.getPos();
ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
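Ordering matters here: serializeBloomFilters runs before serializeFooter, so the bloom_filter_offset recorded via setBloomFilterOffset is already present in the column metadata by the time ParquetMetadataConverter serializes the footer.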
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index c3c52e3..62f21c8 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 676e2ca..fd79d7d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,11 @@
import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -145,6 +150,9 @@
public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
public static final String STATISTICS_TRUNCATE_LENGTH = "parquet.statistics.truncate.length";
+ public static final String BLOOM_FILTER_COLUMN_NAMES = "parquet.bloom.filter.column.names";
+ public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
+ public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes";
public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
@@ -212,6 +220,43 @@
return getEnableDictionary(getConfiguration(jobContext));
}
+ public static int getBloomFilterMaxBytes(Configuration conf) {
+ return conf.getInt(BLOOM_FILTER_MAX_BYTES,
+ ParquetProperties.DEFAULT_MAX_BLOOM_FILTER_BYTES);
+ }
+
+ public static Set<String> getBloomFilterColumns(Configuration conf) {
+ String columnNames = conf.get(BLOOM_FILTER_COLUMN_NAMES);
+ if (columnNames != null) {
+ return new HashSet<>(Arrays.asList(columnNames.split(",")));
+ } else {
+ return new HashSet<>();
+ }
+ }
+
+ public static Map<String, Long> getBloomFilterColumnExpectedNDVs(Configuration conf) {
+ Map<String, Long> kv = new HashMap<>();
+ String columnNamesConf = conf.get(BLOOM_FILTER_COLUMN_NAMES);
+ String expectedNDVsConf = conf.get(BLOOM_FILTER_EXPECTED_NDV);
+
+ if (columnNamesConf == null || expectedNDVsConf == null) {
+ return kv;
+ }
+
+ String[] columnNames = columnNamesConf.split(",");
+ String[] expectedNDVs = expectedNDVsConf.split(",");
+
+ if (columnNames.length == expectedNDVs.length) {
+ for (int i = 0; i < columnNames.length; i++) {
+ kv.put(columnNames[i], Long.parseLong(expectedNDVs[i]));
+ }
+ } else {
+ LOG.warn("The number of Bloom filter column names does not match the number of expected NDVs");
+ }
+
+ return kv;
+ }
+
public static int getBlockSize(JobContext jobContext) {
return getBlockSize(getConfiguration(jobContext));
}
@@ -435,6 +480,9 @@
.withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
.withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
.withStatisticsTruncateLength(getStatisticsTruncateLength(conf))
+ .withBloomFilterColumnNames(getBloomFilterColumns(conf))
+ .withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf))
+ .withBloomFilterColumnNdvs(getBloomFilterColumnExpectedNDVs(conf))
.withPageRowCountLimit(getPageRowCountLimit(conf))
.withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
.build();
@@ -456,6 +504,10 @@
LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
LOG.info("Truncate length for statistics min/max is: {}", props.getStatisticsTruncateLength());
+ LOG.info("Bloom filter is enabled for columns: {}", props.getBloomFilterColumns());
+ LOG.info("Max Bloom filter size for a column is: {}", props.getMaxBloomFilterBytes());
+ LOG.info("Expected number of distinct values for Bloom filter columns: {}",
+ props.getBloomFilterColumnExpectedNDVs().values());
LOG.info("Page row count limit to {}", props.getPageRowCountLimit());
LOG.info("Writing page checksums is: {}", props.getPageWriteChecksumEnabled() ? "on" : "off");
}
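Putting the knobs together, a Hadoop job would configure Bloom filters roughly like this; the column names and NDV values are illustrative:

    Configuration conf = new Configuration();
    conf.set(ParquetOutputFormat.BLOOM_FILTER_COLUMN_NAMES, "name,id");
    conf.set(ParquetOutputFormat.BLOOM_FILTER_EXPECTED_NDV, "100000,50000");
    conf.setInt(ParquetOutputFormat.BLOOM_FILTER_MAX_BYTES, 1024 * 1024);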
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 870a1a6..0ebd7d3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,8 @@
import java.io.Closeable;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -539,6 +541,21 @@
}
/**
+ * Sets the Bloom filter column names for the constructed writer.
+ *
+ * @param columnNames the names of the columns to build Bloom filters for
+ * @return this builder for method chaining.
+ */
+ public SELF withBloomFilterColumnNames(String... columnNames) {
+ if (columnNames != null) {
+ encodingPropsBuilder.withBloomFilterColumnNames(
+ new HashSet<>(Arrays.asList(columnNames))
+ );
+ }
+
+ return self();
+ }
+
+ /**
* Set a property that will be available to the read path. For writers that use a Hadoop
* configuration, this is the recommended way to add configuration values.
*
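Outside of Hadoop jobs the same switch is available directly on the builder, as the tests below do; path and conf are placeholders:

    ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
        .withConf(conf)
        .withBloomFilterColumnNames("name")
        .build();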
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index e6aa104..2c24356 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -173,6 +173,8 @@
private IndexReference columnIndexReference;
private IndexReference offsetIndexReference;
+ private long bloomFilterOffset;
+
protected ColumnChunkMetaData(ColumnChunkProperties columnChunkProperties) {
this(null, columnChunkProperties);
}
@@ -275,6 +277,23 @@
}
/**
+ * @param bloomFilterOffset
+ * the offset of the Bloom filter in the file
+ */
+ @Private
+ public void setBloomFilterOffset(long bloomFilterOffset) {
+ this.bloomFilterOffset = bloomFilterOffset;
+ }
+
+ /**
+ * @return the offset to the Bloom filter
+ */
+ @Private
+ public long getBloomFilterOffset() {
+ return bloomFilterOffset;
+ }
+
+ /**
* @return all the encodings used in this column
*/
public Set<Encoding> getEncodings() {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index 88c8d83..0569c42 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -260,6 +260,7 @@
same(OffsetIndexBuilder.getNoOpBuilder()), // Deprecated writePage -> no offset index
any(),
any(),
+ any(),
any());
}
}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 8763cac..3de4524 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,6 +27,8 @@
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Version;
import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
import org.junit.Assume;
import org.junit.Rule;
@@ -220,6 +222,39 @@
}
@Test
+ public void testBloomFilterWriteRead() throws Exception {
+ MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
+ File testFile = temp.newFile();
+ testFile.delete();
+ Path path = new Path(testFile.toURI());
+ Configuration configuration = new Configuration();
+ configuration.set("parquet.bloom.filter.column.names", "foo");
+ String[] colPath = {"foo"};
+ ColumnDescriptor col = schema.getColumnDescription(colPath);
+ BinaryStatistics stats1 = new BinaryStatistics();
+ ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+ w.start();
+ w.startBlock(3);
+ w.startColumn(col, 5, CODEC);
+ w.writeDataPage(2, 4, BytesInput.from(BYTES1), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
+ w.writeDataPage(3, 4, BytesInput.from(BYTES1), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
+ w.endColumn();
+ BloomFilter blockSplitBloomFilter = new BlockSplitBloomFilter(0);
+ blockSplitBloomFilter.insertHash(blockSplitBloomFilter.hash(Binary.fromString("hello")));
+ blockSplitBloomFilter.insertHash(blockSplitBloomFilter.hash(Binary.fromString("world")));
+ w.writeBloomFilter(blockSplitBloomFilter);
+ w.endBlock();
+ w.end(new HashMap<>());
+ ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
+ ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
+ Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath)));
+ BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0));
+ BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(readFooter.getBlocks().get(0).getColumns().get(0));
+ assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("hello"))));
+ assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("world"))));
+ }
+
+ @Test
public void testAlignmentWithPadding() throws Exception {
File testFile = temp.newFile();
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index c837d9a..9b01bb5 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,6 +19,8 @@
package org.apache.parquet.hadoop;
import static java.util.Arrays.asList;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -45,8 +47,11 @@
import java.util.Map;
import java.util.concurrent.Callable;
+import net.openhft.hashing.LongHashFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.example.data.GroupFactory;
import org.apache.parquet.hadoop.example.ExampleInputFormat;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.GroupType;
@@ -204,4 +209,43 @@
assertEquals("Number of written records should be equal to the read one", recordCount, readRecordCount);
}
}
+
+ @Test
+ public void testParquetFileWithBloomFilter() throws IOException {
+ MessageType schema = Types.buildMessage().
+ required(BINARY).as(stringType()).named("name").named("msg");
+
+ String[] testNames = {"hello", "parquet", "bloom", "filter"};
+
+ final int recordCount = testNames.length;
+ Configuration conf = new Configuration();
+ GroupWriteSupport.setSchema(schema, conf);
+
+ GroupFactory factory = new SimpleGroupFactory(schema);
+ File file = temp.newFile();
+ file.delete();
+ Path path = new Path(file.getAbsolutePath());
+ try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+ .withPageRowCountLimit(10)
+ .withConf(conf)
+ .withDictionaryEncoding(false)
+ .withBloomFilterColumnNames("name")
+ .build()) {
+ for (String testName : testNames) {
+ writer.write(factory.newGroup().append("name", testName));
+ }
+ }
+
+ ParquetMetadata footer = readFooter(conf, path, NO_FILTER);
+ ParquetFileReader reader = new ParquetFileReader(
+ conf, footer.getFileMetaData(), path, footer.getBlocks(), schema.getColumns());
+
+ BloomFilter bloomFilter = reader.getBloomFilterDataReader(footer.getBlocks().get(0))
+ .readBloomFilter(footer.getBlocks().get(0).getColumns().get(0));
+
+ for (String name: testNames) {
+ assertTrue(bloomFilter.findHash(
+ LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer())));
+ }
+ }
}
diff --git a/pom.xml b/pom.xml
index 719655f..77e942a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -289,6 +289,10 @@
<pattern>it.unimi.dsi</pattern>
<shadedPattern>${shade.prefix}.it.unimi.dsi</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>${shade.prefix}.com.google.common</shadedPattern>
+ </relocation>
</relocations>
</configuration>
</execution>