PARQUET-1332: Add bloom filter for parquet
This is first part of bloom filter patch set, which include a bloom filter utility and also some unit tests.
Note that this patch also includes murmur3Hash original code from Austin Appleby. The code isn't formatted as parquet-cpp format.
Author: Chen, Junjie <cjjnjust@gmail.com>
Closes #432 from cjjnjust/master and squashes the following commits:
d4d3018 [Chen, Junjie] PARQUET-1332: update constructor
b0f3f80 [Chen, Junjie] PARQUET-1332: update code for some coding style
fe97b44 [Chen, Junjie] PARQUET-1332: refine the complex classes
fd3ba23 [Chen, Junjie] PARQUET-1332: fix build error for clang Xcode
019322a [Chen, Junjie] PARQUET-1332: refine bloom filter algorithm
ec6a6e9 [Chen, Junjie] PARQUET-1332: fix build failure for windows platform
0c1e95f [Chen, Junjie] PARQUET-1332: update according to latest comments
1a105fc [Chen, Junjie] PARQUET-1332: update code according to Jim's comments
731191c [Chen, Junjie] PARQUET-1332: rebase to latest to solve CI error and update comments
0bc8595 [Chen, Junjie] PARQUET-1332: use abstract class for hash class
1374665 [Chen, Junjie] PARQUET-1332: update according to code review
858b2ce [Chen, Junjie] PARQUET-1332: remove unnecessary const_cast
23d7ccf [Chen, Junjie] PARQUET-1332: add missing file
f2ff8ad [Chen, Junjie] PARQUET-1332: Add bloom filter utility class
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 927f728..5b3c460 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -674,6 +674,7 @@
src/parquet/arrow/record_reader.cc
src/parquet/arrow/schema.cc
src/parquet/arrow/writer.cc
+ src/parquet/bloom_filter.cc
src/parquet/column_reader.cc
src/parquet/column_scanner.cc
src/parquet/column_writer.cc
@@ -681,6 +682,7 @@
src/parquet/file_reader.cc
src/parquet/file_writer.cc
src/parquet/metadata.cc
+ src/parquet/murmur3.cc
src/parquet/parquet_constants.cpp
src/parquet/parquet_types.cpp
src/parquet/printer.cc
diff --git a/data/bloom_filter.bin b/data/bloom_filter.bin
new file mode 100644
index 0000000..c0e30ce
--- /dev/null
+++ b/data/bloom_filter.bin
Binary files differ
diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt
index bc16d8b..93a242c 100644
--- a/src/parquet/CMakeLists.txt
+++ b/src/parquet/CMakeLists.txt
@@ -17,6 +17,7 @@
# Headers: top level
install(FILES
+ bloom_filter.h
column_reader.h
column_page.h
column_scanner.h
@@ -25,7 +26,9 @@
exception.h
file_reader.h
file_writer.h
+ hasher.h
metadata.h
+ murmur3.h
printer.h
properties.h
schema.h
@@ -50,6 +53,7 @@
"${CMAKE_CURRENT_BINARY_DIR}/parquet.pc"
DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
+ADD_PARQUET_TEST(bloom_filter-test)
ADD_PARQUET_TEST(column_reader-test)
ADD_PARQUET_TEST(column_scanner-test)
ADD_PARQUET_TEST(column_writer-test)
diff --git a/src/parquet/README b/src/parquet/README
new file mode 100644
index 0000000..fc16a46
--- /dev/null
+++ b/src/parquet/README
@@ -0,0 +1,10 @@
+The CompatibilityTest of bloom_filter-test.cc is used to test cross compatibility of
+Bloom filters between parquet-mr and parquet-cpp. It reads the Bloom filter binary
+generated by the Bloom filter class in the parquet-mr project and tests whether the
+values inserted before could be filtered or not.
+
+The Bloom filter binary is generated by three steps from Parquet-mr:
+Step 1: Construct a Bloom filter with 1024 bytes of bitset.
+Step 2: Insert hashes of "hello", "parquet", "bloom", "filter" strings to Bloom filter
+by calling hash and insert APIs.
+Step 3: Call writeTo API to write to File.
diff --git a/src/parquet/bloom_filter-test.cc b/src/parquet/bloom_filter-test.cc
new file mode 100644
index 0000000..dbef8c8
--- /dev/null
+++ b/src/parquet/bloom_filter-test.cc
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <random>
+#include <string>
+
+#include "arrow/io/file.h"
+#include "parquet/bloom_filter.h"
+#include "parquet/murmur3.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/test-common.h"
+
+namespace parquet {
+namespace test {
+TEST(Murmur3Test, TestBloomFilter) {
+ uint64_t result;
+ const uint8_t bitset[8] = {0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7};
+ ByteArray byteArray(8, bitset);
+ MurmurHash3 murmur3;
+ result = murmur3.Hash(&byteArray);
+ EXPECT_EQ(result, UINT64_C(913737700387071329));
+}
+
+TEST(ConstructorTest, TestBloomFilter) {
+ BlockSplitBloomFilter bloom_filter;
+ EXPECT_NO_THROW(bloom_filter.Init(1000));
+
+ // It throws because the length cannot be zero
+ std::unique_ptr<uint8_t[]> bitset1(new uint8_t[1024]());
+ EXPECT_THROW(bloom_filter.Init(bitset1.get(), 0), ParquetException);
+
+ // It throws because the number of bytes of Bloom filter bitset must be a power of 2.
+ std::unique_ptr<uint8_t[]> bitset2(new uint8_t[1024]());
+ EXPECT_THROW(bloom_filter.Init(bitset2.get(), 1023), ParquetException);
+}
+
+// The BasicTest is used to test basic operations including InsertHash, FindHash and
+// serializing and de-serializing.
+TEST(BasicTest, TestBloomFilter) {
+ BlockSplitBloomFilter bloom_filter;
+ bloom_filter.Init(1024);
+
+ for (int i = 0; i < 10; i++) {
+ bloom_filter.InsertHash(bloom_filter.Hash(i));
+ }
+
+ for (int i = 0; i < 10; i++) {
+ EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(i)));
+ }
+
+ // Serialize Bloom filter to memory output stream
+ InMemoryOutputStream sink;
+ bloom_filter.WriteTo(&sink);
+
+ // Deserialize Bloom filter from memory
+ InMemoryInputStream source(sink.GetBuffer());
+
+ BlockSplitBloomFilter de_bloom = std::move(BlockSplitBloomFilter::Deserialize(&source));
+
+ for (int i = 0; i < 10; i++) {
+ EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(i)));
+ }
+}
+
+// Helper function to generate random string.
+std::string GetRandomString(uint32_t length) {
+ // Character set used to generate random string
+ const std::string charset =
+ "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+
+ // The uuid_seed was generated by "uuidgen -r"
+ const std::string uuid_seed = "8de406aa-fb59-4195-a81c-5152af26433f";
+ std::seed_seq seed(uuid_seed.begin(), uuid_seed.end());
+ std::mt19937 generator(seed);
+ std::uniform_int_distribution<uint32_t> dist(0, static_cast<int>(charset.size() - 1));
+ std::string ret = "";
+
+ for (uint32_t i = 0; i < length; i++) {
+ ret += charset[dist(generator)];
+ }
+
+ return ret;
+}
+
+TEST(FPPTest, TestBloomFilter) {
+ // It counts the number of times FindHash returns true.
+ int exist = 0;
+
+ // Total count of elements that will be used
+ const int total_count = 100000;
+
+ // Bloom filter fpp parameter
+ const double fpp = 0.01;
+
+ std::vector<std::string> members;
+ BlockSplitBloomFilter bloom_filter;
+ bloom_filter.Init(BlockSplitBloomFilter::OptimalNumOfBits(total_count, fpp));
+
+ // Insert elements into the Bloom filter
+ for (int i = 0; i < total_count; i++) {
+ // Insert random string which length is 8
+ std::string tmp = GetRandomString(8);
+ const ByteArray byte_array(8, reinterpret_cast<const uint8_t*>(tmp.c_str()));
+ members.push_back(tmp);
+ bloom_filter.InsertHash(bloom_filter.Hash(&byte_array));
+ }
+
+ for (int i = 0; i < total_count; i++) {
+ const ByteArray byte_array1(8, reinterpret_cast<const uint8_t*>(members[i].c_str()));
+ ASSERT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(&byte_array1)));
+ std::string tmp = GetRandomString(7);
+ const ByteArray byte_array2(7, reinterpret_cast<const uint8_t*>(tmp.c_str()));
+
+ if (bloom_filter.FindHash(bloom_filter.Hash(&byte_array2))) {
+ exist++;
+ }
+ }
+
+ // The exist should be probably less than 1000 according default FPP 0.01.
+ EXPECT_TRUE(exist < total_count * fpp);
+}
+
+// The CompatibilityTest is used to test cross compatibility with parquet-mr, it reads
+// the Bloom filter binary generated by the Bloom filter class in the parquet-mr project
+// and tests whether the values inserted before could be filtered or not.
+
+// The Bloom filter binary is generated by three steps in from Parquet-mr.
+// Step 1: Construct a Bloom filter with 1024 bytes bitset.
+// Step 2: Insert "hello", "parquet", "bloom", "filter" to Bloom filter.
+// Step 3: Call writeTo API to write to File.
+TEST(CompatibilityTest, TestBloomFilter) {
+ const std::string test_string[4] = {"hello", "parquet", "bloom", "filter"};
+ const std::string bloom_filter_test_binary =
+ std::string(test::get_data_dir()) + "/bloom_filter.bin";
+ std::shared_ptr<::arrow::io::ReadableFile> handle;
+
+ int64_t size;
+ PARQUET_THROW_NOT_OK(
+ ::arrow::io::ReadableFile::Open(bloom_filter_test_binary, &handle));
+ PARQUET_THROW_NOT_OK(handle->GetSize(&size));
+
+ // 1024 bytes (bitset) + 4 bytes (hash) + 4 bytes (algorithm) + 4 bytes (length)
+ EXPECT_EQ(size, 1036);
+
+ std::unique_ptr<uint8_t[]> bitset(new uint8_t[size]());
+ std::shared_ptr<Buffer> buffer(new Buffer(bitset.get(), size));
+ handle->Read(size, &buffer);
+
+ InMemoryInputStream source(buffer);
+ BlockSplitBloomFilter bloom_filter1 =
+ std::move(BlockSplitBloomFilter::Deserialize(&source));
+
+ for (int i = 0; i < 4; i++) {
+ const ByteArray tmp(static_cast<uint32_t>(test_string[i].length()),
+ reinterpret_cast<const uint8_t*>(test_string[i].c_str()));
+ EXPECT_TRUE(bloom_filter1.FindHash(bloom_filter1.Hash(&tmp)));
+ }
+
+ // The following is used to check whether the new created Bloom filter in parquet-cpp is
+ // byte-for-byte identical to file at bloom_data_path which is created from parquet-mr
+ // with same inserted hashes.
+ BlockSplitBloomFilter bloom_filter2;
+ bloom_filter2.Init(bloom_filter1.GetBitsetSize());
+ for (int i = 0; i < 4; i++) {
+ const ByteArray byte_array(static_cast<uint32_t>(test_string[i].length()),
+ reinterpret_cast<const uint8_t*>(test_string[i].c_str()));
+ bloom_filter2.InsertHash(bloom_filter2.Hash(&byte_array));
+ }
+
+ // Serialize Bloom filter to memory output stream
+ InMemoryOutputStream sink;
+ bloom_filter2.WriteTo(&sink);
+ std::shared_ptr<Buffer> buffer1 = sink.GetBuffer();
+
+ handle->Seek(0);
+ handle->GetSize(&size);
+ std::shared_ptr<Buffer> buffer2;
+ handle->Read(size, &buffer2);
+
+ EXPECT_TRUE((*buffer1).Equals(*buffer2));
+}
+
+// OptmialValueTest is used to test whether OptimalNumOfBits returns expected
+// numbers according to formula:
+// num_of_bits = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8.0))
+// where ndv is the number of distinct values and fpp is the false positive probability.
+// Also it is used to test whether OptimalNumOfBits returns value between
+// [MINIMUM_BLOOM_FILTER_SIZE, MAXIMUM_BLOOM_FILTER_SIZE].
+TEST(OptimalValueTest, TestBloomFilter) {
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(256, 0.01), UINT32_C(4096));
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(512, 0.01), UINT32_C(8192));
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1024, 0.01), UINT32_C(16384));
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(2048, 0.01), UINT32_C(32768));
+
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.01), UINT32_C(2048));
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.01), UINT32_C(4096));
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.01), UINT32_C(8192));
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.01), UINT32_C(16384));
+
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.025), UINT32_C(2048));
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.025), UINT32_C(4096));
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.025), UINT32_C(8192));
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.025), UINT32_C(16384));
+
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.05), UINT32_C(2048));
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.05), UINT32_C(4096));
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.05), UINT32_C(8192));
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.05), UINT32_C(16384));
+
+ // Boundary check
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(4, 0.01), UINT32_C(256));
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(4, 0.25), UINT32_C(256));
+
+ EXPECT_EQ(
+ BlockSplitBloomFilter::OptimalNumOfBits(std::numeric_limits<uint32_t>::max(), 0.01),
+ UINT32_C(1073741824));
+ EXPECT_EQ(
+ BlockSplitBloomFilter::OptimalNumOfBits(std::numeric_limits<uint32_t>::max(), 0.25),
+ UINT32_C(1073741824));
+}
+
+} // namespace test
+
+} // namespace parquet
diff --git a/src/parquet/bloom_filter.cc b/src/parquet/bloom_filter.cc
new file mode 100644
index 0000000..faa344c
--- /dev/null
+++ b/src/parquet/bloom_filter.cc
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cmath>
+#include <cstdint>
+
+#include "arrow/status.h"
+#include "arrow/util/bit-util.h"
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/murmur3.h"
+#include "parquet/types.h"
+#include "parquet/util/logging.h"
+
+namespace parquet {
+constexpr uint32_t BlockSplitBloomFilter::SALT[kBitsSetPerBlock];
+
+BlockSplitBloomFilter::BlockSplitBloomFilter()
+ : pool_(::arrow::default_memory_pool()),
+ hash_strategy_(HashStrategy::MURMUR3_X64_128),
+ algorithm_(Algorithm::BLOCK) {}
+
+void BlockSplitBloomFilter::Init(uint32_t num_bytes) {
+ if (num_bytes < kMinimumBloomFilterBytes) {
+ num_bytes = kMinimumBloomFilterBytes;
+ }
+
+ // Get next power of 2 if it is not power of 2.
+ if ((num_bytes & (num_bytes - 1)) != 0) {
+ num_bytes = static_cast<uint32_t>(::arrow::BitUtil::NextPower2(num_bytes));
+ }
+
+ if (num_bytes > kMaximumBloomFilterBytes) {
+ num_bytes = kMaximumBloomFilterBytes;
+ }
+
+ num_bytes_ = num_bytes;
+ PARQUET_THROW_NOT_OK(::arrow::AllocateBuffer(pool_, num_bytes_, &data_));
+ memset(data_->mutable_data(), 0, num_bytes_);
+
+ this->hasher_.reset(new MurmurHash3());
+}
+
+void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
+ DCHECK(bitset != nullptr);
+
+ if (num_bytes < kMinimumBloomFilterBytes || num_bytes > kMaximumBloomFilterBytes ||
+ (num_bytes & (num_bytes - 1)) != 0) {
+ throw ParquetException("Given length of bitset is illegal");
+ }
+
+ num_bytes_ = num_bytes;
+ PARQUET_THROW_NOT_OK(::arrow::AllocateBuffer(pool_, num_bytes_, &data_));
+ memcpy(data_->mutable_data(), bitset, num_bytes_);
+
+ this->hasher_.reset(new MurmurHash3());
+}
+
+BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(InputStream* input) {
+ int64_t bytes_available;
+
+ const uint8_t* read_buffer = NULL;
+ read_buffer = input->Read(sizeof(uint32_t), &bytes_available);
+ if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t) || !read_buffer) {
+ throw ParquetException("Failed to deserialize from input stream");
+ }
+ uint32_t len;
+ memcpy(&len, read_buffer, sizeof(uint32_t));
+
+ read_buffer = input->Read(sizeof(uint32_t), &bytes_available);
+ if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t) || !read_buffer) {
+ throw ParquetException("Failed to deserialize from input stream");
+ }
+ uint32_t hash;
+ memcpy(&hash, read_buffer, sizeof(uint32_t));
+ if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
+ throw ParquetException("Unsupported hash strategy");
+ }
+
+ read_buffer = input->Read(sizeof(uint32_t), &bytes_available);
+ if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t) || !read_buffer) {
+ throw ParquetException("Failed to deserialize from input stream");
+ }
+ uint32_t algorithm;
+ memcpy(&algorithm, read_buffer, sizeof(uint32_t));
+ if (static_cast<Algorithm>(algorithm) != BloomFilter::Algorithm::BLOCK) {
+ throw ParquetException("Unsupported Bloom filter algorithm");
+ }
+
+ BlockSplitBloomFilter bloom_filter;
+ bloom_filter.Init(input->Read(len, &bytes_available), len);
+ return bloom_filter;
+}
+
+void BlockSplitBloomFilter::WriteTo(OutputStream* sink) const {
+ DCHECK(sink != nullptr);
+
+ sink->Write(reinterpret_cast<const uint8_t*>(&num_bytes_), sizeof(num_bytes_));
+ sink->Write(reinterpret_cast<const uint8_t*>(&hash_strategy_), sizeof(hash_strategy_));
+ sink->Write(reinterpret_cast<const uint8_t*>(&algorithm_), sizeof(algorithm_));
+ sink->Write(data_->mutable_data(), num_bytes_);
+}
+
+void BlockSplitBloomFilter::SetMask(uint32_t key, BlockMask& block_mask) const {
+ for (int i = 0; i < kBitsSetPerBlock; ++i) {
+ block_mask.item[i] = key * SALT[i];
+ }
+
+ for (int i = 0; i < kBitsSetPerBlock; ++i) {
+ block_mask.item[i] = block_mask.item[i] >> 27;
+ }
+
+ for (int i = 0; i < kBitsSetPerBlock; ++i) {
+ block_mask.item[i] = UINT32_C(0x1) << block_mask.item[i];
+ }
+}
+
+bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
+ const uint32_t bucket_index =
+ static_cast<uint32_t>((hash >> 32) & (num_bytes_ / kBytesPerFilterBlock - 1));
+ uint32_t key = static_cast<uint32_t>(hash);
+ uint32_t* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());
+
+ // Calculate mask for bucket.
+ BlockMask block_mask;
+ SetMask(key, block_mask);
+
+ for (int i = 0; i < kBitsSetPerBlock; ++i) {
+ if (0 == (bitset32[kBitsSetPerBlock * bucket_index + i] & block_mask.item[i])) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void BlockSplitBloomFilter::InsertHash(uint64_t hash) {
+ const uint32_t bucket_index =
+ static_cast<uint32_t>(hash >> 32) & (num_bytes_ / kBytesPerFilterBlock - 1);
+ uint32_t key = static_cast<uint32_t>(hash);
+ uint32_t* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());
+
+ // Calculate mask for bucket.
+ BlockMask block_mask;
+ SetMask(key, block_mask);
+
+ for (int i = 0; i < kBitsSetPerBlock; i++) {
+ bitset32[bucket_index * kBitsSetPerBlock + i] |= block_mask.item[i];
+ }
+}
+
+} // namespace parquet
diff --git a/src/parquet/bloom_filter.h b/src/parquet/bloom_filter.h
new file mode 100644
index 0000000..e39370a
--- /dev/null
+++ b/src/parquet/bloom_filter.h
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_BLOOM_FILTER_H
+#define PARQUET_BLOOM_FILTER_H
+
+#include <cstdint>
+
+#include "parquet/exception.h"
+#include "parquet/hasher.h"
+#include "parquet/types.h"
+#include "parquet/util/logging.h"
+#include "parquet/util/memory.h"
+
+namespace parquet {
+class OutputStream;
+
+// A Bloom filter is a compact structure to indicate whether an item is not in a set or
+// probably in a set. The Bloom filter usually consists of a bit set that represents a
+// set of elements, a hash strategy and a Bloom filter algorithm.
+class BloomFilter {
+ public:
+ // Maximum Bloom filter size, it sets to HDFS default block size 128MB
+ // This value will be reconsidered when implementing Bloom filter producer.
+ static constexpr uint32_t kMaximumBloomFilterBytes = 128 * 1024 * 1024;
+
+ /// Determine whether an element exist in set or not.
+ ///
+ /// @param hash the element to contain.
+ /// @return false if value is definitely not in set, and true means PROBABLY
+ /// in set.
+ virtual bool FindHash(uint64_t hash) const = 0;
+
+ /// Insert element to set represented by Bloom filter bitset.
+ /// @param hash the hash of value to insert into Bloom filter.
+ virtual void InsertHash(uint64_t hash) = 0;
+
+ /// Write this Bloom filter to an output stream. A Bloom filter structure should
+ /// include bitset length, hash strategy, algorithm, and bitset.
+ ///
+ /// @param sink the output stream to write
+ virtual void WriteTo(OutputStream* sink) const = 0;
+
+ /// Get the number of bytes of bitset
+ virtual uint32_t GetBitsetSize() const = 0;
+
+ /// Compute hash for 32 bits value by using its plain encoding result.
+ ///
+ /// @param value the value to hash.
+ /// @return hash result.
+ virtual uint64_t Hash(int32_t value) const = 0;
+
+ /// Compute hash for 64 bits value by using its plain encoding result.
+ ///
+ /// @param value the value to hash.
+ /// @return hash result.
+ virtual uint64_t Hash(int64_t value) const = 0;
+
+ /// Compute hash for float value by using its plain encoding result.
+ ///
+ /// @param value the value to hash.
+ /// @return hash result.
+ virtual uint64_t Hash(float value) const = 0;
+
+ /// Compute hash for double value by using its plain encoding result.
+ ///
+ /// @param value the value to hash.
+ /// @return hash result.
+ virtual uint64_t Hash(double value) const = 0;
+
+ /// Compute hash for Int96 value by using its plain encoding result.
+ ///
+ /// @param value the value to hash.
+ /// @return hash result.
+ virtual uint64_t Hash(const Int96* value) const = 0;
+
+ /// Compute hash for ByteArray value by using its plain encoding result.
+ ///
+ /// @param value the value to hash.
+ /// @return hash result.
+ virtual uint64_t Hash(const ByteArray* value) const = 0;
+
+ /// Compute hash for fixed byte array value by using its plain encoding result.
+ ///
+ /// @param value the value to hash.
+ /// @return hash result.
+ virtual uint64_t Hash(const FLBA* value, uint32_t len) const = 0;
+
+ virtual ~BloomFilter() {}
+
+ protected:
+ // Hash strategy available for Bloom filter.
+ enum class HashStrategy : uint32_t { MURMUR3_X64_128 = 0 };
+
+ // Bloom filter algorithm.
+ enum class Algorithm : uint32_t { BLOCK = 0 };
+};
+
+// The BlockSplitBloomFilter is implemented using block-based Bloom filters from
+// Putze et al.'s "Cache-,Hash- and Space-Efficient Bloom filters". The basic idea is to
+// hash the item to a tiny Bloom filter which size fit a single cache line or smaller.
+//
+// This implementation sets 8 bits in each tiny Bloom filter. Each tiny Bloom
+// filter is 32 bytes to take advantage of 32-byte SIMD instructions.
+class BlockSplitBloomFilter : public BloomFilter {
+ public:
+ /// The constructor of BlockSplitBloomFilter. It uses murmur3_x64_128 as hash function.
+ BlockSplitBloomFilter();
+
+ /// Initialize the BlockSplitBloomFilter. The range of num_bytes should be within
+ /// [kMinimumBloomFilterBytes, kMaximumBloomFilterBytes], it will be
+ /// rounded up/down to lower/upper bound if num_bytes is out of range and also
+ /// will be rounded up to a power of 2.
+ ///
+ /// @param num_bytes The number of bytes to store Bloom filter bitset.
+ void Init(uint32_t num_bytes);
+
+ /// Initialize the BlockSplitBloomFilter. It copies the bitset as underlying
+ /// bitset because the given bitset may not satisfy the 32-byte alignment requirement
+ /// which may lead to segfault when performing SIMD instructions. It is the caller's
+ /// responsibility to free the bitset passed in. This is used when reconstructing
+ /// a Bloom filter from a parquet file.
+ ///
+ /// @param bitset The given bitset to initialize the Bloom filter.
+ /// @param num_bytes The number of bytes of given bitset.
+ void Init(const uint8_t* bitset, uint32_t num_bytes);
+
+ // Minimum Bloom filter size, it sets to 32 bytes to fit a tiny Bloom filter.
+ static constexpr uint32_t kMinimumBloomFilterBytes = 32;
+
+ /// Calculate optimal size according to the number of distinct values and false
+ /// positive probability.
+ ///
+ /// @param ndv The number of distinct values.
+ /// @param fpp The false positive probability.
+ /// @return it always return a value between kMinimumBloomFilterBytes and
+ /// kMaximumBloomFilterBytes, and the return value is always a power of 2
+ static uint32_t OptimalNumOfBits(uint32_t ndv, double fpp) {
+ DCHECK(fpp > 0.0 && fpp < 1.0);
+ const double m = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8));
+ uint32_t num_bits = static_cast<uint32_t>(m);
+
+ // Handle overflow.
+ if (m < 0 || m > kMaximumBloomFilterBytes << 3) {
+ num_bits = static_cast<uint32_t>(kMaximumBloomFilterBytes << 3);
+ }
+
+ // Round up to lower bound
+ if (num_bits < kMinimumBloomFilterBytes << 3) {
+ num_bits = kMinimumBloomFilterBytes << 3;
+ }
+
+ // Get next power of 2 if bits is not power of 2.
+ if ((num_bits & (num_bits - 1)) != 0) {
+ num_bits = static_cast<uint32_t>(::arrow::BitUtil::NextPower2(num_bits));
+ }
+
+ // Round down to upper bound
+ if (num_bits > kMaximumBloomFilterBytes << 3) {
+ num_bits = kMaximumBloomFilterBytes << 3;
+ }
+
+ return num_bits;
+ }
+
+ bool FindHash(uint64_t hash) const override;
+ void InsertHash(uint64_t hash) override;
+ void WriteTo(OutputStream* sink) const override;
+ uint32_t GetBitsetSize() const override { return num_bytes_; }
+ uint64_t Hash(int64_t value) const override { return hasher_->Hash(value); }
+ uint64_t Hash(float value) const override { return hasher_->Hash(value); }
+ uint64_t Hash(double value) const override { return hasher_->Hash(value); }
+ uint64_t Hash(const Int96* value) const override { return hasher_->Hash(value); }
+ uint64_t Hash(const ByteArray* value) const override { return hasher_->Hash(value); }
+ uint64_t Hash(int32_t value) const override { return hasher_->Hash(value); }
+ uint64_t Hash(const FLBA* value, uint32_t len) const override {
+ return hasher_->Hash(value, len);
+ }
+ /// Deserialize the Bloom filter from an input stream. It is used when reconstructing
+ /// a Bloom filter from a parquet filter.
+ ///
+ /// @param input_stream The input stream from which to construct the Bloom filter
+ /// @return The BlockSplitBloomFilter.
+ static BlockSplitBloomFilter Deserialize(InputStream* input_stream);
+
+ private:
+ // Bytes in a tiny Bloom filter block.
+ static constexpr int kBytesPerFilterBlock = 32;
+
+ // The number of bits to be set in each tiny Bloom filter
+ static constexpr int kBitsSetPerBlock = 8;
+
+ // A mask structure used to set bits in each tiny Bloom filter.
+ struct BlockMask {
+ uint32_t item[kBitsSetPerBlock];
+ };
+
+ // The block-based algorithm needs eight odd SALT values to calculate eight indexes
+ // of bit to set, one bit in each 32-bit word.
+ static constexpr uint32_t SALT[kBitsSetPerBlock] = {
+ 0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU,
+ 0x705495c7U, 0x2df1424bU, 0x9efc4947U, 0x5c6bfb31U};
+
+ /// Set bits in mask array according to input key.
+ /// @param key the value to calculate mask values.
+ /// @param mask the mask array is used to set inside a block
+ void SetMask(uint32_t key, BlockMask& mask) const;
+
+ // Memory pool to allocate aligned buffer for bitset
+ ::arrow::MemoryPool* pool_;
+
+ // The underlying buffer of bitset.
+ std::shared_ptr<Buffer> data_;
+
+ // The number of bytes of Bloom filter bitset.
+ uint32_t num_bytes_;
+
+ // Hash strategy used in this Bloom filter.
+ HashStrategy hash_strategy_;
+
+ // Algorithm used in this Bloom filter.
+ Algorithm algorithm_;
+
+ // The hash pointer points to actual hash class used.
+ std::unique_ptr<Hasher> hasher_;
+};
+
+} // namespace parquet
+
+#endif // PARQUET_BLOOM_FILTER_H
diff --git a/src/parquet/hasher.h b/src/parquet/hasher.h
new file mode 100644
index 0000000..dc316a0
--- /dev/null
+++ b/src/parquet/hasher.h
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_HASHER_H
+#define PARQUET_HASHER_H
+
+#include <cstdint>
+#include "parquet/types.h"
+
+namespace parquet {
+// Abstract class for hash
+class Hasher {
+ public:
+ /// Compute hash for 32 bits value by using its plain encoding result.
+ ///
+ /// @param value the value to hash.
+ /// @return hash result.
+ virtual uint64_t Hash(int32_t value) const = 0;
+
+ /// Compute hash for 64 bits value by using its plain encoding result.
+ ///
+ /// @param value the value to hash.
+ /// @return hash result.
+ virtual uint64_t Hash(int64_t value) const = 0;
+
+ /// Compute hash for float value by using its plain encoding result.
+ ///
+ /// @param value the value to hash.
+ /// @return hash result.
+ virtual uint64_t Hash(float value) const = 0;
+
+ /// Compute hash for double value by using its plain encoding result.
+ ///
+ /// @param value the value to hash.
+ /// @return hash result.
+ virtual uint64_t Hash(double value) const = 0;
+
+ /// Compute hash for Int96 value by using its plain encoding result.
+ ///
+ /// @param value the value to hash.
+ /// @return hash result.
+ virtual uint64_t Hash(const Int96* value) const = 0;
+
+ /// Compute hash for ByteArray value by using its plain encoding result.
+ ///
+ /// @param value the value to hash.
+ /// @return hash result.
+ virtual uint64_t Hash(const ByteArray* value) const = 0;
+
+ /// Compute hash for fixed byte array value by using its plain encoding result.
+ ///
+ /// @param value the value to hash.
+ /// @return hash result.
+ virtual uint64_t Hash(const FLBA* value, uint32_t len) const = 0;
+
+ virtual ~Hasher() = default;
+};
+
+} // namespace parquet
+
+#endif // PARQUET_HASHER_H
diff --git a/src/parquet/murmur3.cc b/src/parquet/murmur3.cc
new file mode 100644
index 0000000..a19436e
--- /dev/null
+++ b/src/parquet/murmur3.cc
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//-----------------------------------------------------------------------------
+// MurmurHash3 was written by Austin Appleby, and is placed in the public
+// domain. The author hereby disclaims copyright to this source code.
+
+// Note - The x86 and x64 versions do _not_ produce the same results, as the
+// algorithms are optimized for their respective platforms. You can still
+// compile and run any of them on any platform, but your performance with the
+// non-native version will be less than optimal.
+
+#include "parquet/murmur3.h"
+
+namespace parquet {
+
+#if defined(_MSC_VER)
+
+#define FORCE_INLINE __forceinline
+#define ROTL64(x, y) _rotl64(x, y)
+
+#else // defined(_MSC_VER)
+
+#define FORCE_INLINE inline __attribute__((always_inline))
+inline uint64_t rotl64(uint64_t x, int8_t r) { return (x << r) | (x >> (64 - r)); }
+#define ROTL64(x, y) rotl64(x, y)
+
+#endif // !defined(_MSC_VER)
+
+#define BIG_CONSTANT(x) (x##LLU)
+
+//-----------------------------------------------------------------------------
+// Block read - if your platform needs to do endian-swapping or can only
+// handle aligned reads, do the conversion here
+
+FORCE_INLINE uint32_t getblock32(const uint32_t* p, int i) { return p[i]; }
+
+FORCE_INLINE uint64_t getblock64(const uint64_t* p, int i) { return p[i]; }
+
+//-----------------------------------------------------------------------------
+// Finalization mix - force all bits of a hash block to avalanche
+
+FORCE_INLINE uint32_t fmix32(uint32_t h) {
+ h ^= h >> 16;
+ h *= 0x85ebca6b;
+ h ^= h >> 13;
+ h *= 0xc2b2ae35;
+ h ^= h >> 16;
+
+ return h;
+}
+
+//----------
+
+FORCE_INLINE uint64_t fmix64(uint64_t k) {
+ k ^= k >> 33;
+ k *= BIG_CONSTANT(0xff51afd7ed558ccd);
+ k ^= k >> 33;
+ k *= BIG_CONSTANT(0xc4ceb9fe1a85ec53);
+ k ^= k >> 33;
+
+ return k;
+}
+
+//-----------------------------------------------------------------------------
+
+void Hash_x64_128(const void* key, const int len, const uint32_t seed, uint64_t out[2]) {
+ const uint8_t* data = (const uint8_t*)key;
+ const int nblocks = len / 16;
+
+ uint64_t h1 = seed;
+ uint64_t h2 = seed;
+
+ const uint64_t c1 = BIG_CONSTANT(0x87c37b91114253d5);
+ const uint64_t c2 = BIG_CONSTANT(0x4cf5ad432745937f);
+
+ //----------
+ // body
+
+ const uint64_t* blocks = (const uint64_t*)(data);
+
+ for (int i = 0; i < nblocks; i++) {
+ uint64_t k1 = getblock64(blocks, i * 2 + 0);
+ uint64_t k2 = getblock64(blocks, i * 2 + 1);
+
+ k1 *= c1;
+ k1 = ROTL64(k1, 31);
+ k1 *= c2;
+ h1 ^= k1;
+
+ h1 = ROTL64(h1, 27);
+ h1 += h2;
+ h1 = h1 * 5 + 0x52dce729;
+
+ k2 *= c2;
+ k2 = ROTL64(k2, 33);
+ k2 *= c1;
+ h2 ^= k2;
+
+ h2 = ROTL64(h2, 31);
+ h2 += h1;
+ h2 = h2 * 5 + 0x38495ab5;
+ }
+
+ //----------
+ // tail
+
+ const uint8_t* tail = (const uint8_t*)(data + nblocks * 16);
+
+ uint64_t k1 = 0;
+ uint64_t k2 = 0;
+
+ switch (len & 15) {
+ case 15:
+ k2 ^= ((uint64_t)tail[14]) << 48;
+ case 14:
+ k2 ^= ((uint64_t)tail[13]) << 40;
+ case 13:
+ k2 ^= ((uint64_t)tail[12]) << 32;
+ case 12:
+ k2 ^= ((uint64_t)tail[11]) << 24;
+ case 11:
+ k2 ^= ((uint64_t)tail[10]) << 16;
+ case 10:
+ k2 ^= ((uint64_t)tail[9]) << 8;
+ case 9:
+ k2 ^= ((uint64_t)tail[8]) << 0;
+ k2 *= c2;
+ k2 = ROTL64(k2, 33);
+ k2 *= c1;
+ h2 ^= k2;
+
+ case 8:
+ k1 ^= ((uint64_t)tail[7]) << 56;
+ case 7:
+ k1 ^= ((uint64_t)tail[6]) << 48;
+ case 6:
+ k1 ^= ((uint64_t)tail[5]) << 40;
+ case 5:
+ k1 ^= ((uint64_t)tail[4]) << 32;
+ case 4:
+ k1 ^= ((uint64_t)tail[3]) << 24;
+ case 3:
+ k1 ^= ((uint64_t)tail[2]) << 16;
+ case 2:
+ k1 ^= ((uint64_t)tail[1]) << 8;
+ case 1:
+ k1 ^= ((uint64_t)tail[0]) << 0;
+ k1 *= c1;
+ k1 = ROTL64(k1, 31);
+ k1 *= c2;
+ h1 ^= k1;
+ }
+
+ //----------
+ // finalization
+
+ h1 ^= len;
+ h2 ^= len;
+
+ h1 += h2;
+ h2 += h1;
+
+ h1 = fmix64(h1);
+ h2 = fmix64(h2);
+
+ h1 += h2;
+ h2 += h1;
+
+ reinterpret_cast<uint64_t*>(out)[0] = h1;
+ reinterpret_cast<uint64_t*>(out)[1] = h2;
+}
+
+template <typename T>
+uint64_t HashHelper(T value, uint32_t seed) {
+ uint64_t output[2];
+ Hash_x64_128(reinterpret_cast<void*>(&value), sizeof(T), seed, output);
+ return output[0];
+}
+
+uint64_t MurmurHash3::Hash(int32_t value) const { return HashHelper(value, seed_); }
+
+uint64_t MurmurHash3::Hash(int64_t value) const { return HashHelper(value, seed_); }
+
+uint64_t MurmurHash3::Hash(float value) const { return HashHelper(value, seed_); }
+
+uint64_t MurmurHash3::Hash(double value) const { return HashHelper(value, seed_); }
+
+uint64_t MurmurHash3::Hash(const FLBA* value, uint32_t len) const {
+ uint64_t out[2];
+ Hash_x64_128(reinterpret_cast<const void*>(value->ptr), len, seed_, out);
+ return out[0];
+}
+
+uint64_t MurmurHash3::Hash(const Int96* value) const {
+ uint64_t out[2];
+ Hash_x64_128(reinterpret_cast<const void*>(value->value), sizeof(value->value), seed_,
+ out);
+ return out[0];
+}
+
+uint64_t MurmurHash3::Hash(const ByteArray* value) const {
+ uint64_t out[2];
+ Hash_x64_128(reinterpret_cast<const void*>(value->ptr), value->len, seed_, out);
+ return out[0];
+}
+
+} // namespace parquet
diff --git a/src/parquet/murmur3.h b/src/parquet/murmur3.h
new file mode 100644
index 0000000..84792f3
--- /dev/null
+++ b/src/parquet/murmur3.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//-----------------------------------------------------------------------------
+// MurmurHash3 was written by Austin Appleby, and is placed in the public
+// domain. The author hereby disclaims copyright to this source code.
+
+#ifndef PARQUET_MURMURHASH3_H_
+#define PARQUET_MURMURHASH3_H_
+
+#include <cstdint>
+
+#include "parquet/hasher.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+/// Source:
+/// https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp
+/// (Modified to adapt to coding conventions and to inherit the Hasher abstract class)
+class MurmurHash3 : public Hasher {
+ public:
+ MurmurHash3() : seed_(DEFAULT_SEED) {}
+ uint64_t Hash(int32_t value) const override;
+ uint64_t Hash(int64_t value) const override;
+ uint64_t Hash(float value) const override;
+ uint64_t Hash(double value) const override;
+ uint64_t Hash(const Int96* value) const override;
+ uint64_t Hash(const ByteArray* value) const override;
+ uint64_t Hash(const FLBA* val, uint32_t len) const override;
+
+ private:
+ // Default seed for hash which comes from Bloom filter in parquet-mr, it is generated
+ // by System.nanoTime() of java.
+ static constexpr int DEFAULT_SEED = 1361930890;
+
+ uint32_t seed_;
+};
+
+} // namespace parquet
+
+#endif // PARQUET_MURMURHASH3_H_