PARQUET-2254: Support building bloom filter that adapts to the data

This closes #1042 
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 3ff58a8..dda5873 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -18,6 +18,8 @@
  */
 package org.apache.parquet.column;
 
+import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
+
 import java.util.Objects;
 import java.util.OptionalDouble;
 import java.util.OptionalLong;
@@ -38,8 +40,6 @@
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
 import org.apache.parquet.schema.MessageType;
 
-import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
-
 /**
  * This class represents all the configurable Parquet properties.
  */
@@ -59,6 +59,8 @@
   public static final int DEFAULT_MAX_BLOOM_FILTER_BYTES = 1024 * 1024;
   public static final boolean DEFAULT_BLOOM_FILTER_ENABLED = false;
   public static final double DEFAULT_BLOOM_FILTER_FPP = 0.01;
+  public static final boolean DEFAULT_ADAPTIVE_BLOOM_FILTER_ENABLED = false;
+  public static final int DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER = 5;
 
   public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;
 
@@ -105,6 +107,8 @@
   private final ColumnProperty<Double> bloomFilterFPPs;
   private final int maxBloomFilterBytes;
   private final ColumnProperty<Boolean> bloomFilterEnabled;
+  private final ColumnProperty<Boolean> adaptiveBloomFilterEnabled;
+  private final ColumnProperty<Integer> numBloomFilterCandidates;
   private final int pageRowCountLimit;
   private final boolean pageWriteChecksumEnabled;
   private final boolean enableByteStreamSplit;
@@ -128,6 +132,8 @@
     this.bloomFilterFPPs = builder.bloomFilterFPPs.build();
     this.bloomFilterEnabled = builder.bloomFilterEnabled.build();
     this.maxBloomFilterBytes = builder.maxBloomFilterBytes;
+    this.adaptiveBloomFilterEnabled = builder.adaptiveBloomFilterEnabled.build();
+    this.numBloomFilterCandidates = builder.numBloomFilterCandidates.build();
     this.pageRowCountLimit = builder.pageRowCountLimit;
     this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled;
     this.enableByteStreamSplit = builder.enableByteStreamSplit;
@@ -275,6 +281,14 @@
     return maxBloomFilterBytes;
   }
 
+  public boolean getAdaptiveBloomFilterEnabled(ColumnDescriptor column) {
+    return adaptiveBloomFilterEnabled.getValue(column);
+  }
+
+  public int getBloomFilterCandidatesCount(ColumnDescriptor column) {
+    return numBloomFilterCandidates.getValue(column);
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -317,6 +331,8 @@
     private final ColumnProperty.Builder<Long> bloomFilterNDVs;
     private final ColumnProperty.Builder<Double> bloomFilterFPPs;
     private int maxBloomFilterBytes = DEFAULT_MAX_BLOOM_FILTER_BYTES;
+    private final ColumnProperty.Builder<Boolean> adaptiveBloomFilterEnabled;
+    private final ColumnProperty.Builder<Integer> numBloomFilterCandidates;
     private final ColumnProperty.Builder<Boolean> bloomFilterEnabled;
     private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
     private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
@@ -327,6 +343,8 @@
       bloomFilterEnabled = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_ENABLED);
       bloomFilterNDVs = ColumnProperty.<Long>builder().withDefaultValue(null);
       bloomFilterFPPs = ColumnProperty.<Double>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_FPP);
+      adaptiveBloomFilterEnabled = ColumnProperty.<Boolean>builder().withDefaultValue(DEFAULT_ADAPTIVE_BLOOM_FILTER_ENABLED);
+      numBloomFilterCandidates = ColumnProperty.<Integer>builder().withDefaultValue(DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER);
     }
 
     private Builder(ParquetProperties toCopy) {
@@ -344,6 +362,8 @@
       this.bloomFilterNDVs = ColumnProperty.<Long>builder(toCopy.bloomFilterNDVs);
       this.bloomFilterFPPs = ColumnProperty.<Double>builder(toCopy.bloomFilterFPPs);
       this.bloomFilterEnabled = ColumnProperty.<Boolean>builder(toCopy.bloomFilterEnabled);
+      this.adaptiveBloomFilterEnabled = ColumnProperty.<Boolean>builder(toCopy.adaptiveBloomFilterEnabled);
+      this.numBloomFilterCandidates = ColumnProperty.<Integer>builder(toCopy.numBloomFilterCandidates);
       this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes;
       this.enableByteStreamSplit = toCopy.enableByteStreamSplit;
     }
