PARQUET-1332: Add bloom filter for parquet

This is first part of bloom filter patch set, which include a bloom filter utility and also some unit tests.
Note that this patch also includes murmur3Hash original code from Austin Appleby. The code isn't formatted as parquet-cpp format.

Author: Chen, Junjie <cjjnjust@gmail.com>

Closes #432 from cjjnjust/master and squashes the following commits:

d4d3018 [Chen, Junjie] PARQUET-1332: update constructor
b0f3f80 [Chen, Junjie] PARQUET-1332: update code for some coding style
fe97b44 [Chen, Junjie] PARQUET-1332: refine the complex classes
fd3ba23 [Chen, Junjie] PARQUET-1332: fix build error for clang Xcode
019322a [Chen, Junjie] PARQUET-1332: refine bloom filter algorithm
ec6a6e9 [Chen, Junjie] PARQUET-1332: fix build failure for windows platform
0c1e95f [Chen, Junjie] PARQUET-1332: update according to latest comments
1a105fc [Chen, Junjie] PARQUET-1332: update code according to Jim's comments
731191c [Chen, Junjie] PARQUET-1332: rebase to latest to solve CI error and update comments
0bc8595 [Chen, Junjie] PARQUET-1332: use abstract class for hash class
1374665 [Chen, Junjie] PARQUET-1332: update according to code review
858b2ce [Chen, Junjie] PARQUET-1332: remove unnecessary const_cast
23d7ccf [Chen, Junjie] PARQUET-1332: add missing file
f2ff8ad [Chen, Junjie] PARQUET-1332: Add bloom filter utility class
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 927f728..5b3c460 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -674,6 +674,7 @@
   src/parquet/arrow/record_reader.cc
   src/parquet/arrow/schema.cc
   src/parquet/arrow/writer.cc
+  src/parquet/bloom_filter.cc
   src/parquet/column_reader.cc
   src/parquet/column_scanner.cc
   src/parquet/column_writer.cc
@@ -681,6 +682,7 @@
   src/parquet/file_reader.cc
   src/parquet/file_writer.cc
   src/parquet/metadata.cc
+  src/parquet/murmur3.cc
   src/parquet/parquet_constants.cpp
   src/parquet/parquet_types.cpp
   src/parquet/printer.cc
