[FLINK-20496][state backends] Introduce RocksDB metadata block size setting.
diff --git a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
index 29142bb..a55c319 100644
--- a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
+++ b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
@@ -21,6 +21,12 @@
             <td>The amount of the cache for data blocks in RocksDB. RocksDB has default block-cache size as '8MB'.</td>
         </tr>
         <tr>
+            <td><h5>state.backend.rocksdb.block.metadata-blocksize</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>MemorySize</td>
+            <td>Approximate size of partitioned metadata packed per block. Currently applied to indexes block when partitioned index/filters option is enabled. RocksDB has default metadata blocksize as '4KB'.</td>
+        </tr>
+        <tr>
             <td><h5>state.backend.rocksdb.compaction.level.max-size-level-base</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>MemorySize</td>
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
index 94ae969..f5dd310 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
@@ -47,6 +47,7 @@
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_OPEN_FILES;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER;
+import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.METADATA_BLOCK_SIZE;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE;
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE;
@@ -131,6 +132,10 @@
             blockBasedTableConfig.setBlockSize(getBlockSize());
         }
 
+        if (isOptionConfigured(METADATA_BLOCK_SIZE)) {
+            blockBasedTableConfig.setMetadataBlockSize(getMetadataBlockSize());
+        }
+
         if (isOptionConfigured(BLOCK_CACHE_SIZE)) {
             blockBasedTableConfig.setBlockCacheSize(getBlockCacheSize());
         }
@@ -316,6 +321,23 @@
     }
 
     // --------------------------------------------------------------------------
+    // Approximate size of partitioned metadata packed per block.
+    // Currently applied to indexes block when partitioned index/filters option is enabled.
+    // --------------------------------------------------------------------------
+
+    private long getMetadataBlockSize() {
+        return MemorySize.parseBytes(getInternal(METADATA_BLOCK_SIZE.key()));
+    }
+
+    public DefaultConfigurableOptionsFactory setMetadataBlockSize(String metadataBlockSize) {
+        Preconditions.checkArgument(
+                MemorySize.parseBytes(metadataBlockSize) > 0,
+                "Invalid configuration " + metadataBlockSize + " for metadata block size.");
+        setInternal(METADATA_BLOCK_SIZE.key(), metadataBlockSize);
+        return this;
+    }
+
+    // --------------------------------------------------------------------------
     // The amount of the cache for data blocks in RocksDB
     // --------------------------------------------------------------------------
 
@@ -348,6 +370,7 @@
                 MAX_WRITE_BUFFER_NUMBER,
                 MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
                 BLOCK_SIZE,
+                METADATA_BLOCK_SIZE,
                 BLOCK_CACHE_SIZE
             };
 
@@ -365,6 +388,7 @@
                             MAX_SIZE_LEVEL_BASE,
                             WRITE_BUFFER_SIZE,
                             BLOCK_SIZE,
+                            METADATA_BLOCK_SIZE,
                             BLOCK_CACHE_SIZE));
 
     /**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
index d6221e6..d901522 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
@@ -174,6 +174,15 @@
                             "The approximate size (in bytes) of user data packed per block. "
                                     + "RocksDB has default blocksize as '4KB'.");
 
+    public static final ConfigOption<MemorySize> METADATA_BLOCK_SIZE =
+            key("state.backend.rocksdb.block.metadata-blocksize")
+                    .memoryType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Approximate size of partitioned metadata packed per block. "
+                                    + "Currently applied to indexes block when partitioned index/filters option is enabled. "
+                                    + "RocksDB has default metadata blocksize as '4KB'.");
+
     public static final ConfigOption<MemorySize> BLOCK_CACHE_SIZE =
             key("state.backend.rocksdb.block.cache-size")
                     .memoryType()
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index ebe4966..a61079b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -477,6 +477,7 @@
                         .setMaxWriteBufferNumber(4)
                         .setMinWriteBufferNumberToMerge(3)
                         .setBlockSize("64KB")
+                        .setMetadataBlockSize("16KB")
                         .setBlockCacheSize("512mb");
 
         try (RocksDBResourceContainer optionsContainer =
@@ -497,6 +498,7 @@
             BlockBasedTableConfig tableConfig =
                     (BlockBasedTableConfig) columnOptions.tableFormatConfig();
             assertEquals(64 * SizeUnit.KB, tableConfig.blockSize());
+            assertEquals(16 * SizeUnit.KB, tableConfig.metadataBlockSize());
             assertEquals(512 * SizeUnit.MB, tableConfig.blockCacheSize());
         }
     }
@@ -520,6 +522,7 @@
             verifyIllegalArgument(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE, "1BB");
             verifyIllegalArgument(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, "-1KB");
             verifyIllegalArgument(RocksDBConfigurableOptions.BLOCK_SIZE, "0MB");
+            verifyIllegalArgument(RocksDBConfigurableOptions.METADATA_BLOCK_SIZE, "0MB");
             verifyIllegalArgument(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, "0");
 
             verifyIllegalArgument(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, "1");
@@ -541,6 +544,7 @@
                     RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key(), "2");
             configuration.setString(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE.key(), "64 MB");
             configuration.setString(RocksDBConfigurableOptions.BLOCK_SIZE.key(), "4 kb");
+            configuration.setString(RocksDBConfigurableOptions.METADATA_BLOCK_SIZE.key(), "8 kb");
             configuration.setString(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE.key(), "512 mb");
 
             DefaultConfigurableOptionsFactory optionsFactory =
@@ -566,6 +570,7 @@
                 BlockBasedTableConfig tableConfig =
                         (BlockBasedTableConfig) columnOptions.tableFormatConfig();
                 assertEquals(4 * SizeUnit.KB, tableConfig.blockSize());
+                assertEquals(8 * SizeUnit.KB, tableConfig.metadataBlockSize());
                 assertEquals(512 * SizeUnit.MB, tableConfig.blockCacheSize());
             }
         }