@@ -504,6 +524,30 @@
     }
 
     /**
+     * Whether to use adaptive bloom filter to automatically adjust the bloom filter size according to
+     * `parquet.bloom.filter.max.bytes`.
+     * If NDV (number of distinct values) for a specified column is set, it will be ignored
+     *
+     * @param enabled whether to use adaptive bloom filter
+     */
+    public Builder withAdaptiveBloomFilterEnabled(boolean enabled) {
+      this.adaptiveBloomFilterEnabled.withDefaultValue(enabled);
+      return this;
+    }
+
+    /**
+     * When `AdaptiveBloomFilter` is enabled, set how many bloom filter candidates to use.
+     *
+     * @param columnPath the path of the column (dot-string)
+     * @param number the number of candidates
+     */
+    public Builder withBloomFilterCandidatesNumber(String columnPath, int number) {
+      Preconditions.checkArgument(number > 0, "Invalid candidates number for column \"%s\": %d", columnPath, number);
+      this.numBloomFilterCandidates.withDefaultValue(number);
+      return this;
+    }
+
+    /**
      * Enable or disable the bloom filter for the specified column.
      * One may either disable bloom filters for all columns by invoking {@link #withBloomFilterEnabled(boolean)} with a
      * {@code false} value and then enable the bloom filters for the required columns one-by-one by invoking this
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index 36b6099..8e11676 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -32,11 +32,11 @@
 import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
+import org.apache.parquet.column.values.bloomfilter.AdaptiveBlockSplitBloomFilter;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.io.api.Binary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.parquet.bytes.BytesInput;
 
 /**
  * Base implementation for {@link ColumnWriter} to be extended to specialize for V1 and V2 pages.
@@ -97,7 +97,12 @@
       int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(ndv.getAsLong(), fpp.getAsDouble());
       this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8, maxBloomFilterSize);
     } else {
-      this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize, maxBloomFilterSize);
+      if(props.getAdaptiveBloomFilterEnabled(path)) {
+        int numCandidates = props.getBloomFilterCandidatesCount(path);
+        this.bloomFilter = new AdaptiveBlockSplitBloomFilter(maxBloomFilterSize, numCandidates, fpp.getAsDouble(), path);
+      } else {
+        this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize, maxBloomFilterSize);
+      }
     }
   }
 
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/AdaptiveBlockSplitBloomFilter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/AdaptiveBlockSplitBloomFilter.java
new file mode 100644
index 0000000..77f8f90
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/AdaptiveBlockSplitBloomFilter.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values.
+ * `AdaptiveBlockSplitBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time. Some candidates that are too small will be eliminated during the insertion process.
+ * Finally we will choose the most appropriate size candidate to write out.
+ */
+public class AdaptiveBlockSplitBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AdaptiveBlockSplitBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct values are greater than the
+  // expected NDV of one candidate, it will be removed. Finally we will choose the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and also used as an approximate deduplication counter
+  private BloomFilterCandidate largestCandidate;
+
+  // the accumulator of the number of distinct hash values that have been inserted so far
+  private long numDistinctHashValues = 0;
+
+  // indicates that the bloom filter candidate has been written out and new data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  // indicates the step size to find the NDV value corresponding to numBytes
+  private static final int NDV_STEP = 500;
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  private int minimumCandidateNdv = 16;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  /**
+   * @param maximumBytes  the maximum bytes size of candidate
+   * @param numCandidates the number of candidates
+   * @param fpp           the false positive probability
+   */
+  public AdaptiveBlockSplitBloomFilter(int maximumBytes, int numCandidates, double fpp, ColumnDescriptor column) {
+    this(maximumBytes, HashStrategy.XXH64, fpp, numCandidates, column);
+  }
+
+  public AdaptiveBlockSplitBloomFilter(int maximumBytes, HashStrategy hashStrategy, double fpp,
+    int numCandidates, ColumnDescriptor column) {
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(maximumBytes, numCandidates, fpp);
+  }
+
+  /**
+   * This method will generate candidates according to the maximum acceptable bytes size of bloom filter.
+   * Because the bytes size of the candidate need to be a power of 2, here we set the candidate size to be
+   * a proportion of `maxBytes` like `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum bytes size of candidate
+   * @param numCandidates the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int numCandidates, double fpp) {
+    int candidateByteSize = calculateBoundedPowerOfTwo(maxBytes);
+    for (int i = 0; i < numCandidates; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateBoundedPowerOfTwo(candidateByteSize / 2);
+    }
+    if (candidates.isEmpty()) {
+      // `maxBytes` is too small, but at least one candidate will be generated, 32 bytes size and can accept 16 distinct values.
+      candidates.add(new BloomFilterCandidate(minimumCandidateNdv, minimumBytes, minimumBytes, maximumBytes, hashStrategy));
+    }
+    largestCandidate = candidates.stream().max(BloomFilterCandidate::compareTo).get();
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct values.
+   * The expected number result may be slightly smaller than what `numBytes` can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    while (optimalBytes < numBytes) {
+      expectedNDV += NDV_STEP;
+      optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp) / 8;
+    }
+    // make sure it is slightly smaller than what `numBytes` can support
+    expectedNDV -= NDV_STEP;
+    // numBytes is too small
+    if (expectedNDV <= 0) {
+      expectedNDV = 0;
+    }
+    return expectedNDV;
+  }
+
+  /**
+   * BloomFilter bytes size should be power of 2, see [[BlockSplitBloomFilter#initBitset]]
+   *
+   * @param numBytes the bytes size
+   * @return the largest power of 2 less or equal to numBytes
+   */
+  private int calculateBoundedPowerOfTwo(int numBytes) {
+    if (numBytes < minimumBytes) {
+      numBytes = minimumBytes;
+    }
+    // if `numBytes` is not power of 2, get the largest power of two less than `numBytes`
+    if ((numBytes & (numBytes - 1)) != 0) {
+      numBytes = Integer.highestOneBit(numBytes);
+    }
+    numBytes = Math.min(numBytes, maximumBytes);
+    numBytes = Math.max(numBytes, minimumBytes);
+    return numBytes;
+  }
+
+  /**
+   * Used at the end of the insertion, select the candidate of the smallest size.
+   * At least one of the largest candidates will be kept when inserting data.
+   *
+   * @return the smallest and optimal candidate
+   */
+  protected BloomFilterCandidate optimalCandidate() {
+    return candidates.stream().min(BloomFilterCandidate::compareTo).get();
+  }
+
+  protected List<BloomFilterCandidate> getCandidates() {
+    return candidates;
+  }
+
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    finalized = true;
+    BloomFilterCandidate optimalBloomFilter = optimalCandidate();
+    optimalBloomFilter.bloomFilter.writeTo(out);
+    String columnName = column != null && column.getPath() != null ? Arrays.toString(column.getPath()) : "unknown";
+    LOG.info("The number of distinct values in {} is approximately {}, the optimal bloom filter can accept {}"
+        + " distinct values, byte size is {}.",
+      columnName, numDistinctHashValues, optimalBloomFilter.getExpectedNDV(),
+      optimalBloomFilter.bloomFilter.getBitsetSize());
+  }
+
+  /**
+   * Insert an element to the multiple bloom filter candidates and remove the bad candidate
+   * if the number of distinct values exceeds its expected size.
+   *
+   * @param hash the hash result of element.
+   */
+  @Override
+  public void insertHash(long hash) {
+    Preconditions.checkArgument(!finalized,
+      "Insertion has been mark as finalized, no more data is allowed!");
+    if (!largestCandidate.bloomFilter.findHash(hash)) {
+      numDistinctHashValues++;
+    }
+    // distinct values exceed the expected size, remove the bad bloom filter (leave at least the max bloom filter candidate)
+    candidates.removeIf(candidate -> candidate.getExpectedNDV() < numDistinctHashValues && candidate != largestCandidate);
+    candidates.forEach(candidate -> candidate.getBloomFilter().insertHash(hash));
+  }
+
+  @Override
+  public int getBitsetSize() {
+    return optimalCandidate().getBloomFilter().getBitsetSize();
+  }
+
+  @Override
+  public boolean findHash(long hash) {
+    return largestCandidate.bloomFilter.findHash(hash);
+  }
+
+  @Override
+  public long hash(Object value) {
+    return largestCandidate.bloomFilter.hash(value);
+  }
+
+  @Override
+  public HashStrategy getHashStrategy() {
+    return largestCandidate.bloomFilter.getHashStrategy();
+  }
+
+  @Override
+  public Algorithm getAlgorithm() {
+    return largestCandidate.bloomFilter.getAlgorithm();
+  }
+
+  @Override
+  public Compression getCompression() {
+    return largestCandidate.bloomFilter.getCompression();
+  }
+
+  @Override
+  public long hash(int value) {
+    return largestCandidate.bloomFilter.hash(value);
+  }
+
+  @Override
+  public long hash(long value) {
+    return largestCandidate.bloomFilter.hash(value);
+  }
+
+  @Override
+  public long hash(double value) {
+    return largestCandidate.bloomFilter.hash(value);
+  }
+
+  @Override
+  public long hash(float value) {
+    return largestCandidate.bloomFilter.hash(value);
+  }
+
+  @Override
+  public long hash(Binary value) {
+    return largestCandidate.bloomFilter.hash(value);
+  }
+
+  protected class BloomFilterCandidate implements Comparable<BloomFilterCandidate> {
+    // the bloom filter candidate
+    final private BlockSplitBloomFilter bloomFilter;
+    // the max excepted number of distinct value in the candidate
+    final private int expectedNDV;
+
+    public BloomFilterCandidate(int expectedNDV, int candidateBytes,
+      int minimumBytes, int maximumBytes, HashStrategy hashStrategy) {
+      this.bloomFilter = new BlockSplitBloomFilter(candidateBytes, minimumBytes, maximumBytes, hashStrategy);
+      this.expectedNDV = expectedNDV;
+    }
+
+    public BlockSplitBloomFilter getBloomFilter() {
+      return bloomFilter;
+    }
+
+    public int getExpectedNDV() {
+      return expectedNDV;
+    }
+
+    @Override
+    public int compareTo(BloomFilterCandidate o) {
+      return this.bloomFilter.getBitsetSize() - o.bloomFilter.getBitsetSize();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      BloomFilterCandidate that = (BloomFilterCandidate) o;
+      return expectedNDV == that.expectedNDV &&
+        Objects.equals(bloomFilter, that.bloomFilter);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(bloomFilter, expectedNDV);
+    }
+  }
+}
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
index 23668a9..02a0566 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bloomfilter/TestBlockSplitBloomFilter.java
@@ -18,6 +18,11 @@
  */
 package org.apache.parquet.column.values.bloomfilter;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -25,17 +30,15 @@
 import java.util.Random;
 import java.util.Set;
 
