[FLINK-20496][state backends] RocksDB partitioned index/filters option.

Configure partitioned index and filters options according to 'https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters'.

This closes #14341.
diff --git a/docs/layouts/shortcodes/generated/rocksdb_configuration.html b/docs/layouts/shortcodes/generated/rocksdb_configuration.html
index 79b769d..74b1236 100644
--- a/docs/layouts/shortcodes/generated/rocksdb_configuration.html
+++ b/docs/layouts/shortcodes/generated/rocksdb_configuration.html
@@ -39,6 +39,12 @@
             <td>If set, the RocksDB state backend will automatically configure itself to use the managed memory budget of the task slot, and divide the memory over write buffers, indexes, block caches, etc. That way, the three major uses of memory of RocksDB will be capped.</td>
         </tr>
         <tr>
+            <td><h5>state.backend.rocksdb.memory.partitioned-index-filters</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>With partitioning, the index/filter block of an SST file is partitioned into smaller blocks with an additional top-level index on them. When reading an index/filter, only top-level index is loaded into memory. The partitioned index/filter then uses the top-level index to load on demand into the block cache the partitions that are required to perform the index/filter query. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured.</td>
+        </tr>
+        <tr>
             <td><h5>state.backend.rocksdb.memory.write-buffer-ratio</h5></td>
             <td style="word-wrap: break-word;">0.5</td>
             <td>Double</td>
diff --git a/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html b/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html
index 8a74eee..a8ed2c7 100644
--- a/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html
+++ b/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html
@@ -27,6 +27,12 @@
             <td>If set, the RocksDB state backend will automatically configure itself to use the managed memory budget of the task slot, and divide the memory over write buffers, indexes, block caches, etc. That way, the three major uses of memory of RocksDB will be capped.</td>
         </tr>
         <tr>
+            <td><h5>state.backend.rocksdb.memory.partitioned-index-filters</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>With partitioning, the index/filter block of an SST file is partitioned into smaller blocks with an additional top-level index on them. When reading an index/filter, only top-level index is loaded into memory. The partitioned index/filter then uses the top-level index to load on demand into the block cache the partitions that are required to perform the index/filter query. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured.</td>
+        </tr>
+        <tr>
             <td><h5>state.backend.rocksdb.memory.write-buffer-ratio</h5></td>
             <td style="word-wrap: break-word;">0.5</td>
             <td>Double</td>
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryConfiguration.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryConfiguration.java
index dda5409..3bfcc03 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryConfiguration.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryConfiguration.java
@@ -51,6 +51,9 @@
      */
     @Nullable private Double highPriorityPoolRatio;
 
+    /** Flag whether to use partition index/filters. Null if not set. */
+    @Nullable private Boolean usePartitionedIndexFilters;
+
     // ------------------------------------------------------------------------
 
     /**
@@ -166,6 +169,17 @@
                 : RocksDBOptions.HIGH_PRIORITY_POOL_RATIO.defaultValue();
     }
 
+    /**
+     * Gets whether the state backend is configured to use partitioned index/filters for RocksDB.
+     *
+     * <p>See {@link RocksDBOptions#USE_PARTITIONED_INDEX_FILTERS} for details.
+     */
+    public Boolean isUsingPartitionedIndexFilters() {
+        return usePartitionedIndexFilters != null
+                ? usePartitionedIndexFilters
+                : RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS.defaultValue();
+    }
+
     // ------------------------------------------------------------------------
 
     /** Validates if the configured options are valid with respect to one another. */
@@ -219,6 +233,11 @@
                         ? other.highPriorityPoolRatio
                         : config.get(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO);
 
+        newConfig.usePartitionedIndexFilters =
+                other.usePartitionedIndexFilters != null
+                        ? other.usePartitionedIndexFilters
+                        : config.get(RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS);
+
         return newConfig;
     }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java
