Merge remote-tracking branch 'apache/master' into bloom-filter

Conflicts:
	parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
	parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
	pom.xml
diff --git a/dev/travis-before_install-bloom-filter.sh b/dev/travis-before_install-bloom-filter.sh
new file mode 100644
index 0000000..84d996a
--- /dev/null
+++ b/dev/travis-before_install-bloom-filter.sh
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+################################################################################
+# This is a branch-specific script that gets invoked at the end of
+# travis-before_install.sh. It is run for the bloom-filter branch only.
+################################################################################
+
+git clone https://github.com/apache/parquet-format.git
+cd parquet-format
+mvn install -DskipTests -q
+cd ..
+rm -rf parquet-format
diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml
index b2369b7..322fa28 100644
--- a/parquet-column/pom.xml
+++ b/parquet-column/pom.xml
@@ -58,7 +58,11 @@
       <artifactId>fastutil</artifactId>
       <version>${fastutil.version}</version>
     </dependency>
-
+    <dependency>
+      <groupId>net.openhft</groupId>
+      <artifactId>zero-allocation-hashing</artifactId>
+      <version>0.9</version>
+    </dependency>
     <dependency>
       <groupId>com.carrotsearch</groupId>
       <artifactId>junit-benchmarks</artifactId>
@@ -87,7 +91,6 @@
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
       <version>${guava.version}</version>
-      <scope>test</scope>
     </dependency>
   </dependencies>
 
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index c022b72..ad09223 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -29,12 +29,18 @@
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
 import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
 import org.apache.parquet.column.values.factory.ValuesWriterFactory;
 import org.apache.parquet.schema.MessageType;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 /**
  * This class represents all the configurable Parquet properties.
  */
@@ -50,6 +56,7 @@
   public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
   public static final int DEFAULT_STATISTICS_TRUNCATE_LENGTH = Integer.MAX_VALUE;
   public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
+  public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024;
 
   public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;
 
@@ -90,13 +97,20 @@
   private final ValuesWriterFactory valuesWriterFactory;
   private final int columnIndexTruncateLength;
   private final int statisticsTruncateLength;
+
+  // Maps a column name to its expected number of distinct values in a row group.
+  private final Map<String, Long> bloomFilterExpectedDistinctNumbers;
+  private final int maxBloomFilterBytes;
+  private final Set<String> bloomFilterColumns;
   private final int pageRowCountLimit;
   private final boolean pageWriteChecksumEnabled;
 
   private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
                             int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
                             ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit,
-                            boolean pageWriteChecksumEnabled, int statisticsTruncateLength) {
+                            boolean pageWriteChecksumEnabled, int statisticsTruncateLength,
+                            Map<String, Long> bloomFilterExpectedDistinctNumber, Set<String> bloomFilterColumns,
+                            int maxBloomFilterBytes) {
     this.pageSizeThreshold = pageSize;
     this.initialSlabSize = CapacityByteArrayOutputStream
       .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -111,6 +125,9 @@
     this.valuesWriterFactory = writerFactory;
     this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
     this.statisticsTruncateLength = statisticsTruncateLength;
+    this.bloomFilterExpectedDistinctNumbers = bloomFilterExpectedDistinctNumber;
+    this.bloomFilterColumns = bloomFilterColumns;
+    this.maxBloomFilterBytes = maxBloomFilterBytes;
     this.pageRowCountLimit = pageRowCountLimit;
     this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
   }
@@ -176,10 +193,23 @@
   public ColumnWriteStore newColumnWriteStore(MessageType schema,
                                               PageWriteStore pageStore) {
     switch (writerVersion) {
+      case PARQUET_1_0:
+        return new ColumnWriteStoreV1(schema, pageStore, this);
+      case PARQUET_2_0:
+        return new ColumnWriteStoreV2(schema, pageStore, this);
+      default:
+        throw new IllegalArgumentException("unknown version " + writerVersion);
+    }
+  }
+
+  public ColumnWriteStore newColumnWriteStore(MessageType schema,
+                                              PageWriteStore pageStore,
+                                              BloomFilterWriteStore bloomFilterWriteStore) {
+    switch (writerVersion) {
     case PARQUET_1_0:
-      return new ColumnWriteStoreV1(schema, pageStore, this);
+      return new ColumnWriteStoreV1(schema, pageStore, bloomFilterWriteStore, this);
     case PARQUET_2_0:
-      return new ColumnWriteStoreV2(schema, pageStore, this);
+      return new ColumnWriteStoreV2(schema, pageStore, bloomFilterWriteStore, this);
     default:
       throw new IllegalArgumentException("unknown version " + writerVersion);
     }
@@ -217,6 +247,18 @@
     return pageWriteChecksumEnabled;
   }
 
+  public Map<String, Long> getBloomFilterColumnExpectedNDVs() {
+    return bloomFilterExpectedDistinctNumbers;
+  }
+
+  public Set<String> getBloomFilterColumns() {
+    return bloomFilterColumns;
+  }
+
+  public int getMaxBloomFilterBytes() {
+    return maxBloomFilterBytes;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -237,6 +279,9 @@
     private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
     private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
     private int statisticsTruncateLength = DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+    private Map<String, Long> bloomFilterColumnExpectedNDVs = new HashMap<>();
+    private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES;
+    private Set<String> bloomFilterColumns = new HashSet<>();
     private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
     private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
 
@@ -255,6 +300,9 @@
       this.allocator = toCopy.allocator;
       this.pageRowCountLimit = toCopy.pageRowCountLimit;
       this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
+      this.bloomFilterColumnExpectedNDVs = toCopy.bloomFilterExpectedDistinctNumbers;
+      this.bloomFilterColumns = toCopy.bloomFilterColumns;
+      this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
     }
 
     /**
@@ -349,6 +397,39 @@
       return this;
     }
 
+    /**
+     * Set the maximum number of bytes for a Bloom filter bitset.
+     *
+     * @param maxBloomFilterBytes the maximum number of bytes of a Bloom filter bitset for a column.
+     * @return this builder for method chaining
+     */
+    public Builder withMaxBloomFilterBytes(int maxBloomFilterBytes) {
+      this.maxBloomFilterBytes = maxBloomFilterBytes;
+      return this;
+    }
+
+    /**
+     * Set the names of the columns for which Bloom filters are enabled.
+     *
+     * @param columns the columns which have a Bloom filter enabled.
+     * @return this builder for method chaining
+     */
+    public Builder withBloomFilterColumnNames(Set<String> columns) {
+      this.bloomFilterColumns = columns;
+      return this;
+    }
+
+    /**
+     * Set the expected number of distinct values for Bloom filter columns.
+     *
+     * @param columnExpectedNDVs a map from column name to its expected number of distinct values in a row group
+     * @return this builder for method chaining
+     */
+    public Builder withBloomFilterColumnNdvs(Map<String, Long> columnExpectedNDVs) {
+      this.bloomFilterColumnExpectedNDVs = columnExpectedNDVs;
+      return this;
+    }
+
     public Builder withPageRowCountLimit(int rowCount) {
       Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: " + rowCount);
       pageRowCountLimit = rowCount;
@@ -365,7 +446,8 @@
         new ParquetProperties(writerVersion, pageSize, dictPageSize,
           enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
           estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength,
-          pageRowCountLimit, pageWriteChecksumEnabled, statisticsTruncateLength);
+          pageRowCountLimit, pageWriteChecksumEnabled, statisticsTruncateLength,
+          bloomFilterColumnExpectedNDVs, bloomFilterColumns, maxBloomFilterBytes);
       // we pass a constructed but uninitialized factory to ParquetProperties above as currently
       // creation of ValuesWriters is invoked from within ParquetProperties. In the future
       // we'd like to decouple that and won't need to pass an object to properties and then pass the
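For orientation, a minimal sketch of how the new builder options above fit together. It is not part of this patch; the column name "id" and the NDV value are illustrative only.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.parquet.column.ParquetProperties;

public class BloomFilterPropertiesSketch {
  public static void main(String[] args) {
    // Columns that should carry a Bloom filter (illustrative name).
    Set<String> bloomColumns = new HashSet<>();
    bloomColumns.add("id");

    ParquetProperties props = ParquetProperties.builder()
        .withBloomFilterColumnNames(bloomColumns)
        // Expect roughly 10,000 distinct "id" values per row group.
        .withBloomFilterColumnNdvs(Collections.singletonMap("id", 10_000L))
        // Cap each Bloom filter bitset at 1 MB (the default).
        .withMaxBloomFilterBytes(ParquetProperties.DEFAULT_MAX_BLOOM_FILTER_BYTES)
        .build();

    System.out.println(props.getBloomFilterColumns());
    System.out.println(props.getBloomFilterColumnExpectedNDVs());
  }
}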
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index 2670c31..5a8fe38 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -34,6 +34,8 @@
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
 import org.apache.parquet.schema.MessageType;
 
 /**
@@ -74,7 +76,7 @@
       public ColumnWriter getColumnWriter(ColumnDescriptor path) {
         ColumnWriterBase column = columns.get(path);
         if (column == null) {
-          column = createColumnWriter(path, pageWriteStore.getPageWriter(path), props);
+          column = createColumnWriter(path, pageWriteStore.getPageWriter(path), null, props);
           columns.put(path, column);
         }
         return column;
@@ -91,7 +93,7 @@
     Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
     for (ColumnDescriptor path : schema.getColumns()) {
       PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-      mcolumns.put(path, createColumnWriter(path, pageWriter, props));
+      mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
     }
     this.columns = unmodifiableMap(mcolumns);
 
@@ -105,7 +107,38 @@
     };
   }
 
-  abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props);
+  // The Bloom filter is written to its own bitset instead of pages, so it needs a separate write store abstraction.
+  ColumnWriteStoreBase(
+    MessageType schema,
+    PageWriteStore pageWriteStore,
+    BloomFilterWriteStore bloomFilterWriteStore,
+    ParquetProperties props) {
+    this.props = props;
+    this.thresholdTolerance = (long) (props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);
+    Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
+    for (ColumnDescriptor path : schema.getColumns()) {
+      PageWriter pageWriter = pageWriteStore.getPageWriter(path);
+      if (props.getBloomFilterColumnExpectedNDVs() != null) {
+        BloomFilterWriter bloomFilterWriter = bloomFilterWriteStore.getBloomFilterWriter(path);
+        mcolumns.put(path, createColumnWriter(path, pageWriter, bloomFilterWriter, props));
+      } else {
+        mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
+      }
+    }
+    this.columns = unmodifiableMap(mcolumns);
+
+    this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
+
+    columnWriterProvider = new ColumnWriterProvider() {
+      @Override
+      public ColumnWriter getColumnWriter(ColumnDescriptor path) {
+        return columns.get(path);
+      }
+    };
+  }
+
+  abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
+                                               BloomFilterWriter bloomFilterWriter, ParquetProperties props);
 
   public ColumnWriter getColumnWriter(ColumnDescriptor path) {
     return columnWriterProvider.getColumnWriter(path);
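The new constructor above takes a BloomFilterWriteStore because the Bloom filter lives in its own bitset rather than in pages. As a rough illustration only (not part of this patch, class name hypothetical), such a store could be as simple as a map from column to the finished filter:

import java.util.HashMap;
import java.util.Map;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;

public class InMemoryBloomFilterWriteStore implements BloomFilterWriteStore {
  // Finished Bloom filters, keyed by column.
  private final Map<ColumnDescriptor, BloomFilter> filters = new HashMap<>();

  @Override
  public BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path) {
    // BloomFilterWriter has a single method, so a lambda is enough here.
    return bloomFilter -> filters.put(path, bloomFilter);
  }

  public BloomFilter getBloomFilter(ColumnDescriptor path) {
    return filters.get(path);
  }
}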
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
index 7258423..4f4a971 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
@@ -22,10 +22,11 @@
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
 import org.apache.parquet.schema.MessageType;
 
 public class ColumnWriteStoreV1 extends ColumnWriteStoreBase {
-
   public ColumnWriteStoreV1(MessageType schema, PageWriteStore pageWriteStore, ParquetProperties props) {
     super(schema, pageWriteStore, props);
   }
@@ -36,8 +37,15 @@
     super(pageWriteStore, props);
   }
 
+  public ColumnWriteStoreV1(MessageType schema, PageWriteStore pageWriteStore,
+                            BloomFilterWriteStore bloomFilterWriteStore,
+                            ParquetProperties props) {
+    super(schema, pageWriteStore, bloomFilterWriteStore, props);
+  }
+
   @Override
-  ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
-    return new ColumnWriterV1(path, pageWriter, props);
+  ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
+                                      BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
+    return new ColumnWriterV1(path, pageWriter, bloomFilterWriter, props);
   }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
index bf1090d..590c3ed 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
@@ -22,6 +22,8 @@
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
 import org.apache.parquet.schema.MessageType;
 
 public class ColumnWriteStoreV2 extends ColumnWriteStoreBase {
@@ -30,8 +32,15 @@
     super(schema, pageWriteStore, props);
   }
 
+  public ColumnWriteStoreV2(MessageType schema, PageWriteStore pageWriteStore,
+                            BloomFilterWriteStore bloomFilterWriteStore,
+                            ParquetProperties props) {
+    super(schema, pageWriteStore, bloomFilterWriteStore, props);
+  }
+
   @Override
-  ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
-    return new ColumnWriterV2(path, pageWriter, props);
+  ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter,
+                                      BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
+    return new ColumnWriterV2(path, pageWriter, bloomFilterWriter, props);
   }
 }
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index 8fc7d31..b3b799a 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -19,6 +19,8 @@
 package org.apache.parquet.column.impl;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnWriter;
@@ -27,6 +29,9 @@
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.io.api.Binary;
 import org.slf4j.Logger;
@@ -53,6 +58,9 @@
   private long rowsWrittenSoFar = 0;
   private int pageRowCount;
 
+  private BloomFilterWriter bloomFilterWriter;
+  private BloomFilter bloomFilter;
+
   ColumnWriterBase(
       ColumnDescriptor path,
       PageWriter pageWriter,
@@ -66,6 +74,42 @@
     this.dataColumn = props.newValuesWriter(path);
   }
 
+  ColumnWriterBase(
+    ColumnDescriptor path,
+    PageWriter pageWriter,
+    BloomFilterWriter bloomFilterWriter,
+    ParquetProperties props
+  ) {
+    this(path, pageWriter, props);
+
+    // Bloom filters don't support nested columns yet; see PARQUET-1453.
+    if (path.getPath().length != 1 || bloomFilterWriter == null) {
+      return;
+    }
+    String column = path.getPath()[0];
+
+    this.bloomFilterWriter = bloomFilterWriter;
+    Set<String> bloomFilterColumns = props.getBloomFilterColumns();
+    if (!bloomFilterColumns.contains(column)) {
+      return;
+    }
+    int maxBloomFilterSize = props.getMaxBloomFilterBytes();
+
+    Map<String, Long> bloomFilterColumnExpectedNDVs = props.getBloomFilterColumnExpectedNDVs();
+    if (bloomFilterColumnExpectedNDVs.size() > 0) {
+      // If the user specified the column's NDV, size the Bloom filter from it.
+      if (bloomFilterColumnExpectedNDVs.containsKey(column)) {
+        int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(bloomFilterColumnExpectedNDVs.get(column).intValue(),
+          BlockSplitBloomFilter.DEFAULT_FPP);
+
+        this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8, maxBloomFilterSize);
+      }
+    } else {
+      this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize);
+    }
+  }
+
   abstract ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path);
 
   abstract ValuesWriter createDLWriter(ParquetProperties props, ColumnDescriptor path);
@@ -122,6 +166,36 @@
         + pageWriter.getMemSize();
   }
 
+  private void updateBloomFilter(int value) {
+    if (bloomFilter != null) {
+      bloomFilter.insertHash(bloomFilter.hash(value));
+    }
+  }
+
+  private void updateBloomFilter(long value) {
+    if (bloomFilter != null) {
+      bloomFilter.insertHash(bloomFilter.hash(value));
+    }
+  }
+
+  private void updateBloomFilter(double value) {
+    if (bloomFilter != null) {
+      bloomFilter.insertHash(bloomFilter.hash(value));
+    }
+  }
+
+  private void updateBloomFilter(float value) {
+    if (bloomFilter != null) {
+      bloomFilter.insertHash(bloomFilter.hash(value));
+    }
+  }
+
+  private void updateBloomFilter(Binary value) {
+    if (bloomFilter != null) {
+      bloomFilter.insertHash(bloomFilter.hash(value));
+    }
+  }
+
   /**
    * Writes the current value
    *
@@ -137,6 +211,7 @@
     definitionLevel(definitionLevel);
     dataColumn.writeDouble(value);
     statistics.updateStats(value);
+    updateBloomFilter(value);
     ++valueCount;
   }
 
@@ -155,6 +230,7 @@
     definitionLevel(definitionLevel);
     dataColumn.writeFloat(value);
     statistics.updateStats(value);
+    updateBloomFilter(value);
     ++valueCount;
   }
 
@@ -173,6 +249,7 @@
     definitionLevel(definitionLevel);
     dataColumn.writeBytes(value);
     statistics.updateStats(value);
+    updateBloomFilter(value);
     ++valueCount;
   }
 
@@ -209,6 +286,7 @@
     definitionLevel(definitionLevel);
     dataColumn.writeInteger(value);
     statistics.updateStats(value);
+    updateBloomFilter(value);
     ++valueCount;
   }
 
@@ -227,6 +305,7 @@
     definitionLevel(definitionLevel);
     dataColumn.writeLong(value);
     statistics.updateStats(value);
+    updateBloomFilter(value);
     ++valueCount;
   }
 
@@ -246,6 +325,10 @@
       }
       dataColumn.resetDictionary();
     }
+
+    if (bloomFilterWriter != null && bloomFilter != null) {
+      bloomFilterWriter.writeBloomFilter(bloomFilter);
+    }
   }
 
   /**
@@ -265,20 +348,24 @@
    * @return the number of bytes of memory used to buffer the current data and the previously written pages
    */
   long getTotalBufferedSize() {
+    long bloomBufferSize = bloomFilter == null ? 0 : bloomFilter.getBitsetSize();
     return repetitionLevelColumn.getBufferedSize()
         + definitionLevelColumn.getBufferedSize()
         + dataColumn.getBufferedSize()
-        + pageWriter.getMemSize();
+        + pageWriter.getMemSize()
+        + bloomBufferSize;
   }
 
   /**
    * @return actual memory used
    */
   long allocatedSize() {
+    long bloomAllocatedSize = bloomFilter == null ? 0 : bloomFilter.getBitsetSize();
     return repetitionLevelColumn.getAllocatedSize()
         + definitionLevelColumn.getAllocatedSize()
         + dataColumn.getAllocatedSize()
-        + pageWriter.allocatedSize();
+        + pageWriter.allocatedSize()
+        + bloomAllocatedSize;
   }
 
   /**
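When an expected NDV is supplied, the constructor above sizes the filter as optimalNumOfBits(ndv, DEFAULT_FPP) / 8, where the underlying formula is bits = -8n / ln(1 - p^(1/8)), roughly 9.7 bits per distinct value at the default FPP of 0.01. A small sketch of that sizing path (not part of this patch; the NDV of 100,000 is illustrative):

import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;

public class BloomFilterSizingSketch {
  public static void main(String[] args) {
    long expectedNdv = 100_000L; // illustrative expected distinct values per row group

    // Same computation as the NDV branch of the ColumnWriterBase constructor.
    int optimalBits = BlockSplitBloomFilter.optimalNumOfBits(
        expectedNdv, BlockSplitBloomFilter.DEFAULT_FPP);
    System.out.println("optimal bits: " + optimalBits); // about 968k bits (~118 KB)

    BloomFilter filter = new BlockSplitBloomFilter(
        optimalBits / 8, BlockSplitBloomFilter.DEFAULT_MAXIMUM_BYTES);
    // The bitset is rounded up to a power of two and capped at the maximum.
    System.out.println("bitset bytes: " + filter.getBitsetSize()); // 131072 (128 KB)
  }
}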
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
index 646e31a..98c1e19 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
@@ -27,16 +27,21 @@
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
 
 /**
  * Writes (repetition level, definition level, value) triplets and deals with writing pages to the underlying layer.
  */
 final class ColumnWriterV1 extends ColumnWriterBase {
-
   ColumnWriterV1(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
     super(path, pageWriter, props);
   }
 
+  public ColumnWriterV1(ColumnDescriptor path, PageWriter pageWriter,
+                        BloomFilterWriter bloomFilterWriter, ParquetProperties props) {
+    super(path, pageWriter, bloomFilterWriter, props);
+  }
+
   @Override
   ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path) {
     return props.newRepetitionLevelWriter(path);
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
index e4e8563..cc44e2d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -28,6 +28,7 @@
 import org.apache.parquet.column.statistics.Statistics;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
 import org.apache.parquet.io.ParquetEncodingException;
@@ -59,6 +60,11 @@
     super(path, pageWriter, props);
   }
 
+  ColumnWriterV2(ColumnDescriptor path, PageWriter pageWriter, BloomFilterWriter bloomFilterWriter,
+                 ParquetProperties props) {
+    super(path, pageWriter, bloomFilterWriter, props);
+  }
+
   @Override
   ValuesWriter createRLWriter(ParquetProperties props, ColumnDescriptor path) {
     return path.getMaxRepetitionLevel() == 0 ? NULL_WRITER : new RLEWriterForV2(props.newRepetitionLevelEncoder(path));
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java
index 0aac63e..aa68cc1 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriteStore.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
new file mode 100644
index 0000000..cc9f674
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.io.api.Binary;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
+
+/*
+ * This Bloom filter is implemented using the block-based Bloom filter algorithm from Putze et al.'s
+ * "Cache-, Hash- and Space-Efficient Bloom filters". The basic idea is to hash each item to a tiny
+ * Bloom filter whose size fits in a single cache line or smaller. This implementation sets 8 bits in
+ * each tiny Bloom filter. Each tiny Bloom filter is 32 bytes to take advantage of 32-byte SIMD
+ * instructions.
+ */
+public class BlockSplitBloomFilter implements BloomFilter {
+  // Bytes in a tiny Bloom filter block.
+  private static final int BYTES_PER_BLOCK = 32;
+
+  // Bits in a tiny Bloom filter block.
+  private static final int BITS_PER_BLOCK = 256;
+
+  // Default minimum Bloom filter size, set to the size of a tiny Bloom filter block
+  public static final int DEFAULT_MINIMUM_BYTES = 32;
+
+  // Default maximum Bloom filter size, set to 1MB, which should cover most cases.
+  public static final int DEFAULT_MAXIMUM_BYTES = 1024 * 1024;
+
+  // The number of bits to set in a tiny Bloom filter
+  private static final int BITS_SET_PER_BLOCK = 8;
+
+  // The metadata in the header of a serialized Bloom filter is four four-byte values: the number of bytes
+  // of the bitset, the hash strategy, the algorithm, and the compression.
+  public static final int HEADER_SIZE = 16;
+
+  // The default false positive probability value
+  public static final double DEFAULT_FPP = 0.01;
+
+  // Hash strategy used in this Bloom filter.
+  private final HashStrategy hashStrategy;
+
+  // The underlying byte array for Bloom filter bitset.
+  private byte[] bitset;
+
+  // An integer view of the underlying bitset to help set bits.
+  private IntBuffer intBuffer;
+
+  // Hash function used to compute hashes of column values.
+  private HashFunction hashFunction;
+
+  private int maximumBytes = DEFAULT_MAXIMUM_BYTES;
+  private int minimumBytes = DEFAULT_MINIMUM_BYTES;
+
+  // The block-based algorithm needs 8 odd SALT values to calculate eight indexes
+  // of bits to set, one per 32-bit word.
+  private static final int[] SALT = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
+    0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};
+
+  /**
+   * Constructor of block-based Bloom filter.
+   *
+   * @param numBytes The number of bytes for the Bloom filter bitset. Values outside
+   *                 [DEFAULT_MINIMUM_BYTES, DEFAULT_MAXIMUM_BYTES] are clamped to the nearest bound,
+   *                 and the result is rounded up to a power of 2. XXH64 is used as the default
+   *                 hash function.
+   */
+  public BlockSplitBloomFilter(int numBytes) {
+    this(numBytes, DEFAULT_MAXIMUM_BYTES, HashStrategy.XXH64);
+  }
+
+  /**
+   * Constructor of block-based Bloom filter.
+   *
+   * @param numBytes The number of bytes for the Bloom filter bitset. Values outside
+   *                 [DEFAULT_MINIMUM_BYTES, maximumBytes] are clamped to the nearest bound,
+   *                 and the result is rounded up to a power of 2. XXH64 is used as the default
+   *                 hash function.
+   * @param maximumBytes The maximum number of bytes for the Bloom filter bitset.
+   */
+  public BlockSplitBloomFilter(int numBytes, int maximumBytes) {
+    this(numBytes, maximumBytes, HashStrategy.XXH64);
+  }
+
+  /**
+   * Constructor of block-based Bloom filter.
+   *
+   * @param numBytes The number of bytes for Bloom filter bitset
+   * @param hashStrategy The hash strategy of Bloom filter.
+   */
+  private BlockSplitBloomFilter(int numBytes, HashStrategy hashStrategy) {
+    this(numBytes, DEFAULT_MAXIMUM_BYTES, hashStrategy);
+  }
+
+  /**
+   * Constructor of block-based Bloom filter.
+   *
+   * @param numBytes The number of bytes for the Bloom filter bitset. Values outside
+   *                 [DEFAULT_MINIMUM_BYTES, maximumBytes] are clamped to the nearest bound,
+   *                 and the result is rounded up to a power of 2.
+   * @param maximumBytes The maximum number of bytes for the Bloom filter bitset.
+   * @param hashStrategy The adopted hash strategy of the Bloom filter.
+   */
+  public BlockSplitBloomFilter(int numBytes, int maximumBytes, HashStrategy hashStrategy) {
+    if (maximumBytes > DEFAULT_MINIMUM_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    initBitset(numBytes);
+
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        hashFunction = new XxHash();
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+  }
+
+
+  /**
+   * Construct the Bloom filter from a given bitset; this is used when reconstructing
+   * a Bloom filter from a Parquet file. XXH64 is used as the default hash
+   * function.
+   *
+   * @param bitset The given bitset to construct Bloom filter.
+   */
+  public BlockSplitBloomFilter(byte[] bitset) {
+    this(bitset, HashStrategy.XXH64);
+  }
+
+  /**
+   * Construct the Bloom filter from a given bitset; this is used when reconstructing
+   * a Bloom filter from a Parquet file.
+   *
+   * @param bitset The given bitset to construct the Bloom filter from.
+   * @param hashStrategy The hash strategy the Bloom filter applies.
+   */
+  private BlockSplitBloomFilter(byte[] bitset, HashStrategy hashStrategy) {
+    if (bitset == null) {
+      throw new RuntimeException("Given bitset is null");
+    }
+
+    this.bitset = bitset;
+    this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        hashFunction = new XxHash();
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+  }
+
+  /**
+   * Create a new bitset for Bloom filter.
+   *
+   * @param numBytes The number of bytes for the Bloom filter bitset. Values outside
+   *                 [minimumBytes, maximumBytes] are clamped to the nearest bound,
+   *                 and the result is rounded up to a power of 2.
+   */
+  private void initBitset(int numBytes) {
+    if (numBytes < minimumBytes) {
+      numBytes = minimumBytes;
+    }
+    // Round up to the next power of 2 if it is not already one.
+    if ((numBytes & (numBytes - 1)) != 0) {
+      numBytes = Integer.highestOneBit(numBytes) << 1;
+    }
+    if (numBytes > maximumBytes || numBytes < 0) {
+      numBytes = maximumBytes;
+    }
+    this.bitset = new byte[numBytes];
+    this.intBuffer = ByteBuffer.wrap(bitset).order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
+  }
+
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    // Write number of bytes of bitset.
+    out.write(BytesUtils.intToBytes(bitset.length));
+    // Write hash strategy
+    out.write(BytesUtils.intToBytes(hashStrategy.value));
+    // Write algorithm
+    out.write(BytesUtils.intToBytes(Algorithm.BLOCK.value));
+    // Write compression
+    out.write(BytesUtils.intToBytes(Compression.UNCOMPRESSED.value));
+    // Write bitset
+    out.write(bitset);
+  }
+
+  private int[] setMask(int key) {
+    int[] mask = new int[BITS_SET_PER_BLOCK];
+
+    for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+      mask[i] = key * SALT[i];
+    }
+    for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+      mask[i] = mask[i] >>> 27;
+    }
+    for (int i = 0; i < BITS_SET_PER_BLOCK; ++i) {
+      mask[i] = 0x1 << mask[i];
+    }
+
+    return mask;
+  }
+
+  @Override
+  public void insertHash(long hash) {
+    long numBlocks = bitset.length / BYTES_PER_BLOCK;
+    long lowHash = hash >>> 32;
+    int blockIndex = (int)((lowHash * numBlocks) >> 32);
+    int key = (int)hash;
+
+    // Calculate mask for bucket.
+    int[] mask = setMask(key);
+    for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
+      int value = intBuffer.get(blockIndex * (BYTES_PER_BLOCK / 4) + i);
+      value |= mask[i];
+      intBuffer.put(blockIndex * (BYTES_PER_BLOCK / 4) + i, value);
+    }
+  }
+
+  @Override
+  public boolean findHash(long hash) {
+    long numBlocks = bitset.length / BYTES_PER_BLOCK;
+    long lowHash = hash >>> 32;
+    int blockIndex = (int)((lowHash * numBlocks) >> 32);
+    int key = (int)hash;
+
+    // Calculate mask for the tiny Bloom filter.
+    int[] mask = setMask(key);
+    for (int i = 0; i < BITS_SET_PER_BLOCK; i++) {
+      if (0 == (intBuffer.get(blockIndex * (BYTES_PER_BLOCK / 4) + i) & mask[i])) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  /**
+   * Calculate optimal size according to the number of distinct values and false positive probability.
+   *
+   * @param n The number of distinct values.
+   * @param p The false positive probability.
+   * @return the optimal number of bits for the given n and p.
+   */
+  public static int optimalNumOfBits(long n, double p) {
+    Preconditions.checkArgument((p > 0.0 && p < 1.0),
+      "FPP should be less than 1.0 and great than 0.0");
+    final double m = -8 * n / Math.log(1 - Math.pow(p, 1.0 / 8));
+    final double MAX = DEFAULT_MAXIMUM_BYTES << 3;
+    int numBits = (int)m;
+
+    // Handle overflow.
+    if (m > MAX || m < 0) {
+      numBits = (int)MAX;
+    }
+
+    // Round up to a multiple of BITS_PER_BLOCK.
+    numBits = (numBits + BITS_PER_BLOCK - 1) & ~(BITS_PER_BLOCK - 1);
+
+    if (numBits < (DEFAULT_MINIMUM_BYTES << 3)) {
+      numBits = DEFAULT_MINIMUM_BYTES << 3;
+    }
+
+    return numBits;
+  }
+
+  @Override
+  public long getBitsetSize() {
+    return this.bitset.length;
+  }
+
+  @Override
+  public long hash(Object value) {
+    ByteBuffer plain;
+
+    if (value instanceof Binary) {
+      return hashFunction.hashBytes(((Binary) value).getBytes());
+    }
+
+    if (value instanceof Integer) {
+      plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
+      plain.order(ByteOrder.LITTLE_ENDIAN).putInt(((Integer)value));
+    } else if (value instanceof Long) {
+      plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE);
+      plain.order(ByteOrder.LITTLE_ENDIAN).putLong(((Long)value));
+    } else if (value instanceof Float) {
+      plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE);
+      plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(((Float)value));
+    } else if (value instanceof Double) {
+      plain = ByteBuffer.allocate(Double.SIZE/Byte.SIZE);
+      plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(((Double)value));
+    } else {
+      throw new RuntimeException("Parquet Bloom filter: Not supported type");
+    }
+
+    return hashFunction.hashByteBuffer(plain);
+  }
+
+  @Override
+  public long hash(int value) {
+    ByteBuffer plain = ByteBuffer.allocate(Integer.SIZE/Byte.SIZE);
+    plain.order(ByteOrder.LITTLE_ENDIAN).putInt(value);
+    return hashFunction.hashByteBuffer(plain);
+  }
+
+  @Override
+  public long hash(long value) {
+    ByteBuffer plain = ByteBuffer.allocate(Long.SIZE/Byte.SIZE);
+    plain.order(ByteOrder.LITTLE_ENDIAN).putLong(value);
+    return hashFunction.hashByteBuffer(plain);
+  }
+
+  @Override
+  public long hash(double value) {
+    ByteBuffer plain = ByteBuffer.allocate(Double.SIZE/Byte.SIZE);
+    plain.order(ByteOrder.LITTLE_ENDIAN).putDouble(value);
+    return hashFunction.hashByteBuffer(plain);
+  }
+
+  @Override
+  public long hash(float value) {
+    ByteBuffer plain = ByteBuffer.allocate(Float.SIZE/Byte.SIZE);
+    plain.order(ByteOrder.LITTLE_ENDIAN).putFloat(value);
+    return hashFunction.hashByteBuffer(plain);
+  }
+
+  @Override
+  public long hash(Binary value) {
+    return hashFunction.hashBytes(value.getBytes());
+  }
+}
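A short round-trip sketch of the class above (not part of this patch; the inserted values are illustrative), showing the insert/find/serialize cycle that the tests below exercise with strings:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;

public class BlockSplitBloomFilterRoundTrip {
  public static void main(String[] args) throws IOException {
    BloomFilter filter = new BlockSplitBloomFilter(1024); // 1 KB bitset

    // Insert hashes of a few long values.
    for (long id : new long[] {1L, 42L, 4096L}) {
      filter.insertHash(filter.hash(id));
    }

    // Inserted values always hit; other values hit only with roughly FPP probability.
    System.out.println(filter.findHash(filter.hash(42L))); // true
    System.out.println(filter.findHash(filter.hash(7L)));  // usually false

    // Serialization: 16-byte header (length, hash, algorithm, compression) plus the bitset.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    filter.writeTo(out);
    System.out.println(out.size()); // 1040 = 16 + 1024
  }
}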
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
new file mode 100644
index 0000000..8b26c97
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilter.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bloomfilter;
+
+import org.apache.parquet.io.api.Binary;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A Bloom filter is a compact structure to indicate whether an item is not in a set or probably
+ * in a set. The Bloom filter usually consists of a bit set that represents a set of elements,
+ * a hash strategy, and a Bloom filter algorithm.
+ */
+public interface BloomFilter {
+  /* Bloom filter Hash strategy.
+   *
+   * xxHash is an extremely fast hash algorithm, running at RAM speed limits. It successfully
+   * completes the SMHasher test suite which evaluates collision, dispersion and randomness qualities
+   * of hash functions, and its benchmark results show a strong performance advantage
+   * (see https://github.com/Cyan4973/xxHash).
+   */
+  enum HashStrategy {
+    XXH64(0);
+    HashStrategy(int value) {
+      this.value = value;
+    }
+    int value;
+  }
+
+  // Bloom filter algorithm.
+  enum Algorithm {
+    BLOCK(0);
+    Algorithm(int value) {
+      this.value = value;
+    }
+    int value;
+  }
+
+  // Bloom filter compression.
+  enum Compression {
+    UNCOMPRESSED(0);
+    Compression(int value) {
+      this.value = value;
+    }
+    int value;
+  }
+
+  /**
+   * Write the Bloom filter to an output stream. It writes the Bloom filter header, including the
+   * bitset's length in bytes, the hash strategy, the algorithm, and the compression, followed by the bitset.
+   *
+   * @param out the output stream to write to
+   */
+  void writeTo(OutputStream out) throws IOException;
+
+  /**
+   * Insert an element into the Bloom filter; the element is represented by
+   * the hash value of its plain encoding.
+   *
+   * @param hash the hash of the element.
+   */
+  void insertHash(long hash);
+
+  /**
+   * Determine whether an element is in the set.
+   *
+   * @param hash the hash value of the element's plain encoding.
+   * @return false if the element is definitely not in the set, true if it is probably in the set.
+   */
+  boolean findHash(long hash);
+
+  /**
+   * Get the number of bytes for bitset in this Bloom filter.
+   *
+   * @return The number of bytes for bitset in this Bloom filter.
+   */
+  long getBitsetSize();
+
+  /**
+   * Compute hash for int value by using its plain encoding result.
+   *
+   * @param value the value to hash
+   * @return hash result
+   */
+  long hash(int value);
+
+  /**
+   * Compute hash for long value by using its plain encoding result.
+   *
+   * @param value the value to hash
+   * @return hash result
+   */
+  long hash(long value);
+
+  /**
+   * Compute hash for double value by using its plain encoding result.
+   *
+   * @param value the value to hash
+   * @return hash result
+   */
+  long hash(double value);
+
+  /**
+   * Compute hash for float value by using its plain encoding result.
+   *
+   * @param value the value to hash
+   * @return hash result
+   */
+  long hash(float value);
+
+  /**
+   * Compute hash for Binary value by using its plain encoding result.
+   *
+   * @param value the value to hash
+   * @return hash result
+   */
+  long hash(Binary value);
+
+  /**
+   * Compute hash for Object value by using its plain encoding result.
+   *
+   * @param value the value to hash
+   * @return hash result
+   */
+  long hash(Object value);
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java
new file mode 100644
index 0000000..f7e28fd
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriteStore.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import org.apache.parquet.column.ColumnDescriptor;
+
+/**
+ * Contains the Bloom filter writers for all columns of a row group.
+ */
+public interface BloomFilterWriteStore {
+  /**
+   * Get the Bloom filter writer for a column
+   *
+   * @param path the descriptor for the column
+   * @return the corresponding Bloom filter writer
+   */
+  BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path);
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
new file mode 100644
index 0000000..e2504d8
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BloomFilterWriter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+public interface BloomFilterWriter {
+  /**
+   * Write a Bloom filter
+   *
+   * @param bloomFilter the Bloom filter to write
+   *
+   */
+  void writeBloomFilter(BloomFilter bloomFilter);
+}
+
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/HashFunction.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/HashFunction.java
new file mode 100644
index 0000000..2043934
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/HashFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bloomfilter;
+
+import java.nio.ByteBuffer;
+
+/**
+ * An interface for the hash functions used by the Bloom filter.
+ */
+public interface HashFunction {
+
+  /**
+   * Compute the hash value for a byte array.
+   * @param input the input byte array
+   * @return the hash as a long value.
+   */
+  long hashBytes(byte[] input);
+
+  /**
+   * Compute the hash value for a ByteBuffer.
+   * @param input the input ByteBuffer
+   * @return the hash as a long value.
+   */
+  long hashByteBuffer(ByteBuffer input);
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java
new file mode 100644
index 0000000..6c52b3c
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/XxHash.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import net.openhft.hashing.LongHashFunction;
+
+import java.nio.ByteBuffer;
+
+/**
+ * An implementation of the HashFunction interface that uses the XXH64 variant of xxHash
+ * with a seed of 0.
+ */
+public class XxHash implements HashFunction {
+  @Override
+  public long hashBytes(byte[] input) {
+    return LongHashFunction.xx(0).hashBytes(input);
+  }
+
+  @Override
+  public long hashByteBuffer(ByteBuffer input) {
+    return LongHashFunction.xx(0).hashBytes(input);
+  }
+}
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
new file mode 100644
index 0000000..d75c0e2
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.bloomfilter;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.parquet.column.values.RandomStr;
+import org.apache.parquet.io.api.Binary;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBlockSplitBloomFilter {
+  @Test
+  public void testConstructor() {
+    BloomFilter bloomFilter1 = new BlockSplitBloomFilter(0);
+    assertEquals(bloomFilter1.getBitsetSize(), BlockSplitBloomFilter.DEFAULT_MINIMUM_BYTES);
+    BloomFilter bloomFilter2 = new BlockSplitBloomFilter(BlockSplitBloomFilter.DEFAULT_MAXIMUM_BYTES + 1);
+    assertEquals(bloomFilter2.getBitsetSize(), BlockSplitBloomFilter.DEFAULT_MAXIMUM_BYTES);
+    BloomFilter bloomFilter3 = new BlockSplitBloomFilter(1000);
+    assertEquals(bloomFilter3.getBitsetSize(), 1024);
+  }
+
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+
+  /*
+   * This test exercises the basic operations: inserting, finding, serializing,
+   * and de-serializing.
+   */
+  @Test
+  public void testBasic() throws IOException {
+    final String[] testStrings = {"hello", "parquet", "bloom", "filter"};
+    BloomFilter bloomFilter = new BlockSplitBloomFilter(1024);
+
+    for(int i = 0; i < testStrings.length; i++) {
+      bloomFilter.insertHash(bloomFilter.hash(Binary.fromString(testStrings[i])));
+    }
+
+    File testFile = temp.newFile();
+    FileOutputStream fileOutputStream = new FileOutputStream(testFile);
+    bloomFilter.writeTo(fileOutputStream);
+    fileOutputStream.close();
+    FileInputStream fileInputStream = new FileInputStream(testFile);
+
+    byte[] value = new byte[4];
+    fileInputStream.read(value);
+    int length = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+    assertEquals(length, 1024);
+
+    fileInputStream.read(value);
+    int hash = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+    assertEquals(hash, BloomFilter.HashStrategy.XXH64.ordinal());
+
+    fileInputStream.read(value);
+    int algorithm = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+    assertEquals(algorithm, BloomFilter.Algorithm.BLOCK.ordinal());
+
+    fileInputStream.read(value);
+    int compression = ByteBuffer.wrap(value).order(ByteOrder.LITTLE_ENDIAN).getInt();
+    assertEquals(compression, BloomFilter.Compression.UNCOMPRESSED.ordinal());
+
+    byte[] bitset = new byte[length];
+    fileInputStream.read(bitset);
+    bloomFilter = new BlockSplitBloomFilter(bitset);
+    for (String testString : testStrings) {
+      assertTrue(bloomFilter.findHash(bloomFilter.hash(Binary.fromString(testString))));
+    }
+  }
+
+  @Test
+  public void testFPP() throws IOException {
+    final int totalCount = 100000;
+    final double FPP = 0.01;
+    final long SEED = 104729;
+
+    BloomFilter bloomFilter = new BlockSplitBloomFilter(BlockSplitBloomFilter.optimalNumOfBits(totalCount, FPP));
+    List<String> strings = new ArrayList<>();
+    RandomStr randomStr = new RandomStr(new Random(SEED));
+    for(int i = 0; i < totalCount; i++) {
+      String str = randomStr.get(10);
+      strings.add(str);
+      bloomFilter.insertHash(bloomFilter.hash(Binary.fromString(str)));
+    }
+
+    // Count how many times findHash returns true for strings that were never inserted.
+    int exist = 0;
+    for (int i = 0; i < totalCount; i++) {
+      String str = randomStr.get(8);
+      if (bloomFilter.findHash(bloomFilter.hash(Binary.fromString(str)))) {
+        exist++;
+      }
+    }
+
+    // With an FPP of 0.01, the false-positive count should stay below totalCount * FPP (1000).
+    assertTrue(exist < totalCount * FPP);
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/BloomFilterLevel/BloomFilterImpl.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/BloomFilterLevel/BloomFilterImpl.java
new file mode 100644
index 0000000..c1e3774
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/BloomFilterLevel/BloomFilterImpl.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.filter2.BloomFilterLevel;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.Operators;
+import org.apache.parquet.filter2.predicate.UserDefinedPredicate;
+import org.apache.parquet.hadoop.BloomFilterReader;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+import static org.apache.parquet.Preconditions.checkNotNull;
+
+public class BloomFilterImpl implements FilterPredicate.Visitor<Boolean> {
+  private static final Logger LOG = LoggerFactory.getLogger(BloomFilterImpl.class);
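+  // The visitor answers "can this row group be dropped?"; a Bloom filter can only prove
+  // absence, so most operators conservatively return BLOCK_MIGHT_MATCH.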
+  private static final boolean BLOCK_MIGHT_MATCH = false;
+  private static final boolean BLOCK_CANNOT_MATCH = true;
+
+  private final Map<ColumnPath, ColumnChunkMetaData> columns = new HashMap<ColumnPath, ColumnChunkMetaData>();
+  private final BloomFilterReader bloomFilterReader;
+
+  public static boolean canDrop(FilterPredicate pred, List<ColumnChunkMetaData> columns, BloomFilterReader bloomFilterReader) {
+    checkNotNull(pred, "pred");
+    checkNotNull(columns, "columns");
+    return pred.accept(new BloomFilterImpl(columns, bloomFilterReader));
+  }
+
+  private BloomFilterImpl(List<ColumnChunkMetaData> columnsList, BloomFilterReader bloomFilterReader) {
+    for (ColumnChunkMetaData chunk : columnsList) {
+      columns.put(chunk.getPath(), chunk);
+    }
+
+    this.bloomFilterReader = bloomFilterReader;
+  }
+
+  private ColumnChunkMetaData getColumnChunk(ColumnPath columnPath) {
+    return columns.get(columnPath);
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.Eq<T> eq) {
+    T value = eq.getValue();
+
+    if (value == null) {
+      // The Bloom filter bitset contains only non-null values, so it isn't helpful here.
+      // Null checks against the column statistics are the responsibility of the StatisticsFilter.
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    Operators.Column<T> filterColumn = eq.getColumn();
+    ColumnChunkMetaData meta = getColumnChunk(filterColumn.getColumnPath());
+    if (meta == null) {
+      // The column isn't in this file, so all of its values are null; the predicate value
+      // is non-null because of the check above, so the block cannot match.
+      return BLOCK_CANNOT_MATCH;
+    }
+
+    try {
+      BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(meta);
+      if (bloomFilter != null && !bloomFilter.findHash(bloomFilter.hash(value))) {
+        return BLOCK_CANNOT_MATCH;
+      }
+    } catch (RuntimeException e) {
+      LOG.warn("Could not read Bloom filter for column {}: {}", filterColumn.getColumnPath(), e.getMessage());
+      return BLOCK_MIGHT_MATCH;
+    }
+
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.NotEq<T> notEq) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.Lt<T> lt) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.LtEq<T> ltEq) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.Gt<T> gt) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>> Boolean visit(Operators.GtEq<T> gtEq) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public Boolean visit(Operators.And and) {
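+    // "true" means the block can be dropped, so an AND predicate allows the block to be
+    // dropped as soon as either operand rules it out.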
+    return and.getLeft().accept(this) || and.getRight().accept(this);
+  }
+
+  @Override
+  public Boolean visit(Operators.Or or) {
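+    // For OR, both operands must rule the block out before it can be dropped.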
+    return or.getLeft().accept(this) && or.getRight().accept(this);
+  }
+
+  @Override
+  public Boolean visit(Operators.Not not) {
+    throw new IllegalArgumentException(
+      "This predicate contains a not! Did you forget to run this predicate through LogicalInverseRewriter? " + not);
+  }
+
+  private <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(Operators.UserDefined<T, U> ud, boolean inverted) {
+    return BLOCK_MIGHT_MATCH;
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(Operators.UserDefined<T, U> udp) {
+    return visit(udp, false);
+  }
+
+  @Override
+  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> Boolean visit(Operators.LogicalNotUserDefined<T, U> udp) {
+    return visit(udp.getUserDefined(), true);
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
index d1d40e9..fe6f637 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/filter2/compat/RowGroupFilter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,7 @@
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.parquet.filter2.BloomFilterLevel.BloomFilterImpl;
 import org.apache.parquet.filter2.compat.FilterCompat.Filter;
 import org.apache.parquet.filter2.compat.FilterCompat.NoOpFilter;
 import org.apache.parquet.filter2.compat.FilterCompat.Visitor;
@@ -32,6 +33,8 @@
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.parquet.Preconditions.checkNotNull;
 
@@ -45,10 +48,12 @@
   private final MessageType schema;
   private final List<FilterLevel> levels;
   private final ParquetFileReader reader;
+  private static final Logger logger = LoggerFactory.getLogger(RowGroupFilter.class);
 
   public enum FilterLevel {
     STATISTICS,
-    DICTIONARY
+    DICTIONARY,
+    BLOOMFILTER
   }
 
   /**
@@ -104,6 +109,11 @@
         drop = DictionaryFilter.canDrop(filterPredicate, block.getColumns(), reader.getDictionaryReader(block));
       }
 
+      if (!drop && levels.contains(FilterLevel.BLOOMFILTER)) {
+        drop = BloomFilterImpl.canDrop(filterPredicate, block.getColumns(), reader.getBloomFilterDataReader(block));
+        if (drop) logger.info("Block dropped by Bloom filter");
+      }
+
       if(!drop) {
         filteredBlocks.add(block);
       }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index bfb4aa3..625bbc3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -473,6 +473,7 @@
           columnMetaData.getTotalSize(),
           columnMetaData.getFirstDataPageOffset());
       columnChunk.meta_data.dictionary_page_offset = columnMetaData.getDictionaryPageOffset();
+      columnChunk.meta_data.setBloom_filter_offset(columnMetaData.getBloomFilterOffset());
       if (!columnMetaData.getStatistics().isEmpty()) {
         columnChunk.meta_data.setStatistics(toParquetStatistics(columnMetaData.getStatistics(), this.statisticsTruncateLength));
       }
@@ -1240,6 +1241,7 @@
               metaData.total_uncompressed_size);
           column.setColumnIndexReference(toColumnIndexReference(columnChunk));
           column.setOffsetIndexReference(toOffsetIndexReference(columnChunk));
+          column.setBloomFilterOffset(metaData.bloom_filter_offset);
           // TODO
           // index_page_offset
           // key_value_metadata
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterReader.java
new file mode 100644
index 0000000..3ad91ce
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/BloomFilterReader.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * Bloom filter reader that reads Bloom filter data from an open {@link ParquetFileReader}.
+ */
+public class BloomFilterReader {
+  private final ParquetFileReader reader;
+  private final Map<ColumnPath, ColumnChunkMetaData> columns;
+  private final Map<ColumnPath, BloomFilter> cache = new HashMap<>();
+
+  public BloomFilterReader(ParquetFileReader fileReader, BlockMetaData block) {
+    this.reader = fileReader;
+    this.columns = new HashMap<>();
+    for (ColumnChunkMetaData column : block.getColumns()) {
+      columns.put(column.getPath(), column);
+    }
+  }
+
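+  /**
+   * Returns the Bloom filter of the given column chunk, reading it from the file and
+   * caching it on first access.
+   *
+   * @param meta the metadata of the column chunk whose Bloom filter is requested
+   * @return the Bloom filter, or null if the column chunk has none
+   */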
+  public BloomFilter readBloomFilter(ColumnChunkMetaData meta) {
+    if (cache.containsKey(meta.getPath())) {
+      return cache.get(meta.getPath());
+    }
+    try {
+      synchronized (cache) {
+        if (!cache.containsKey(meta.getPath())) {
+          BloomFilter bloomFilter = reader.readBloomFilter(meta);
+          if (bloomFilter == null) return null;
+          cache.put(meta.getPath(), bloomFilter);
+        }
+      }
+      return cache.get(meta.getPath());
+    } catch (IOException e) {
+      throw new RuntimeException(
+        "Failed to read Bloom filter data", e);
+    }
+  }
+
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 72f26fc..d2e4c96 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -37,6 +37,9 @@
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
 import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
@@ -47,12 +50,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class ColumnChunkPageWriteStore implements PageWriteStore {
+class ColumnChunkPageWriteStore implements PageWriteStore, BloomFilterWriteStore {
   private static final Logger LOG = LoggerFactory.getLogger(ColumnChunkPageWriteStore.class);
 
   private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
 
-  private static final class ColumnChunkPageWriter implements PageWriter {
+  private static final class ColumnChunkPageWriter implements PageWriter, BloomFilterWriter {
 
     private final ColumnDescriptor path;
     private final BytesCompressor compressor;
@@ -71,6 +74,7 @@
     private Set<Encoding> dlEncodings = new HashSet<Encoding>();
     private List<Encoding> dataEncodings = new ArrayList<Encoding>();
 
+    private BloomFilter bloomFilter;
     private ColumnIndexBuilder columnIndexBuilder;
     private OffsetIndexBuilder offsetIndexBuilder;
     private Statistics totalStatistics;
@@ -249,6 +253,7 @@
           totalStatistics,
           columnIndexBuilder,
           offsetIndexBuilder,
+          bloomFilter,
           rlEncodings,
           dlEncodings,
           dataEncodings);
@@ -289,6 +294,10 @@
       return buf.memUsageString(prefix + " ColumnChunkPageWriter");
     }
 
+    @Override
+    public void writeBloomFilter(BloomFilter bloomFilter) {
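+      // Keep a reference to the filter; it is passed to the file writer together with
+      // the rest of the column chunk when the store is flushed.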
+      this.bloomFilter = bloomFilter;
+    }
   }
 
   private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
@@ -313,6 +322,11 @@
     return writers.get(path);
   }
 
+  @Override
+  public BloomFilterWriter getBloomFilterWriter(ColumnDescriptor path) {
+    return writers.get(path);
+  }
+
   public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
     for (ColumnDescriptor path : schema.getColumns()) {
       ColumnChunkPageWriter pageWriter = writers.get(path);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index c3da323..18ee788 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -28,6 +28,7 @@
 
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
 import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
@@ -64,6 +65,7 @@
 
   private ColumnWriteStore columnStore;
   private ColumnChunkPageWriteStore pageStore;
+  private BloomFilterWriteStore bloomFilterWriteStore;
   private RecordConsumer recordConsumer;
 
   /**
@@ -101,9 +103,12 @@
   }
 
   private void initStore() {
-    pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator(),
-        props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled());
-    columnStore = props.newColumnWriteStore(schema, pageStore);
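+    // A single ColumnChunkPageWriteStore instance serves as both the page write store
+    // and the Bloom filter write store handed to the column writers.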
+    ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore(compressor,
+        schema, props.getAllocator(), props.getColumnIndexTruncateLength());
+    pageStore = columnChunkPageWriteStore;
+    bloomFilterWriteStore = columnChunkPageWriteStore;
+
+    columnStore = props.newColumnWriteStore(schema, pageStore, bloomFilterWriteStore);
     MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
     this.recordConsumer = columnIO.getRecordWriter(columnStore);
     writeSupport.prepareForWrite(recordConsumer);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 366c429..52cacb0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -34,6 +34,8 @@
 import java.io.IOException;
 import java.io.SequenceInputStream;
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.IntBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -69,6 +71,8 @@
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
 import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.RowGroupFilter;
@@ -1050,6 +1054,51 @@
         converter.getEncoding(dictHeader.getEncoding()));
   }
 
+  public BloomFilterReader getBloomFilterDataReader(BlockMetaData block) {
+    return new BloomFilterReader(this, block);
+  }
+
+  /**
+   * Reads Bloom filter data for the given column chunk.
+   *
+   * @param meta a column's ColumnChunkMetaData to read the Bloom filter from
+   * @return a BloomFilter object, or null if the header is invalid or unsupported.
+   * @throws IOException if there is an error while reading the Bloom filter.
+   */
+  public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException {
+    long bloomFilterOffset = meta.getBloomFilterOffset();
+    // Columns without a Bloom filter leave the offset at its default of 0, which can never
+    // point at a valid filter because the file starts with the magic bytes.
+    if (bloomFilterOffset <= 0) {
+      return null;
+    }
+    f.seek(bloomFilterOffset);
+
+    // Read Bloom filter data header.
+    byte[] bytes = new byte[BlockSplitBloomFilter.HEADER_SIZE];
+    f.readFully(bytes);
+    ByteBuffer bloomHeader = ByteBuffer.wrap(bytes);
+    IntBuffer headerBuffer = bloomHeader.order(ByteOrder.LITTLE_ENDIAN).asIntBuffer();
+    int numBytes = headerBuffer.get();
+    if (numBytes <= 0 || numBytes > BlockSplitBloomFilter.DEFAULT_MAXIMUM_BYTES) {
+      return null;
+    }
+
+    BloomFilter.HashStrategy hash = BloomFilter.HashStrategy.values()[headerBuffer.get()];
+    if (hash != BlockSplitBloomFilter.HashStrategy.XXH64) {
+      return null;
+    }
+
+    BloomFilter.Algorithm algorithm = BloomFilter.Algorithm.values()[headerBuffer.get()];
+    if (algorithm != BlockSplitBloomFilter.Algorithm.BLOCK) {
+      return null;
+    }
+
+    BloomFilter.Compression compression = BloomFilter.Compression.values()[headerBuffer.get()];
+    if (compression != BlockSplitBloomFilter.Compression.UNCOMPRESSED) {
+      return null;
+    }
+
+    byte[] bitset = new byte[numBytes];
+    f.readFully(bitset);
+    return new BlockSplitBloomFilter(bitset);
+  }
+
   /**
    * @param column
    *          the column chunk which the column index is to be returned for
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 39a75bc..63ef7bc 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -50,6 +50,8 @@
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.example.DummyRecordConverter;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.format.Util;
@@ -110,6 +112,9 @@
   private final List<List<ColumnIndex>> columnIndexes = new ArrayList<>();
   private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();
 
+  // The Bloom filters
+  private final List<List<BloomFilter>> bloomFilters = new ArrayList<>();
+
   // row group data
   private BlockMetaData currentBlock; // appended to by endColumn
 
@@ -117,6 +122,9 @@
   private List<ColumnIndex> currentColumnIndexes;
   private List<OffsetIndex> currentOffsetIndexes;
 
+  // The Bloom filters for the current block
+  private List<BloomFilter> currentBloomFilters;
+
   // row group data set at the start of a row group
   private long currentRecordCount; // set in startBlock
 
@@ -355,6 +363,8 @@
 
     currentColumnIndexes = new ArrayList<>();
     currentOffsetIndexes = new ArrayList<>();
+
+    currentBloomFilters = new ArrayList<>();
   }
 
   /**
@@ -575,6 +585,14 @@
   }
 
   /**
+   * Adds a Bloom filter for the current row group; it is serialized when the file is ended.
+   * @param bloomFilter the Bloom filter of column values
+   */
+  void writeBloomFilter(BloomFilter bloomFilter) {
+    currentBloomFilters.add(bloomFilter);
+  }
+
+  /**
    * Writes a column chunk at once
    * @param descriptor the descriptor of the column
    * @param valueCount the value count in this column
@@ -586,6 +604,7 @@
    * @param totalStats accumulated statistics for the column chunk
    * @param columnIndexBuilder the builder object for the column index
    * @param offsetIndexBuilder the builder object for the offset index
+   * @param bloomFilter the bloom filter for this column
    * @param rlEncodings the RL encodings used in this column chunk
    * @param dlEncodings the DL encodings used in this column chunk
    * @param dataEncodings the data encodings used in this column chunk
@@ -601,14 +620,18 @@
       Statistics<?> totalStats,
       ColumnIndexBuilder columnIndexBuilder,
       OffsetIndexBuilder offsetIndexBuilder,
+      BloomFilter bloomFilter,
       Set<Encoding> rlEncodings,
       Set<Encoding> dlEncodings,
       List<Encoding> dataEncodings) throws IOException {
     startColumn(descriptor, valueCount, compressionCodecName);
 
     state = state.write();
+
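+    // The Bloom filter is only recorded when no dictionary page was written; a dictionary
+    // can already be used to rule out values for dictionary-encoded chunks.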
     if (dictionaryPage != null) {
       writeDictionaryPage(dictionaryPage);
+    } else if (bloomFilter != null) {
+      currentBloomFilters.add(bloomFilter);
     }
     LOG.debug("{}: write data pages", out.getPos());
     long headersSize = bytes.size() - compressedTotalPageSize;
@@ -675,8 +698,10 @@
     blocks.add(currentBlock);
     columnIndexes.add(currentColumnIndexes);
     offsetIndexes.add(currentOffsetIndexes);
+    bloomFilters.add(currentBloomFilters);
     currentColumnIndexes = null;
     currentOffsetIndexes = null;
+    currentBloomFilters = null;
     currentBlock = null;
   }
 
@@ -858,6 +883,7 @@
     state = state.end();
     serializeColumnIndexes(columnIndexes, blocks, out);
     serializeOffsetIndexes(offsetIndexes, blocks, out);
+    serializeBloomFilters(bloomFilters, blocks, out);
     LOG.debug("{}: end", out.getPos());
     this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
     serializeFooter(footer, out);
@@ -907,6 +933,28 @@
     }
   }
 
+  private static void serializeBloomFilters(
+    List<List<BloomFilter>> bloomFilters,
+    List<BlockMetaData> blocks,
+    PositionOutputStream out) throws IOException {
+    LOG.debug("{}: bloom filters", out.getPos());
+    for (int bIndex = 0, bSize = blocks.size(); bIndex < bSize; ++bIndex) {
+      List<ColumnChunkMetaData> columns = blocks.get(bIndex).getColumns();
+      List<BloomFilter> blockBloomFilters = bloomFilters.get(bIndex);
+      if (blockBloomFilters.isEmpty()) continue;
+      for (int cIndex = 0, cSize = columns.size(); cIndex < cSize; ++cIndex) {
+        BloomFilter bloomFilter = blockBloomFilters.get(cIndex);
+        if (bloomFilter == null) {
+          continue;
+        }
+        ColumnChunkMetaData column = columns.get(cIndex);
+        long offset = out.getPos();
+        column.setBloomFilterOffset(offset);
+        bloomFilter.writeTo(out);
+      }
+    }
+  }
+
   private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException {
     long footerIndex = out.getPos();
     ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index c3c52e3..62f21c8 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 676e2ca..fd79d7d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,11 @@
 import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -145,6 +150,9 @@
   public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
   public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
   public static final String STATISTICS_TRUNCATE_LENGTH = "parquet.statistics.truncate.length";
+  public static final String BLOOM_FILTER_COLUMN_NAMES = "parquet.bloom.filter.column.names";
+  public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
+  public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes";
   public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
   public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
 
@@ -212,6 +220,43 @@
     return getEnableDictionary(getConfiguration(jobContext));
   }
 
+  public static int getBloomFilterMaxBytes(Configuration conf) {
+    return conf.getInt(BLOOM_FILTER_MAX_BYTES,
+      ParquetProperties.DEFAULT_MAX_BLOOM_FILTER_BYTES);
+  }
+
+  public static Set<String> getBloomFilterColumns(Configuration conf) {
+    String columnNames = conf.get(BLOOM_FILTER_COLUMN_NAMES);
+    if (columnNames != null) {
+      return new HashSet<>(Arrays.asList(columnNames.split(",")));
+    } else {
+      return new HashSet<>();
+    }
+  }
+
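+  /**
+   * Parses the expected number of distinct values for each Bloom filter column. The two
+   * configuration entries are comma-separated lists that are matched up by position.
+   */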
+  public static Map<String, Long> getBloomFilterColumnExpectedNDVs(Configuration conf) {
+    Map<String, Long> kv = new HashMap<>();
+    String columnNamesConf = conf.get(BLOOM_FILTER_COLUMN_NAMES);
+    String expectedNDVsConf = conf.get(BLOOM_FILTER_EXPECTED_NDV);
+
+    if (columnNamesConf == null || expectedNDVsConf == null) {
+      return kv;
+    }
+
+    String[] columnNames = columnNamesConf.split(",");
+    String[] expectedNDVs = expectedNDVsConf.split(",");
+
+    if (columnNames.length == expectedNDVs.length) {
+      for (int i = 0; i < columnNames.length; i++) {
+        kv.put(columnNames[i], Long.parseLong(expectedNDVs[i]));
+      }
+    } else {
+      LOG.warn("Bloom filter column names are not match expected NDVs");
+    }
+
+    return kv;
+  }
+
   public static int getBlockSize(JobContext jobContext) {
     return getBlockSize(getConfiguration(jobContext));
   }
@@ -435,6 +480,9 @@
         .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
         .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
         .withStatisticsTruncateLength(getStatisticsTruncateLength(conf))
+        .withBloomFilterColumnNames(getBloomFilterColumns(conf))
+        .withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf))
+        .withBloomFilterColumnNdvs(getBloomFilterColumnExpectedNDVs(conf))
         .withPageRowCountLimit(getPageRowCountLimit(conf))
         .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
         .build();
@@ -456,6 +504,10 @@
       LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
       LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
       LOG.info("Truncate length for statistics min/max  is: {}", props.getStatisticsTruncateLength());
+      LOG.info("Bloom filter enabled column names are: {}", props.getBloomFilterColumns());
+      LOG.info("Max Bloom filter size for a column is {}", props.getMaxBloomFilterBytes());
+      LOG.info("Bloom filter enabled column expected number of distinct values are: {}",
+        props.getBloomFilterColumnExpectedNDVs().values());
       LOG.info("Page row count limit to {}", props.getPageRowCountLimit());
       LOG.info("Writing page checksums is: {}", props.getPageWriteChecksumEnabled() ? "on" : "off");
     }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 870a1a6..0ebd7d3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,8 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -539,6 +541,21 @@
     }
 
     /**
+     * Sets the columns for which Bloom filters are generated by the constructed writer.
+     *
+     * @param columnNames the names of the columns to create Bloom filters for
+     * @return this builder for method chaining.
+     */
+    public SELF withBloomFilterColumnNames(String... columnNames) {
+      if (columnNames != null) {
+        encodingPropsBuilder.withBloomFilterColumnNames(
+          new HashSet<>(Arrays.asList(columnNames))
+        );
+      }
+
+      return self();
+    }
+
+    /**
      * Set a property that will be available to the read path. For writers that use a Hadoop
      * configuration, this is the recommended way to add configuration values.
      *
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index e6aa104..2c24356 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -173,6 +173,8 @@
   private IndexReference columnIndexReference;
   private IndexReference offsetIndexReference;
 
+  private long bloomFilterOffset;
+
   protected ColumnChunkMetaData(ColumnChunkProperties columnChunkProperties) {
     this(null, columnChunkProperties);
   }
@@ -275,6 +277,23 @@
   }
 
   /**
+   * @param bloomFilterOffset
+   *          the offset of the Bloom filter in the file
+   */
+  @Private
+  public void setBloomFilterOffset(long bloomFilterOffset) {
+    this.bloomFilterOffset = bloomFilterOffset;
+  }
+
+  /**
+   * @return the offset of the Bloom filter in the file
+   */
+  @Private
+  public long getBloomFilterOffset() {
+    return bloomFilterOffset;
+  }
+
+  /**
    * @return all the encodings used in this column
    */
   public Set<Encoding> getEncodings() {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index 88c8d83..0569c42 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -260,6 +260,7 @@
           same(OffsetIndexBuilder.getNoOpBuilder()), // Deprecated writePage -> no offset index
           any(),
           any(),
+          any(),
           any());
     }
   }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 8763cac..3de4524 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,6 +27,8 @@
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Version;
 import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
 import org.junit.Assume;
 import org.junit.Rule;
@@ -220,6 +222,39 @@
   }
 
   @Test
+  public void testBloomFilterWriteRead() throws Exception {
+    MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }");
+    File testFile = temp.newFile();
+    testFile.delete();
+    Path path = new Path(testFile.toURI());
+    Configuration configuration = new Configuration();
+    configuration.set("parquet.bloom.filter.column.names", "foo");
+    String[] colPath = {"foo"};
+    ColumnDescriptor col = schema.getColumnDescription(colPath);
+    BinaryStatistics stats1 = new BinaryStatistics();
+    ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path);
+    w.start();
+    w.startBlock(3);
+    w.startColumn(col, 5, CODEC);
+    w.writeDataPage(2, 4, BytesInput.from(BYTES1), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(BYTES1), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
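+    // A requested size of 0 falls back to the default minimum bitset size.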
+    BloomFilter blockSplitBloomFilter = new BlockSplitBloomFilter(0);
+    blockSplitBloomFilter.insertHash(blockSplitBloomFilter.hash(Binary.fromString("hello")));
+    blockSplitBloomFilter.insertHash(blockSplitBloomFilter.hash(Binary.fromString("world")));
+    w.writeBloomFilter(blockSplitBloomFilter);
+    w.endBlock();
+    w.end(new HashMap<>());
+    ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
+    ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
+      Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath)));
+    BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0));
+    BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(readFooter.getBlocks().get(0).getColumns().get(0));
+    assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("hello"))));
+    assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("world"))));
+  }
+
+  @Test
   public void testAlignmentWithPadding() throws Exception {
     File testFile = temp.newFile();
 
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index c837d9a..9b01bb5 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,6 +19,8 @@
 package org.apache.parquet.hadoop;
 
 import static java.util.Arrays.asList;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -45,8 +47,11 @@
 import java.util.Map;
 import java.util.concurrent.Callable;
 
+import net.openhft.hashing.LongHashFunction;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.example.data.GroupFactory;
 import org.apache.parquet.hadoop.example.ExampleInputFormat;
 import org.apache.parquet.hadoop.example.ExampleParquetWriter;
 import org.apache.parquet.schema.GroupType;
@@ -204,4 +209,43 @@
       assertEquals("Number of written records should be equal to the read one", recordCount, readRecordCount);
     }
   }
+
+  @Test
+  public void testParquetFileWithBloomFilter() throws IOException {
+    MessageType schema = Types.buildMessage()
+      .required(BINARY).as(stringType()).named("name").named("msg");
+
+    String[] testNames = {"hello", "parquet", "bloom", "filter"};
+
+    final int recordCount = testNames.length;
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(schema, conf);
+
+    GroupFactory factory = new SimpleGroupFactory(schema);
+    File file = temp.newFile();
+    file.delete();
+    Path path = new Path(file.getAbsolutePath());
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+      .withPageRowCountLimit(10)
+      .withConf(conf)
+      .withDictionaryEncoding(false)
+      .withBloomFilterColumnNames("name")
+      .build()) {
+      for (String testName : testNames) {
+        writer.write(factory.newGroup().append("name", testName));
+      }
+    }
+
+    ParquetMetadata footer = readFooter(conf, path, NO_FILTER);
+    ParquetFileReader reader = new ParquetFileReader(
+      conf, footer.getFileMetaData(), path, footer.getBlocks(), schema.getColumns());
+
+    BloomFilter bloomFilter = reader.getBloomFilterDataReader(footer.getBlocks().get(0))
+      .readBloomFilter(footer.getBlocks().get(0).getColumns().get(0));
+
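+    // Probe the deserialized filter by recomputing xxHash64 (seed 0) hashes directly,
+    // matching the XXH64 hash strategy used on the write path.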
+    for (String name: testNames) {
+      assertTrue(bloomFilter.findHash(
+        LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer())));
+    }
+  }
 }
diff --git a/pom.xml b/pom.xml
index 719655f..77e942a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -289,6 +289,10 @@
                     <pattern>it.unimi.dsi</pattern>
                     <shadedPattern>${shade.prefix}.it.unimi.dsi</shadedPattern>
                   </relocation>
+                  <relocation>
+                    <pattern>com.google.common</pattern>
+                    <shadedPattern>${shade.prefix}.com.google.common</shadedPattern>
+                  </relocation>
                 </relocations>
               </configuration>
             </execution>