-import net.openhft.hashing.LongHashFunction;
 import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.parquet.io.api.Binary;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
+
+import org.apache.parquet.io.api.Binary;
+
+import net.openhft.hashing.LongHashFunction;
 
 public class TestBlockSplitBloomFilter {
 
@@ -185,6 +188,39 @@
   }
 
   @Test
+  public void testAdaptiveBloomFilter() {
+    int maxBloomFilterSize = 1024 * 1024;
+    int candidateNumber = 10;
+    AdaptiveBlockSplitBloomFilter adaptiveBloomFilter = new AdaptiveBlockSplitBloomFilter(maxBloomFilterSize,
+      candidateNumber, 0.01, null);
+
+    assertEquals(candidateNumber, adaptiveBloomFilter.getCandidates().size());
+
+    Set<String> existedValue = new HashSet<>();
+    while (existedValue.size() < 10000) {
+      String str = RandomStringUtils.randomAlphabetic(1, 64);
+      adaptiveBloomFilter.insertHash(adaptiveBloomFilter.hash(Binary.fromString(str)));
+      existedValue.add(str);
+    }
+    // removed some small bloom filter
+    assertEquals(7, adaptiveBloomFilter.getCandidates().size());
+    BlockSplitBloomFilter optimalCandidate = adaptiveBloomFilter.optimalCandidate().getBloomFilter();
+    for (String value : existedValue) {
+      assertTrue(optimalCandidate.findHash(optimalCandidate.hash(Binary.fromString(value))));
+    }
+
+    int maxCandidateNDV = adaptiveBloomFilter.getCandidates().stream()
+      .max(AdaptiveBlockSplitBloomFilter.BloomFilterCandidate::compareTo).get().getExpectedNDV();
+    while (existedValue.size() < maxCandidateNDV + 1) {
+      String str = RandomStringUtils.randomAlphabetic(1, 64);
+      adaptiveBloomFilter.insertHash(adaptiveBloomFilter.hash(Binary.fromString(str)));
+      existedValue.add(str);
+    }
+    // the number of distinct value exceeds the maximum candidate's expected NDV, so only the maximum candidate is kept
+    assertEquals(1, adaptiveBloomFilter.getCandidates().size());
+  }
+
+  @Test
   public void testMergeBloomFilter() throws IOException {
     int numBytes = BlockSplitBloomFilter.optimalNumOfBits(1024 * 5, 0.01) / 8;
     BloomFilter otherBloomFilter = new BlockSplitBloomFilter(numBytes);
diff --git a/parquet-hadoop/README.md b/parquet-hadoop/README.md
index deb82e0..c27c5f2 100644
--- a/parquet-hadoop/README.md
+++ b/parquet-hadoop/README.md
@@ -197,7 +197,7 @@
 
 **Property:** `parquet.bloom.filter.enabled`  
 **Description:** Whether to enable writing bloom filter.  
-If it is true, the bloom filter will be enable for all columns. If it is false, it will be disabled for all columns.  
+If it is true, the bloom filter will be enabled for all columns. If it is false, it will be disabled for all columns.  
 It is also possible to enable it for some columns by specifying the column name within the property followed by #.  
 **Default value:** `false`  
 **Example:**
@@ -211,6 +211,24 @@
 
 ---
 
+**Property:** `parquet.bloom.filter.adaptive.enabled`  
+**Description:** Whether to enable writing adaptive bloom filter.  
+If it is true, the bloom filter will be generated with the optimal bit size 
+according to the number of real data distinct values. If it is false, it will not take effect.
+Note that the maximum bytes of the bloom filter will not exceed `parquet.bloom.filter.max.bytes` configuration (if it is 
+set too small, the generated bloom filter will not be efficient).
+**Default value:** `false`
+
+---
+
+**Property:** `parquet.bloom.filter.candidates.number`  
+**Description:** The number of candidate bloom filters written at the same time.  
+When `parquet.bloom.filter.adaptive.enabled` is true, multiple candidate bloom filters will be inserted 
+at the same time, finally a bloom filter with the optimal bit size will be selected and written to the file.
+**Default value:** `5`
+
+---
+
 **Property:** `parquet.bloom.filter.expected.ndv`  
 **Description:** The expected number of distinct values in a column, it is used to compute the optimal size of the bloom filter.  
 Note that if this property is not set, the bloom filter will use the maximum size.  
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 9cd7f13..7ec38ee 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -892,6 +892,8 @@
       }
       if (isWriteBloomFilter) {
         currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
+      } else {
+        LOG.info("No need to write bloom filter because column {} data pages are all encoded as dictionary.", descriptor.getPath());
       }
     }
     LOG.debug("{}: write data pages", out.getPos());
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 78942b1..fe718c0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.hadoop;
 
 import static org.apache.parquet.column.ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_ADAPTIVE_BLOOM_FILTER_ENABLED;
 import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
 import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration;
 