index 7362f1a..78aa1a5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java
@@ -39,7 +39,10 @@
      * @return memory controllable RocksDB shared resources.
      */
     public static RocksDBSharedResources allocateRocksDBSharedResources(
-            long totalMemorySize, double writeBufferRatio, double highPriorityPoolRatio) {
+            long totalMemorySize,
+            double writeBufferRatio,
+            double highPriorityPoolRatio,
+            boolean usingPartitionedIndexFilters) {
         long calculatedCacheCapacity =
                 RocksDBMemoryControllerUtils.calculateActualCacheCapacity(
                         totalMemorySize, writeBufferRatio);
@@ -54,7 +57,8 @@
                 RocksDBMemoryControllerUtils.createWriteBufferManager(
                         writeBufferManagerCapacity, cache);
 
-        return new RocksDBSharedResources(cache, wbm, writeBufferManagerCapacity);
+        return new RocksDBSharedResources(
+                cache, wbm, writeBufferManagerCapacity, usingPartitionedIndexFilters);
     }
 
     /**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
index acb99e4..e14d83b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java
@@ -270,11 +270,15 @@
 
         final double highPriorityPoolRatio = memoryConfig.getHighPriorityPoolRatio();
         final double writeBufferRatio = memoryConfig.getWriteBufferRatio();
+        final boolean usingPartitionedIndexFilters = memoryConfig.isUsingPartitionedIndexFilters();
 
         final LongFunctionWithException<RocksDBSharedResources, Exception> allocator =
                 (size) ->
                         RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(
-                                size, writeBufferRatio, highPriorityPoolRatio);
+                                size,
+                                writeBufferRatio,
+                                highPriorityPoolRatio,
+                                usingPartitionedIndexFilters);
 
         try {
             if (memoryConfig.isUsingFixedMemoryPerSlot()) {
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
index 5e8def4..369e944 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java
@@ -135,4 +135,18 @@
                                     "The fraction of cache memory that is reserved for high-priority data like index, filter, and "
                                             + "compression dictionary blocks. This option only has an effect when '%s' or '%s' are configured.",
                                     USE_MANAGED_MEMORY.key(), FIX_PER_SLOT_MEMORY_SIZE.key()));
+
+    @Documentation.Section(Documentation.Sections.STATE_BACKEND_ROCKSDB)
+    public static final ConfigOption<Boolean> USE_PARTITIONED_INDEX_FILTERS =
+            ConfigOptions.key("state.backend.rocksdb.memory.partitioned-index-filters")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            String.format(
+                                    "With partitioning, the index/filter block of an SST file is partitioned into smaller blocks with "
+                                            + "an additional top-level index on them. When reading an index/filter, only top-level index is loaded into memory. "
+                                            + "The partitioned index/filter then uses the top-level index to load on demand into the block cache "
+                                            + "the partitions that are required to perform the index/filter query. "
+                                            + "This option only has an effect when '%s' or '%s' are configured.",
+                                    USE_MANAGED_MEMORY.key(), FIX_PER_SLOT_MEMORY_SIZE.key()));
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index 702141e..542bfd3 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -18,20 +18,27 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.memory.OpaqueMemoryResource;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
 import org.rocksdb.Cache;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.Filter;
+import org.rocksdb.IndexType;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.TableFormatConfig;
 import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -44,6 +51,7 @@
  * options, and should be properly (and necessarily) closed to prevent resource leak.
  */
 public final class RocksDBResourceContainer implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(RocksDBResourceContainer.class);
 
     /** The pre-configured option settings. */
     private final PredefinedOptions predefinedOptions;
@@ -143,6 +151,12 @@
                         "We currently only support BlockBasedTableConfig When bounding total memory.");
                 blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig;
             }
+            if (rocksResources.isUsingPartitionedIndexFilters()
+                    && overwriteFilterIfExist(blockBasedTableConfig)) {
+                blockBasedTableConfig.setIndexType(IndexType.kTwoLevelIndexSearch);
+                blockBasedTableConfig.setPartitionFilters(true);
+                blockBasedTableConfig.setPinTopLevelIndexAndFilter(true);
+            }
             blockBasedTableConfig.setBlockCache(blockCache);
             blockBasedTableConfig.setCacheIndexAndFilterBlocks(true);
             blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