diff --git a/data/bloom_filter.bin b/data/bloom_filter.bin
new file mode 100644
index 0000000..c0e30ce
--- /dev/null
+++ b/data/bloom_filter.bin
Binary files differ
diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt
index bc16d8b..93a242c 100644
--- a/src/parquet/CMakeLists.txt
+++ b/src/parquet/CMakeLists.txt
@@ -17,6 +17,7 @@
 
 # Headers: top level
 install(FILES
+  bloom_filter.h
   column_reader.h
   column_page.h
   column_scanner.h
@@ -25,7 +26,9 @@
   exception.h
   file_reader.h
   file_writer.h
+  hasher.h
   metadata.h
+  murmur3.h
   printer.h
   properties.h
   schema.h
@@ -50,6 +53,7 @@
   "${CMAKE_CURRENT_BINARY_DIR}/parquet.pc"
   DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
 
+ADD_PARQUET_TEST(bloom_filter-test)
 ADD_PARQUET_TEST(column_reader-test)
 ADD_PARQUET_TEST(column_scanner-test)
 ADD_PARQUET_TEST(column_writer-test)
diff --git a/src/parquet/README b/src/parquet/README
new file mode 100644
index 0000000..fc16a46
--- /dev/null
+++ b/src/parquet/README
@@ -0,0 +1,10 @@
+The CompatibilityTest of bloom_filter-test.cc is used to test cross compatibility of
+Bloom filters between parquet-mr and parquet-cpp. It reads the Bloom filter binary
+generated by the Bloom filter class in the parquet-mr project and tests whether the
+values inserted before could be filtered or not.
+
+The Bloom filter binary is generated by three steps from Parquet-mr:
+Step 1: Construct a Bloom filter with 1024 bytes of bitset.
+Step 2: Insert hashes of "hello", "parquet", "bloom", "filter" strings to Bloom filter
+by calling hash and insert APIs.
+Step 3: Call writeTo API to write to File.
diff --git a/src/parquet/bloom_filter-test.cc b/src/parquet/bloom_filter-test.cc
new file mode 100644
index 0000000..dbef8c8
--- /dev/null
+++ b/src/parquet/bloom_filter-test.cc
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <random>
+#include <string>
+
+#include "arrow/io/file.h"
+#include "parquet/bloom_filter.h"
+#include "parquet/murmur3.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/test-common.h"
+
+namespace parquet {
+namespace test {
+TEST(Murmur3Test, TestBloomFilter) {
+  uint64_t result;
+  const uint8_t bitset[8] = {0x0, 0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7};
+  ByteArray byteArray(8, bitset);
+  MurmurHash3 murmur3;
+  result = murmur3.Hash(&byteArray);
+  EXPECT_EQ(result, UINT64_C(913737700387071329));
+}
+
+TEST(ConstructorTest, TestBloomFilter) {
+  BlockSplitBloomFilter bloom_filter;
+  EXPECT_NO_THROW(bloom_filter.Init(1000));
+
+  // It throws because the length cannot be zero
+  std::unique_ptr<uint8_t[]> bitset1(new uint8_t[1024]());
+  EXPECT_THROW(bloom_filter.Init(bitset1.get(), 0), ParquetException);
+
+  // It throws because the number of bytes of Bloom filter bitset must be a power of 2.
+  std::unique_ptr<uint8_t[]> bitset2(new uint8_t[1024]());
+  EXPECT_THROW(bloom_filter.Init(bitset2.get(), 1023), ParquetException);
+}
+
+// The BasicTest is used to test basic operations including InsertHash, FindHash and
+// serializing and de-serializing.
+TEST(BasicTest, TestBloomFilter) {
+  BlockSplitBloomFilter bloom_filter;
+  bloom_filter.Init(1024);
+
+  for (int i = 0; i < 10; i++) {
+    bloom_filter.InsertHash(bloom_filter.Hash(i));
+  }
+
+  for (int i = 0; i < 10; i++) {
+    EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(i)));
+  }
+
+  // Serialize Bloom filter to memory output stream
+  InMemoryOutputStream sink;
+  bloom_filter.WriteTo(&sink);
+
+  // Deserialize Bloom filter from memory
+  InMemoryInputStream source(sink.GetBuffer());
+
+  BlockSplitBloomFilter de_bloom = std::move(BlockSplitBloomFilter::Deserialize(&source));
+
+  for (int i = 0; i < 10; i++) {
+    EXPECT_TRUE(de_bloom.FindHash(de_bloom.Hash(i)));
+  }
+}
+
+// Helper function to generate random string.
+std::string GetRandomString(uint32_t length) {
+  // Character set used to generate random string
+  const std::string charset =
+      "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+
+  // The uuid_seed was generated by "uuidgen -r"
+  const std::string uuid_seed = "8de406aa-fb59-4195-a81c-5152af26433f";
+  std::seed_seq seed(uuid_seed.begin(), uuid_seed.end());
+  std::mt19937 generator(seed);
+  std::uniform_int_distribution<uint32_t> dist(0, static_cast<int>(charset.size() - 1));
+  std::string ret = "";
+
+  for (uint32_t i = 0; i < length; i++) {
+    ret += charset[dist(generator)];
+  }
+
+  return ret;
+}
+
+TEST(FPPTest, TestBloomFilter) {
+  // It counts the number of times FindHash returns true.
+  int exist = 0;
+
+  // Total count of elements that will be used
+  const int total_count = 100000;
+
+  // Bloom filter fpp parameter
+  const double fpp = 0.01;
+
+  std::vector<std::string> members;
+  BlockSplitBloomFilter bloom_filter;
+  bloom_filter.Init(BlockSplitBloomFilter::OptimalNumOfBits(total_count, fpp));
+
+  // Insert elements into the Bloom filter
+  for (int i = 0; i < total_count; i++) {
+    // Insert random string which length is 8
+    std::string tmp = GetRandomString(8);
+    const ByteArray byte_array(8, reinterpret_cast<const uint8_t*>(tmp.c_str()));
+    members.push_back(tmp);
+    bloom_filter.InsertHash(bloom_filter.Hash(&byte_array));
+  }
+
+  for (int i = 0; i < total_count; i++) {
+    const ByteArray byte_array1(8, reinterpret_cast<const uint8_t*>(members[i].c_str()));
+    ASSERT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(&byte_array1)));
+    std::string tmp = GetRandomString(7);
+    const ByteArray byte_array2(7, reinterpret_cast<const uint8_t*>(tmp.c_str()));
+
+    if (bloom_filter.FindHash(bloom_filter.Hash(&byte_array2))) {
+      exist++;
+    }
+  }
+
+  // The exist should be probably less than 1000 according default FPP 0.01.
+  EXPECT_TRUE(exist < total_count * fpp);
+}
+
+// The CompatibilityTest is used to test cross compatibility with parquet-mr, it reads
+// the Bloom filter binary generated by the Bloom filter class in the parquet-mr project
+// and tests whether the values inserted before could be filtered or not.
+
+// The Bloom filter binary is generated by three steps in from Parquet-mr.
+// Step 1: Construct a Bloom filter with 1024 bytes bitset.
+// Step 2: Insert "hello", "parquet", "bloom", "filter" to Bloom filter.
+// Step 3: Call writeTo API to write to File.
+TEST(CompatibilityTest, TestBloomFilter) {
+  const std::string test_string[4] = {"hello", "parquet", "bloom", "filter"};
+  const std::string bloom_filter_test_binary =
+      std::string(test::get_data_dir()) + "/bloom_filter.bin";
+  std::shared_ptr<::arrow::io::ReadableFile> handle;
+
+  int64_t size;
+  PARQUET_THROW_NOT_OK(
+      ::arrow::io::ReadableFile::Open(bloom_filter_test_binary, &handle));
+  PARQUET_THROW_NOT_OK(handle->GetSize(&size));
+
+  // 1024 bytes (bitset) + 4 bytes (hash) + 4 bytes (algorithm) + 4 bytes (length)
+  EXPECT_EQ(size, 1036);
+
+  std::unique_ptr<uint8_t[]> bitset(new uint8_t[size]());
+  std::shared_ptr<Buffer> buffer(new Buffer(bitset.get(), size));
+  handle->Read(size, &buffer);
+
+  InMemoryInputStream source(buffer);
+  BlockSplitBloomFilter bloom_filter1 =
+      std::move(BlockSplitBloomFilter::Deserialize(&source));
+
+  for (int i = 0; i < 4; i++) {
+    const ByteArray tmp(static_cast<uint32_t>(test_string[i].length()),
+                        reinterpret_cast<const uint8_t*>(test_string[i].c_str()));
+    EXPECT_TRUE(bloom_filter1.FindHash(bloom_filter1.Hash(&tmp)));
+  }
+
+  // The following is used to check whether the new created Bloom filter in parquet-cpp is
+  // byte-for-byte identical to file at bloom_data_path which is created from parquet-mr
+  // with same inserted hashes.
+  BlockSplitBloomFilter bloom_filter2;
+  bloom_filter2.Init(bloom_filter1.GetBitsetSize());
+  for (int i = 0; i < 4; i++) {
+    const ByteArray byte_array(static_cast<uint32_t>(test_string[i].length()),
+                               reinterpret_cast<const uint8_t*>(test_string[i].c_str()));
+    bloom_filter2.InsertHash(bloom_filter2.Hash(&byte_array));
+  }
+
+  // Serialize Bloom filter to memory output stream
+  InMemoryOutputStream sink;
+  bloom_filter2.WriteTo(&sink);
+  std::shared_ptr<Buffer> buffer1 = sink.GetBuffer();
+
+  handle->Seek(0);
+  handle->GetSize(&size);
+  std::shared_ptr<Buffer> buffer2;
+  handle->Read(size, &buffer2);
+
+  EXPECT_TRUE((*buffer1).Equals(*buffer2));
+}
+
+// OptmialValueTest is used to test whether OptimalNumOfBits returns expected
+// numbers according to formula:
+//     num_of_bits = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8.0))
+// where ndv is the number of distinct values and fpp is the false positive probability.
+// Also it is used to test whether OptimalNumOfBits returns value between
+// [MINIMUM_BLOOM_FILTER_SIZE, MAXIMUM_BLOOM_FILTER_SIZE].
+TEST(OptimalValueTest, TestBloomFilter) {
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(256, 0.01), UINT32_C(4096));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(512, 0.01), UINT32_C(8192));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1024, 0.01), UINT32_C(16384));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(2048, 0.01), UINT32_C(32768));
+
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.01), UINT32_C(2048));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.01), UINT32_C(4096));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.01), UINT32_C(8192));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.01), UINT32_C(16384));
+
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.025), UINT32_C(2048));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.025), UINT32_C(4096));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.025), UINT32_C(8192));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.025), UINT32_C(16384));
+
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(200, 0.05), UINT32_C(2048));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(300, 0.05), UINT32_C(4096));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(700, 0.05), UINT32_C(8192));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(1500, 0.05), UINT32_C(16384));
+
+  // Boundary check
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(4, 0.01), UINT32_C(256));
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(4, 0.25), UINT32_C(256));
+
+  EXPECT_EQ(
+      BlockSplitBloomFilter::OptimalNumOfBits(std::numeric_limits<uint32_t>::max(), 0.01),
+      UINT32_C(1073741824));
+  EXPECT_EQ(
+      BlockSplitBloomFilter::OptimalNumOfBits(std::numeric_limits<uint32_t>::max(), 0.25),
+      UINT32_C(1073741824));
+}
+
+}  // namespace test
+
+}  // namespace parquet
diff --git a/src/parquet/bloom_filter.cc b/src/parquet/bloom_filter.cc
new file mode 100644
index 0000000..faa344c
--- /dev/null
+++ b/src/parquet/bloom_filter.cc
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cmath>
+#include <cstdint>
+
+#include "arrow/status.h"
+#include "arrow/util/bit-util.h"
+#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
+#include "parquet/murmur3.h"
+#include "parquet/types.h"
+#include "parquet/util/logging.h"
+
+namespace parquet {
+constexpr uint32_t BlockSplitBloomFilter::SALT[kBitsSetPerBlock];
+
+BlockSplitBloomFilter::BlockSplitBloomFilter()
+    : pool_(::arrow::default_memory_pool()),
+      hash_strategy_(HashStrategy::MURMUR3_X64_128),
+      algorithm_(Algorithm::BLOCK) {}
+
+void BlockSplitBloomFilter::Init(uint32_t num_bytes) {
+  if (num_bytes < kMinimumBloomFilterBytes) {
+    num_bytes = kMinimumBloomFilterBytes;
+  }
+
+  // Get next power of 2 if it is not power of 2.
+  if ((num_bytes & (num_bytes - 1)) != 0) {
+    num_bytes = static_cast<uint32_t>(::arrow::BitUtil::NextPower2(num_bytes));
+  }
+
+  if (num_bytes > kMaximumBloomFilterBytes) {
+    num_bytes = kMaximumBloomFilterBytes;
+  }
+
+  num_bytes_ = num_bytes;
+  PARQUET_THROW_NOT_OK(::arrow::AllocateBuffer(pool_, num_bytes_, &data_));
+  memset(data_->mutable_data(), 0, num_bytes_);
+
+  this->hasher_.reset(new MurmurHash3());
+}
+
+void BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
+  DCHECK(bitset != nullptr);
+
+  if (num_bytes < kMinimumBloomFilterBytes || num_bytes > kMaximumBloomFilterBytes ||
+      (num_bytes & (num_bytes - 1)) != 0) {
+    throw ParquetException("Given length of bitset is illegal");
+  }
+
+  num_bytes_ = num_bytes;
+  PARQUET_THROW_NOT_OK(::arrow::AllocateBuffer(pool_, num_bytes_, &data_));
+  memcpy(data_->mutable_data(), bitset, num_bytes_);
+
+  this->hasher_.reset(new MurmurHash3());
+}
+
+BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(InputStream* input) {
+  int64_t bytes_available;
+
+  const uint8_t* read_buffer = NULL;
+  read_buffer = input->Read(sizeof(uint32_t), &bytes_available);
+  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t) || !read_buffer) {
+    throw ParquetException("Failed to deserialize from input stream");
+  }
+  uint32_t len;
+  memcpy(&len, read_buffer, sizeof(uint32_t));
+
+  read_buffer = input->Read(sizeof(uint32_t), &bytes_available);
+  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t) || !read_buffer) {
+    throw ParquetException("Failed to deserialize from input stream");
+  }
+  uint32_t hash;
+  memcpy(&hash, read_buffer, sizeof(uint32_t));
+  if (static_cast<HashStrategy>(hash) != HashStrategy::MURMUR3_X64_128) {
+    throw ParquetException("Unsupported hash strategy");
+  }
+
+  read_buffer = input->Read(sizeof(uint32_t), &bytes_available);
+  if (static_cast<uint32_t>(bytes_available) != sizeof(uint32_t) || !read_buffer) {
+    throw ParquetException("Failed to deserialize from input stream");
+  }
+  uint32_t algorithm;
+  memcpy(&algorithm, read_buffer, sizeof(uint32_t));
+  if (static_cast<Algorithm>(algorithm) != BloomFilter::Algorithm::BLOCK) {
+    throw ParquetException("Unsupported Bloom filter algorithm");
+  }
+
+  BlockSplitBloomFilter bloom_filter;
+  bloom_filter.Init(input->Read(len, &bytes_available), len);
+  return bloom_filter;
+}
+
+void BlockSplitBloomFilter::WriteTo(OutputStream* sink) const {
+  DCHECK(sink != nullptr);
+
+  sink->Write(reinterpret_cast<const uint8_t*>(&num_bytes_), sizeof(num_bytes_));
+  sink->Write(reinterpret_cast<const uint8_t*>(&hash_strategy_), sizeof(hash_strategy_));
+  sink->Write(reinterpret_cast<const uint8_t*>(&algorithm_), sizeof(algorithm_));
+  sink->Write(data_->mutable_data(), num_bytes_);
+}
+
+void BlockSplitBloomFilter::SetMask(uint32_t key, BlockMask& block_mask) const {
+  for (int i = 0; i < kBitsSetPerBlock; ++i) {
+    block_mask.item[i] = key * SALT[i];
+  }
+
+  for (int i = 0; i < kBitsSetPerBlock; ++i) {
+    block_mask.item[i] = block_mask.item[i] >> 27;
+  }
+
+  for (int i = 0; i < kBitsSetPerBlock; ++i) {
+    block_mask.item[i] = UINT32_C(0x1) << block_mask.item[i];
+  }
+}
+
+bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
+  const uint32_t bucket_index =
+      static_cast<uint32_t>((hash >> 32) & (num_bytes_ / kBytesPerFilterBlock - 1));
+  uint32_t key = static_cast<uint32_t>(hash);
+  uint32_t* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());
+
+  // Calculate mask for bucket.
+  BlockMask block_mask;
+  SetMask(key, block_mask);
+
+  for (int i = 0; i < kBitsSetPerBlock; ++i) {
+    if (0 == (bitset32[kBitsSetPerBlock * bucket_index + i] & block_mask.item[i])) {
+      return false;
+    }
+  }
+  return true;
+}
+
+void BlockSplitBloomFilter::InsertHash(uint64_t hash) {
+  const uint32_t bucket_index =
+      static_cast<uint32_t>(hash >> 32) & (num_bytes_ / kBytesPerFilterBlock - 1);
+  uint32_t key = static_cast<uint32_t>(hash);
+  uint32_t* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());
+
+  // Calculate mask for bucket.
+  BlockMask block_mask;
+  SetMask(key, block_mask);
+
+  for (int i = 0; i < kBitsSetPerBlock; i++) {
+    bitset32[bucket_index * kBitsSetPerBlock + i] |= block_mask.item[i];
+  }
+}
+
+}  // namespace parquet
diff --git a/src/parquet/bloom_filter.h b/src/parquet/bloom_filter.h
new file mode 100644
index 0000000..e39370a
--- /dev/null
+++ b/src/parquet/bloom_filter.h
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_BLOOM_FILTER_H
+#define PARQUET_BLOOM_FILTER_H
+
+#include <cstdint>
+
+#include "parquet/exception.h"
+#include "parquet/hasher.h"
+#include "parquet/types.h"
+#include "parquet/util/logging.h"
+#include "parquet/util/memory.h"
+
+namespace parquet {
+class OutputStream;
+
+// A Bloom filter is a compact structure to indicate whether an item is not in a set or
+// probably in a set. The Bloom filter usually consists of a bit set that represents a
+// set of elements, a hash strategy and a Bloom filter algorithm.
+class BloomFilter {
+ public:
+  // Maximum Bloom filter size, it sets to HDFS default block size 128MB
+  // This value will be reconsidered when implementing Bloom filter producer.
+  static constexpr uint32_t kMaximumBloomFilterBytes = 128 * 1024 * 1024;
+
+  /// Determine whether an element exist in set or not.
+  ///
+  /// @param hash the element to contain.
+  /// @return false if value is definitely not in set, and true means PROBABLY
+  /// in set.
+  virtual bool FindHash(uint64_t hash) const = 0;
+
+  /// Insert element to set represented by Bloom filter bitset.
+  /// @param hash the hash of value to insert into Bloom filter.
+  virtual void InsertHash(uint64_t hash) = 0;
+
+  /// Write this Bloom filter to an output stream. A Bloom filter structure should
+  /// include bitset length, hash strategy, algorithm, and bitset.
+  ///
+  /// @param sink the output stream to write
+  virtual void WriteTo(OutputStream* sink) const = 0;
+
+  /// Get the number of bytes of bitset
+  virtual uint32_t GetBitsetSize() const = 0;
+
+  /// Compute hash for 32 bits value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(int32_t value) const = 0;
+
+  /// Compute hash for 64 bits value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(int64_t value) const = 0;
+
+  /// Compute hash for float value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(float value) const = 0;
+
+  /// Compute hash for double value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(double value) const = 0;
+
+  /// Compute hash for Int96 value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(const Int96* value) const = 0;
+
+  /// Compute hash for ByteArray value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(const ByteArray* value) const = 0;
+
+  /// Compute hash for fixed byte array value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(const FLBA* value, uint32_t len) const = 0;
+
+  virtual ~BloomFilter() {}
+
+ protected:
+  // Hash strategy available for Bloom filter.
+  enum class HashStrategy : uint32_t { MURMUR3_X64_128 = 0 };
+
+  // Bloom filter algorithm.
+  enum class Algorithm : uint32_t { BLOCK = 0 };
+};
+
+// The BlockSplitBloomFilter is implemented using block-based Bloom filters from
+// Putze et al.'s "Cache-,Hash- and Space-Efficient Bloom filters". The basic idea is to
+// hash the item to a tiny Bloom filter which size fit a single cache line or smaller.
+//
+// This implementation sets 8 bits in each tiny Bloom filter. Each tiny Bloom
+// filter is 32 bytes to take advantage of 32-byte SIMD instructions.
+class BlockSplitBloomFilter : public BloomFilter {
+ public:
+  /// The constructor of BlockSplitBloomFilter. It uses murmur3_x64_128 as hash function.
+  BlockSplitBloomFilter();
+
+  /// Initialize the BlockSplitBloomFilter. The range of num_bytes should be within
+  /// [kMinimumBloomFilterBytes, kMaximumBloomFilterBytes], it will be
+  /// rounded up/down to lower/upper bound if num_bytes is out of range and also
+  /// will be rounded up to a power of 2.
+  ///
+  /// @param num_bytes The number of bytes to store Bloom filter bitset.
+  void Init(uint32_t num_bytes);
+
+  /// Initialize the BlockSplitBloomFilter. It copies the bitset as underlying
+  /// bitset because the given bitset may not satisfy the 32-byte alignment requirement
+  /// which may lead to segfault when performing SIMD instructions. It is the caller's
+  /// responsibility to free the bitset passed in. This is used when reconstructing
+  /// a Bloom filter from a parquet file.
+  ///
+  /// @param bitset The given bitset to initialize the Bloom filter.
+  /// @param num_bytes  The number of bytes of given bitset.
+  void Init(const uint8_t* bitset, uint32_t num_bytes);
+
+  // Minimum Bloom filter size, it sets to 32 bytes to fit a tiny Bloom filter.
+  static constexpr uint32_t kMinimumBloomFilterBytes = 32;
+
+  /// Calculate optimal size according to the number of distinct values and false
+  /// positive probability.
+  ///
+  /// @param ndv The number of distinct values.
+  /// @param fpp The false positive probability.
+  /// @return it always return a value between kMinimumBloomFilterBytes and
+  /// kMaximumBloomFilterBytes, and the return value is always a power of 2
+  static uint32_t OptimalNumOfBits(uint32_t ndv, double fpp) {
+    DCHECK(fpp > 0.0 && fpp < 1.0);
+    const double m = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8));
+    uint32_t num_bits = static_cast<uint32_t>(m);
+
+    // Handle overflow.
+    if (m < 0 || m > kMaximumBloomFilterBytes << 3) {
+      num_bits = static_cast<uint32_t>(kMaximumBloomFilterBytes << 3);
+    }
+
+    // Round up to lower bound
+    if (num_bits < kMinimumBloomFilterBytes << 3) {
+      num_bits = kMinimumBloomFilterBytes << 3;
+    }
+
+    // Get next power of 2 if bits is not power of 2.
+    if ((num_bits & (num_bits - 1)) != 0) {
+      num_bits = static_cast<uint32_t>(::arrow::BitUtil::NextPower2(num_bits));
+    }
+
+    // Round down to upper bound
+    if (num_bits > kMaximumBloomFilterBytes << 3) {
+      num_bits = kMaximumBloomFilterBytes << 3;
+    }
+
+    return num_bits;
+  }
+
+  bool FindHash(uint64_t hash) const override;
+  void InsertHash(uint64_t hash) override;
+  void WriteTo(OutputStream* sink) const override;
+  uint32_t GetBitsetSize() const override { return num_bytes_; }
+  uint64_t Hash(int64_t value) const override { return hasher_->Hash(value); }
+  uint64_t Hash(float value) const override { return hasher_->Hash(value); }
+  uint64_t Hash(double value) const override { return hasher_->Hash(value); }
+  uint64_t Hash(const Int96* value) const override { return hasher_->Hash(value); }
+  uint64_t Hash(const ByteArray* value) const override { return hasher_->Hash(value); }
+  uint64_t Hash(int32_t value) const override { return hasher_->Hash(value); }
+  uint64_t Hash(const FLBA* value, uint32_t len) const override {
+    return hasher_->Hash(value, len);
+  }
+  /// Deserialize the Bloom filter from an input stream. It is used when reconstructing
+  /// a Bloom filter from a parquet filter.
+  ///
+  /// @param input_stream The input stream from which to construct the Bloom filter
+  /// @return The BlockSplitBloomFilter.
+  static BlockSplitBloomFilter Deserialize(InputStream* input_stream);
+
+ private:
+  // Bytes in a tiny Bloom filter block.
+  static constexpr int kBytesPerFilterBlock = 32;
+
+  // The number of bits to be set in each tiny Bloom filter
+  static constexpr int kBitsSetPerBlock = 8;
+
+  // A mask structure used to set bits in each tiny Bloom filter.
+  struct BlockMask {
+    uint32_t item[kBitsSetPerBlock];
+  };
+
+  // The block-based algorithm needs eight odd SALT values to calculate eight indexes
+  // of bit to set, one bit in each 32-bit word.
+  static constexpr uint32_t SALT[kBitsSetPerBlock] = {
+      0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU,
+      0x705495c7U, 0x2df1424bU, 0x9efc4947U, 0x5c6bfb31U};
+
+  /// Set bits in mask array according to input key.
+  /// @param key the value to calculate mask values.
+  /// @param mask the mask array is used to set inside a block
+  void SetMask(uint32_t key, BlockMask& mask) const;
+
+  // Memory pool to allocate aligned buffer for bitset
+  ::arrow::MemoryPool* pool_;
+
+  // The underlying buffer of bitset.
+  std::shared_ptr<Buffer> data_;
+
+  // The number of bytes of Bloom filter bitset.
+  uint32_t num_bytes_;
+
+  // Hash strategy used in this Bloom filter.
+  HashStrategy hash_strategy_;
+
+  // Algorithm used in this Bloom filter.
+  Algorithm algorithm_;
+
+  // The hash pointer points to actual hash class used.
+  std::unique_ptr<Hasher> hasher_;
+};
+
+}  // namespace parquet
+
+#endif  // PARQUET_BLOOM_FILTER_H
diff --git a/src/parquet/hasher.h b/src/parquet/hasher.h
new file mode 100644
index 0000000..dc316a0
--- /dev/null
+++ b/src/parquet/hasher.h
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_HASHER_H
+#define PARQUET_HASHER_H
+
+#include <cstdint>
+#include "parquet/types.h"
+
+namespace parquet {
+// Abstract class for hash
+class Hasher {
+ public:
+  /// Compute hash for 32 bits value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(int32_t value) const = 0;
+
+  /// Compute hash for 64 bits value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(int64_t value) const = 0;
+
+  /// Compute hash for float value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(float value) const = 0;
+
+  /// Compute hash for double value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(double value) const = 0;
+
+  /// Compute hash for Int96 value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(const Int96* value) const = 0;
+
+  /// Compute hash for ByteArray value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(const ByteArray* value) const = 0;
+
+  /// Compute hash for fixed byte array value by using its plain encoding result.
+  ///
+  /// @param value the value to hash.
+  /// @return hash result.
+  virtual uint64_t Hash(const FLBA* value, uint32_t len) const = 0;
+
+  virtual ~Hasher() = default;
+};
+
+}  // namespace parquet
+
+#endif  // PARQUET_HASHER_H
diff --git a/src/parquet/murmur3.cc b/src/parquet/murmur3.cc
new file mode 100644
index 0000000..a19436e
--- /dev/null
+++ b/src/parquet/murmur3.cc
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//-----------------------------------------------------------------------------
+// MurmurHash3 was written by Austin Appleby, and is placed in the public
+// domain. The author hereby disclaims copyright to this source code.
+
+// Note - The x86 and x64 versions do _not_ produce the same results, as the
+// algorithms are optimized for their respective platforms. You can still
+// compile and run any of them on any platform, but your performance with the
+// non-native version will be less than optimal.
+
+#include "parquet/murmur3.h"
+
+namespace parquet {
+
+#if defined(_MSC_VER)
+
+#define FORCE_INLINE __forceinline
+#define ROTL64(x, y) _rotl64(x, y)
+
+#else  // defined(_MSC_VER)
+
+#define FORCE_INLINE inline __attribute__((always_inline))
+inline uint64_t rotl64(uint64_t x, int8_t r) { return (x << r) | (x >> (64 - r)); }
+#define ROTL64(x, y) rotl64(x, y)
+
+#endif  // !defined(_MSC_VER)
+
+#define BIG_CONSTANT(x) (x##LLU)
+
+//-----------------------------------------------------------------------------
+// Block read - if your platform needs to do endian-swapping or can only
+// handle aligned reads, do the conversion here
+
+FORCE_INLINE uint32_t getblock32(const uint32_t* p, int i) { return p[i]; }
+
+FORCE_INLINE uint64_t getblock64(const uint64_t* p, int i) { return p[i]; }
+
+//-----------------------------------------------------------------------------
+// Finalization mix - force all bits of a hash block to avalanche
+
+FORCE_INLINE uint32_t fmix32(uint32_t h) {
+  h ^= h >> 16;
+  h *= 0x85ebca6b;
+  h ^= h >> 13;
+  h *= 0xc2b2ae35;
+  h ^= h >> 16;
+
+  return h;
+}
+
+//----------
+
+FORCE_INLINE uint64_t fmix64(uint64_t k) {
+  k ^= k >> 33;
+  k *= BIG_CONSTANT(0xff51afd7ed558ccd);
+  k ^= k >> 33;
+  k *= BIG_CONSTANT(0xc4ceb9fe1a85ec53);
+  k ^= k >> 33;
+
+  return k;
+}
+
+//-----------------------------------------------------------------------------
+
+void Hash_x64_128(const void* key, const int len, const uint32_t seed, uint64_t out[2]) {
+  const uint8_t* data = (const uint8_t*)key;
+  const int nblocks = len / 16;
+
+  uint64_t h1 = seed;
+  uint64_t h2 = seed;
+
+  const uint64_t c1 = BIG_CONSTANT(0x87c37b91114253d5);
+  const uint64_t c2 = BIG_CONSTANT(0x4cf5ad432745937f);
+
+  //----------
+  // body
+
+  const uint64_t* blocks = (const uint64_t*)(data);
+
+  for (int i = 0; i < nblocks; i++) {
+    uint64_t k1 = getblock64(blocks, i * 2 + 0);
+    uint64_t k2 = getblock64(blocks, i * 2 + 1);
+
+    k1 *= c1;
+    k1 = ROTL64(k1, 31);
+    k1 *= c2;
+    h1 ^= k1;
+
+    h1 = ROTL64(h1, 27);
+    h1 += h2;
+    h1 = h1 * 5 + 0x52dce729;
+
+    k2 *= c2;
+    k2 = ROTL64(k2, 33);
+    k2 *= c1;
+    h2 ^= k2;
+
+    h2 = ROTL64(h2, 31);
+    h2 += h1;
+    h2 = h2 * 5 + 0x38495ab5;
+  }
+
+  //----------
+  // tail
+
+  const uint8_t* tail = (const uint8_t*)(data + nblocks * 16);
+
+  uint64_t k1 = 0;
+  uint64_t k2 = 0;
+
+  switch (len & 15) {
+    case 15:
+      k2 ^= ((uint64_t)tail[14]) << 48;
+    case 14:
+      k2 ^= ((uint64_t)tail[13]) << 40;
+    case 13:
+      k2 ^= ((uint64_t)tail[12]) << 32;
+    case 12:
+      k2 ^= ((uint64_t)tail[11]) << 24;
+    case 11:
+      k2 ^= ((uint64_t)tail[10]) << 16;
+    case 10:
+      k2 ^= ((uint64_t)tail[9]) << 8;
+    case 9:
+      k2 ^= ((uint64_t)tail[8]) << 0;
+      k2 *= c2;
+      k2 = ROTL64(k2, 33);
+      k2 *= c1;
+      h2 ^= k2;
+
+    case 8:
+      k1 ^= ((uint64_t)tail[7]) << 56;
+    case 7:
+      k1 ^= ((uint64_t)tail[6]) << 48;
+    case 6:
+      k1 ^= ((uint64_t)tail[5]) << 40;
+    case 5:
+      k1 ^= ((uint64_t)tail[4]) << 32;
+    case 4:
+      k1 ^= ((uint64_t)tail[3]) << 24;
+    case 3:
+      k1 ^= ((uint64_t)tail[2]) << 16;
+    case 2:
+      k1 ^= ((uint64_t)tail[1]) << 8;
+    case 1:
+      k1 ^= ((uint64_t)tail[0]) << 0;
+      k1 *= c1;
+      k1 = ROTL64(k1, 31);
+      k1 *= c2;
+      h1 ^= k1;
+  }
+
+  //----------
+  // finalization
+
+  h1 ^= len;
+  h2 ^= len;
+
+  h1 += h2;
+  h2 += h1;
+
+  h1 = fmix64(h1);
+  h2 = fmix64(h2);
+
+  h1 += h2;
+  h2 += h1;
+
+  reinterpret_cast<uint64_t*>(out)[0] = h1;
+  reinterpret_cast<uint64_t*>(out)[1] = h2;
+}
+
+template <typename T>
+uint64_t HashHelper(T value, uint32_t seed) {
+  uint64_t output[2];
+  Hash_x64_128(reinterpret_cast<void*>(&value), sizeof(T), seed, output);
+  return output[0];
+}
+
+uint64_t MurmurHash3::Hash(int32_t value) const { return HashHelper(value, seed_); }
+
+uint64_t MurmurHash3::Hash(int64_t value) const { return HashHelper(value, seed_); }
+
+uint64_t MurmurHash3::Hash(float value) const { return HashHelper(value, seed_); }
+
+uint64_t MurmurHash3::Hash(double value) const { return HashHelper(value, seed_); }
+
+uint64_t MurmurHash3::Hash(const FLBA* value, uint32_t len) const {
+  uint64_t out[2];
+  Hash_x64_128(reinterpret_cast<const void*>(value->ptr), len, seed_, out);
+  return out[0];
+}
+
+uint64_t MurmurHash3::Hash(const Int96* value) const {
+  uint64_t out[2];
+  Hash_x64_128(reinterpret_cast<const void*>(value->value), sizeof(value->value), seed_,
+               out);
+  return out[0];
+}
+
+uint64_t MurmurHash3::Hash(const ByteArray* value) const {
+  uint64_t out[2];
+  Hash_x64_128(reinterpret_cast<const void*>(value->ptr), value->len, seed_, out);
+  return out[0];
+}
+
+}  // namespace parquet
diff --git a/src/parquet/murmur3.h b/src/parquet/murmur3.h
new file mode 100644
index 0000000..84792f3
--- /dev/null
+++ b/src/parquet/murmur3.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//-----------------------------------------------------------------------------
+// MurmurHash3 was written by Austin Appleby, and is placed in the public
+// domain. The author hereby disclaims copyright to this source code.
+
+#ifndef PARQUET_MURMURHASH3_H_
+#define PARQUET_MURMURHASH3_H_
+
+#include <cstdint>
+
+#include "parquet/hasher.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+/// Source:
+/// https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp
+/// (Modified to adapt to coding conventions and to inherit the Hasher abstract class)
+class MurmurHash3 : public Hasher {
+ public:
+  MurmurHash3() : seed_(DEFAULT_SEED) {}
+  uint64_t Hash(int32_t value) const override;
+  uint64_t Hash(int64_t value) const override;
+  uint64_t Hash(float value) const override;
+  uint64_t Hash(double value) const override;
+  uint64_t Hash(const Int96* value) const override;
+  uint64_t Hash(const ByteArray* value) const override;
+  uint64_t Hash(const FLBA* val, uint32_t len) const override;
+
+ private:
+  // Default seed for hash which comes from Bloom filter in parquet-mr, it is generated
+  // by System.nanoTime() of java.
+  static constexpr int DEFAULT_SEED = 1361930890;
+
+  uint32_t seed_;
+};
+
+}  // namespace parquet
+
+#endif  // PARQUET_MURMURHASH3_H_