@@ -152,6 +153,8 @@
   public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
   public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes";
   public static final String BLOOM_FILTER_FPP = "parquet.bloom.filter.fpp";
+  public static final String ADAPTIVE_BLOOM_FILTER_ENABLED = "parquet.bloom.filter.adaptive.enabled";
+  public static final String BLOOM_FILTER_CANDIDATES_NUMBER = "parquet.bloom.filter.candidates.number";
   public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
   public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
 
@@ -228,6 +231,10 @@
     return conf.getBoolean(BLOOM_FILTER_ENABLED, DEFAULT_BLOOM_FILTER_ENABLED);
   }
 
+  public static boolean getAdaptiveBloomFilterEnabled(Configuration conf) {
+    return conf.getBoolean(ADAPTIVE_BLOOM_FILTER_ENABLED, DEFAULT_ADAPTIVE_BLOOM_FILTER_ENABLED);
+  }
+
   public static int getBlockSize(JobContext jobContext) {
     return getBlockSize(getConfiguration(jobContext));
   }
@@ -453,6 +460,7 @@
         .withStatisticsTruncateLength(getStatisticsTruncateLength(conf))
         .withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf))
         .withBloomFilterEnabled(getBloomFilterEnabled(conf))
+        .withAdaptiveBloomFilterEnabled(getAdaptiveBloomFilterEnabled(conf))
         .withPageRowCountLimit(getPageRowCountLimit(conf))
         .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf));
     new ColumnConfigParser()