@@ -207,4 +221,39 @@
             sharedResources.close();
         }
     }
+
+    /**
+     * Overwrite configured {@link Filter} if enable partitioned filter. Partitioned filter only
+     * worked in full bloom filter, not blocked based.
+     */
+    private boolean overwriteFilterIfExist(BlockBasedTableConfig blockBasedTableConfig) {
+        Filter filter = null;
+        try {
+            filter = getFilterFromBlockBasedTableConfig(blockBasedTableConfig);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            LOG.warn(
+                    "Reflection exception occurred when getting filter from BlockBasedTableConfig, disable partition index filters!");
+            return false;
+        }
+        if (filter != null) {
+            // TODO Can get filter's config in the future RocksDB version, and build new filter use
+            // existing config.
+            BloomFilter newFilter = new BloomFilter(10, false);
+            LOG.info(
+                    "Existing filter has been overwritten to full filters since partitioned index filters is enabled.");
+            blockBasedTableConfig.setFilter(newFilter);
+            handlesToClose.add(newFilter);
+        }
+        return true;
+    }
+
+    @VisibleForTesting
+    static Filter getFilterFromBlockBasedTableConfig(BlockBasedTableConfig blockBasedTableConfig)
+            throws NoSuchFieldException, IllegalAccessException {
+        Field filterField = blockBasedTableConfig.getClass().getDeclaredField("filter_");
+        filterField.setAccessible(true);
+        Object filter = filterField.get(blockBasedTableConfig);
+        filterField.setAccessible(false);
+        return filter == null ? null : (Filter) filter;
+    }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResources.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResources.java
index 144e6f3..f48f4eb 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResources.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResources.java
@@ -32,11 +32,17 @@
     private final WriteBufferManager writeBufferManager;
     private final long writeBufferManagerCapacity;
 
+    private final boolean usingPartitionedIndexFilters;
+
     RocksDBSharedResources(
-            Cache cache, WriteBufferManager writeBufferManager, long writeBufferManagerCapacity) {
+            Cache cache,
+            WriteBufferManager writeBufferManager,
+            long writeBufferManagerCapacity,
+            boolean usingPartitionedIndexFilters) {
         this.cache = cache;
         this.writeBufferManager = writeBufferManager;
         this.writeBufferManagerCapacity = writeBufferManagerCapacity;
+        this.usingPartitionedIndexFilters = usingPartitionedIndexFilters;
     }
 
     public Cache getCache() {
@@ -51,6 +57,10 @@
         return writeBufferManagerCapacity;
     }
 
+    public boolean isUsingPartitionedIndexFilters() {
+        return usingPartitionedIndexFilters;
+    }
+
     @Override
     public void close() {
         writeBufferManager.close();
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtilsTest.java
index ce59b4b..12b33e1 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtilsTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtilsTest.java
@@ -40,6 +40,7 @@
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyDouble;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.powermock.api.mockito.PowerMockito.when;
@@ -64,7 +65,7 @@
         final AtomicLong actualWbmCapacity = new AtomicLong(0L);
 
         when(RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(
-                        anyLong(), anyDouble(), anyDouble()))
+                        anyLong(), anyDouble(), anyDouble(), anyBoolean()))
                 .thenCallRealMethod();
 
         when(RocksDBMemoryControllerUtils.calculateActualCacheCapacity(anyLong(), anyDouble()))
@@ -99,7 +100,7 @@
         double highPriPoolRatio = 0.1;
         RocksDBSharedResources rocksDBSharedResources =
                 RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(
-                        totalMemorySize, writeBufferRatio, highPriPoolRatio);
+                        totalMemorySize, writeBufferRatio, highPriPoolRatio, false);
         long expectedCacheCapacity =
                 RocksDBMemoryControllerUtils.calculateActualCacheCapacity(
                         totalMemorySize, writeBufferRatio);
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
index 93f56bb..ad8eb8a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java
@@ -26,22 +26,29 @@
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.BloomFilter;
 import org.rocksdb.Cache;
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
+import org.rocksdb.IndexType;
 import org.rocksdb.LRUCache;
 import org.rocksdb.NativeLibraryLoader;
 import org.rocksdb.ReadOptions;
+import org.rocksdb.TableFormatConfig;
 import org.rocksdb.WriteBufferManager;
 import org.rocksdb.WriteOptions;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 
+import static org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getFilterFromBlockBasedTableConfig;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 /** Tests to guard {@link RocksDBResourceContainer}. */
@@ -152,7 +159,7 @@
         final LRUCache cache = new LRUCache(cacheSize, -1, false, 0.1);
         final WriteBufferManager wbm = new WriteBufferManager(writeBufferSize, cache);
         RocksDBSharedResources rocksDBSharedResources =
-                new RocksDBSharedResources(cache, wbm, writeBufferSize);
+                new RocksDBSharedResources(cache, wbm, writeBufferSize, false);
         return new OpaqueMemoryResource<>(
                 rocksDBSharedResources, cacheSize, rocksDBSharedResources::close);
     }
@@ -240,7 +247,8 @@
     public void testFreeSharedResourcesAfterClose() throws Exception {
         LRUCache cache = new LRUCache(1024L);
         WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
-        RocksDBSharedResources sharedResources = new RocksDBSharedResources(cache, wbm, 1024L);
+        RocksDBSharedResources sharedResources =
+                new RocksDBSharedResources(cache, wbm, 1024L, false);
         final ThrowingRunnable<Exception> disposer = sharedResources::close;
         OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
                 new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
@@ -264,4 +272,54 @@
         assertThat(writeOptions.isOwningHandle(), is(false));
         assertThat(readOptions.isOwningHandle(), is(false));
     }
+
+    @Test
+    public void testGetColumnFamilyOptionsWithPartitionedIndex() throws Exception {
+        LRUCache cache = new LRUCache(1024L);
+        WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
+        RocksDBSharedResources sharedResources =
+                new RocksDBSharedResources(cache, wbm, 1024L, true);
+        final ThrowingRunnable<Exception> disposer = sharedResources::close;
+        OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
+                new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
+        BloomFilter blockBasedFilter = new BloomFilter();
+        RocksDBOptionsFactory blockBasedBloomFilterOptionFactory =
+                new RocksDBOptionsFactory() {
+
+                    @Override
+                    public DBOptions createDBOptions(
+                            DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+                        return currentOptions;
+                    }
+
+                    @Override
+                    public ColumnFamilyOptions createColumnOptions(
+                            ColumnFamilyOptions currentOptions,
+                            Collection<AutoCloseable> handlesToClose) {
+                        TableFormatConfig tableFormatConfig = currentOptions.tableFormatConfig();
+                        BlockBasedTableConfig blockBasedTableConfig =
+                                tableFormatConfig == null
+                                        ? new BlockBasedTableConfig()
+                                        : (BlockBasedTableConfig) tableFormatConfig;
+                        blockBasedTableConfig.setFilter(blockBasedFilter);
+                        handlesToClose.add(blockBasedFilter);
+                        currentOptions.setTableFormatConfig(blockBasedTableConfig);
+                        return currentOptions;
+                    }
+                };
+        try (RocksDBResourceContainer container =
+                new RocksDBResourceContainer(
+                        PredefinedOptions.DEFAULT,
+                        blockBasedBloomFilterOptionFactory,
+                        opaqueResource)) {
+            ColumnFamilyOptions columnOptions = container.getColumnOptions();
+            BlockBasedTableConfig actual =
+                    (BlockBasedTableConfig) columnOptions.tableFormatConfig();
+            assertThat(actual.indexType(), is(IndexType.kTwoLevelIndexSearch));
+            assertThat(actual.partitionFilters(), is(true));
+            assertThat(actual.pinTopLevelIndexAndFilter(), is(true));
+            assertThat(getFilterFromBlockBasedTableConfig(actual), not(blockBasedFilter));
+        }
+        assertFalse("Block based filter is left unclosed.", blockBasedFilter.isOwningHandle());
+    }
 }