@@ -462,6 +470,10 @@
         .withColumnConfig(BLOOM_FILTER_EXPECTED_NDV, key -> conf.getLong(key, -1L), propsBuilder::withBloomFilterNDV)
         .withColumnConfig(BLOOM_FILTER_FPP, key -> conf.getDouble(key, ParquetProperties.DEFAULT_BLOOM_FILTER_FPP),
             propsBuilder::withBloomFilterFPP)
+        .withColumnConfig(
+          BLOOM_FILTER_CANDIDATES_NUMBER,
+          key -> conf.getInt(key, ParquetProperties.DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER),
+          propsBuilder::withBloomFilterCandidatesNumber)
         .parseConfig(conf);
 
     ParquetProperties props = propsBuilder.build();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 8693781..7400266 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -620,6 +620,28 @@
     }
 
     /**
+     * When NDV (number of distinct values) for a specified column is not set, whether to use
+     * `AdaptiveBloomFilter` to automatically adjust the BloomFilter size according to `parquet.bloom.filter.max.bytes`
+     *
+     * @param enabled whether to write bloom filter for the column
+     */
+    public SELF withAdaptiveBloomFilterEnabled(boolean enabled) {
+      encodingPropsBuilder.withAdaptiveBloomFilterEnabled(enabled);
+      return self();
+    }
+
+    /**
+     * When `AdaptiveBloomFilter` is enabled, set how many bloom filter candidates to use.
+     *
+     * @param columnPath the path of the column (dot-string)
+     * @param number the number of candidate
+     */
+    public SELF withBloomFilterCandidateNumber(String columnPath, int number) {
+      encodingPropsBuilder.withBloomFilterCandidatesNumber(columnPath, number);
+      return self();
+    }
+
+    /**
      * Sets the bloom filter enabled/disabled
      *
      * @param enabled whether to write bloom filters
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestAdaptiveBlockSplitBloomFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestAdaptiveBlockSplitBloomFiltering.java
new file mode 100644
index 0000000..ded4afc
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestAdaptiveBlockSplitBloomFiltering.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+
+public class TestAdaptiveBlockSplitBloomFiltering extends TestBloomFiltering {
+
+  @BeforeClass
+  public static void createFiles() throws IOException {
+    createFiles(true);
+  }
+
+  public TestAdaptiveBlockSplitBloomFiltering(Path file, boolean isEncrypted) {
+    super(file, isEncrypted);
+  }
+
+  @Test
+  public void testSimpleFiltering() throws IOException {
+    super.testSimpleFiltering();
+  }
+
+  @Test
+  public void testNestedFiltering() throws IOException {
+    super.testNestedFiltering();
+  }
+
+  @Test
+  public void checkBloomFilterSize() throws IOException {
+    FileDecryptionProperties fileDecryptionProperties = getFileDecryptionProperties();
+    final ParquetReadOptions readOptions = ParquetReadOptions.builder().withDecryption(fileDecryptionProperties).build();
+    InputFile inputFile = HadoopInputFile.fromPath(getFile(), new Configuration());
+    try (ParquetFileReader fileReader = ParquetFileReader.open(inputFile, readOptions)) {
+      fileReader.getRowGroups().forEach(block -> {
+        BloomFilterReader bloomFilterReader = fileReader.getBloomFilterDataReader(block);
+        block.getColumns().stream()
+          .filter(column -> column.getBloomFilterOffset() > 0)
+          .forEach(column -> {
+            int bitsetSize = bloomFilterReader.readBloomFilter(column).getBitsetSize();
+            // set 10 candidates:
+            // [byteSize=2048, expectedNVD=1500], [byteSize=4096, expectedNVD=3000], [byteSize=6500, expectedNVD=8192],
+            // [byteSize=16384, expectedNVD=13500], [byteSize=32768, expectedNVD=27000] ......
+            // number of distinct values is less than 100, so the byteSize should be less than 2048.
+            assertTrue(bitsetSize <= 2048);
+          });
+      });
+    }
+  }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
index 42a284a..105a032 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
@@ -19,27 +19,15 @@
 
 package org.apache.parquet.hadoop;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.crypto.ColumnEncryptionProperties;
-import org.apache.parquet.crypto.DecryptionKeyRetrieverMock;
-import org.apache.parquet.crypto.FileDecryptionProperties;
-import org.apache.parquet.crypto.FileEncryptionProperties;
-import org.apache.parquet.filter2.compat.FilterCompat;
-import org.apache.parquet.filter2.predicate.FilterPredicate;
-import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
-import org.apache.parquet.hadoop.example.ExampleParquetWriter;
-import org.apache.parquet.hadoop.example.GroupReadSupport;
-import org.apache.parquet.hadoop.metadata.ColumnPath;
-import org.apache.parquet.io.api.Binary;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.doubleColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.filter2.predicate.FilterApi.in;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -58,9 +46,31 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-import static org.apache.parquet.filter2.predicate.FilterApi.*;
-import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
-import static org.junit.Assert.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.DecryptionKeyRetrieverMock;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.api.Binary;
 
 @RunWith(Parameterized.class)
 public class TestBloomFiltering {
@@ -197,17 +207,7 @@
 
   private List<PhoneBookWriter.User> readUsers(FilterPredicate filter, boolean useOtherFiltering,
                                                boolean useBloomFilter) throws IOException {
-    FileDecryptionProperties fileDecryptionProperties = null;
-    if (isEncrypted) {
-      DecryptionKeyRetrieverMock decryptionKeyRetrieverMock = new DecryptionKeyRetrieverMock()
-        .putKey(FOOTER_ENCRYPTION_KEY_ID, FOOTER_ENCRYPTION_KEY)
-        .putKey(COLUMN_ENCRYPTION_KEY1_ID, COLUMN_ENCRYPTION_KEY1)
-        .putKey(COLUMN_ENCRYPTION_KEY2_ID, COLUMN_ENCRYPTION_KEY2);
-
-      fileDecryptionProperties = FileDecryptionProperties.builder()
-        .withKeyRetriever(decryptionKeyRetrieverMock)
-        .build();
-    }
+    FileDecryptionProperties fileDecryptionProperties = getFileDecryptionProperties();
 
     return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
       .withFilter(FilterCompat.get(filter))
@@ -219,6 +219,20 @@
       .useColumnIndexFilter(useOtherFiltering), true);
   }
 
+  public FileDecryptionProperties getFileDecryptionProperties() {
+    if (!isEncrypted) {
+      return null;
+    }
+    DecryptionKeyRetrieverMock decryptionKeyRetrieverMock = new DecryptionKeyRetrieverMock()
+      .putKey(FOOTER_ENCRYPTION_KEY_ID, FOOTER_ENCRYPTION_KEY)
+      .putKey(COLUMN_ENCRYPTION_KEY1_ID, COLUMN_ENCRYPTION_KEY1)
+      .putKey(COLUMN_ENCRYPTION_KEY2_ID, COLUMN_ENCRYPTION_KEY2);
+
+    return FileDecryptionProperties.builder()
+      .withKeyRetriever(decryptionKeyRetrieverMock)
+      .build();
+  }
+
   // Assumes that both lists are in the same order
   private static void assertContains(Stream<PhoneBookWriter.User> expected, List<PhoneBookWriter.User> actual) {
     Iterator<PhoneBookWriter.User> expIt = expected.iterator();
@@ -280,35 +294,56 @@
     return encryptionProperties;
   }
 
-  private static void writePhoneBookToFile(Path file,
-                                           ParquetProperties.WriterVersion parquetVersion,
-                                           FileEncryptionProperties encryptionProperties) throws IOException {
+  protected static void writePhoneBookToFile(Path file,
+    ParquetProperties.WriterVersion parquetVersion,
+    FileEncryptionProperties encryptionProperties,
+    boolean useAdaptiveBloomFilter) throws IOException {
     int pageSize = DATA.size() / 100;     // Ensure that several pages will be created
     int rowGroupSize = pageSize * 4;    // Ensure that there are more row-groups created
-    PhoneBookWriter.write(ExampleParquetWriter.builder(file)
-        .withWriteMode(OVERWRITE)
-        .withRowGroupSize(rowGroupSize)
-        .withPageSize(pageSize)
+    ExampleParquetWriter.Builder writeBuilder = ExampleParquetWriter.builder(file)
+      .withWriteMode(OVERWRITE)
+      .withRowGroupSize(rowGroupSize)
+      .withPageSize(pageSize)
+      .withEncryption(encryptionProperties)
+      .withWriterVersion(parquetVersion);
+    if (useAdaptiveBloomFilter) {
+      writeBuilder
+        .withAdaptiveBloomFilterEnabled(true)
+        .withBloomFilterEnabled("location.lat", true)
+        .withBloomFilterCandidateNumber("location.lat", 10)
+        .withBloomFilterEnabled("name", true)
+        .withBloomFilterCandidateNumber("name", 10)
+        .withBloomFilterEnabled("id", true)
+        .withBloomFilterCandidateNumber("id", 10);
+    } else {
+      writeBuilder
         .withBloomFilterNDV("location.lat", 10000L)
         .withBloomFilterNDV("name", 10000L)
-        .withBloomFilterNDV("id", 10000L)
-        .withEncryption(encryptionProperties)
-        .withWriterVersion(parquetVersion),
-      DATA);
+        .withBloomFilterNDV("id", 10000L);
+    }
+    PhoneBookWriter.write(writeBuilder, DATA);
   }
 
   private static void deleteFile(Path file) throws IOException {
     file.getFileSystem(new Configuration()).delete(file, false);
   }
 
+  public Path getFile() {
+    return file;
+  }
+
   @BeforeClass
   public static void createFiles() throws IOException {
-    writePhoneBookToFile(FILE_V1, ParquetProperties.WriterVersion.PARQUET_1_0, null);
-    writePhoneBookToFile(FILE_V2, ParquetProperties.WriterVersion.PARQUET_2_0, null);
+    createFiles(false);
+  }
+
+  public static void createFiles(boolean useAdaptiveBloomFilter) throws IOException {
+    writePhoneBookToFile(FILE_V1, ParquetProperties.WriterVersion.PARQUET_1_0, null, useAdaptiveBloomFilter);
+    writePhoneBookToFile(FILE_V2, ParquetProperties.WriterVersion.PARQUET_2_0, null, useAdaptiveBloomFilter);
 
     FileEncryptionProperties encryptionProperties = getFileEncryptionProperties();
-    writePhoneBookToFile(FILE_V1_E, ParquetProperties.WriterVersion.PARQUET_1_0, encryptionProperties);
-    writePhoneBookToFile(FILE_V2_E, ParquetProperties.WriterVersion.PARQUET_2_0, encryptionProperties);
+    writePhoneBookToFile(FILE_V1_E, ParquetProperties.WriterVersion.PARQUET_1_0, encryptionProperties, useAdaptiveBloomFilter);
+    writePhoneBookToFile(FILE_V2_E, ParquetProperties.WriterVersion.PARQUET_2_0, encryptionProperties, useAdaptiveBloomFilter);
   }
 
   @AfterClass
@@ -364,4 +399,23 @@
       },
       eq(doubleColumn("location.lat"), 99.9));
   }
+
+  @Test
+  public void checkBloomFilterSize() throws IOException {
+    FileDecryptionProperties fileDecryptionProperties = getFileDecryptionProperties();
+    final ParquetReadOptions readOptions = ParquetReadOptions.builder().withDecryption(fileDecryptionProperties).build();
+    InputFile inputFile = HadoopInputFile.fromPath(getFile(), new Configuration());
+    try (ParquetFileReader fileReader = ParquetFileReader.open(inputFile, readOptions)) {
+      fileReader.getRowGroups().forEach(block -> {
+        BloomFilterReader bloomFilterReader = fileReader.getBloomFilterDataReader(block);
+        block.getColumns().stream()
+          .filter(column -> column.getBloomFilterOffset() > 0)
+          .forEach(column -> {
+            int bitsetSize = bloomFilterReader.readBloomFilter(column).getBitsetSize();
+            // when setting nvd to a fixed value 10000L, bitsetSize will always be 16384
+            assertEquals(16384, bitsetSize);
+          });
+      });
+    }
+  }
 }