[FLINK-20496][state backends] Introduce RocksDB metadata block size setting.
diff --git a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
index 29142bb..a55c319 100644
--- a/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
+++ b/docs/layouts/shortcodes/generated/rocksdb_configurable_configuration.html
@@ -21,6 +21,12 @@
<td>The amount of the cache for data blocks in RocksDB. RocksDB has default block-cache size as '8MB'.</td>
</tr>
<tr>
+ <td><h5>state.backend.rocksdb.block.metadata-blocksize</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>MemorySize</td>
+ <td>Approximate size of partitioned metadata packed per block. Currently applied to indexes block when partitioned index/filters option is enabled. RocksDB has default metadata blocksize as '4KB'.</td>
+ </tr>
+ <tr>
<td><h5>state.backend.rocksdb.compaction.level.max-size-level-base</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>MemorySize</td>
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
index 94ae969..f5dd310 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
@@ -47,6 +47,7 @@
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_OPEN_FILES;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER;
+import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.METADATA_BLOCK_SIZE;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE;
@@ -131,6 +132,10 @@
blockBasedTableConfig.setBlockSize(getBlockSize());
}
+ if (isOptionConfigured(METADATA_BLOCK_SIZE)) {
+ blockBasedTableConfig.setMetadataBlockSize(getMetadataBlockSize());
+ }
+
if (isOptionConfigured(BLOCK_CACHE_SIZE)) {
blockBasedTableConfig.setBlockCacheSize(getBlockCacheSize());
}
@@ -316,6 +321,23 @@
}
// --------------------------------------------------------------------------
+ // Approximate size of partitioned metadata packed per block.
+ // Currently applied to index blocks when the partitioned index/filters option is enabled.
+ // --------------------------------------------------------------------------
+
+ private long getMetadataBlockSize() {
+ return MemorySize.parseBytes(getInternal(METADATA_BLOCK_SIZE.key()));
+ }
+
+ /**
+ * Stores the metadata block size after checking that it parses to a positive byte count.
+ *
+ * <p>The value is kept in its original textual form under the {@code METADATA_BLOCK_SIZE}
+ * key; only the validation step works on the parsed number of bytes.
+ *
+ * @param metadataBlockSize the size as a memory-size string, e.g. {@code "16KB"}
+ * @return this factory, to allow call chaining
+ * @throws IllegalArgumentException if the parsed size is not strictly positive
+ */
+ public DefaultConfigurableOptionsFactory setMetadataBlockSize(String metadataBlockSize) {
+ final long parsedBytes = MemorySize.parseBytes(metadataBlockSize);
+ Preconditions.checkArgument(
+ parsedBytes > 0,
+ "Invalid configuration " + metadataBlockSize + " for metadata block size.");
+
+ setInternal(METADATA_BLOCK_SIZE.key(), metadataBlockSize);
+ return this;
+ }
+
+ // --------------------------------------------------------------------------
// The amount of the cache for data blocks in RocksDB
// --------------------------------------------------------------------------
@@ -348,6 +370,7 @@
MAX_WRITE_BUFFER_NUMBER,
MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
BLOCK_SIZE,
+ METADATA_BLOCK_SIZE,
BLOCK_CACHE_SIZE
};
@@ -365,6 +388,7 @@
MAX_SIZE_LEVEL_BASE,
WRITE_BUFFER_SIZE,
BLOCK_SIZE,
+ METADATA_BLOCK_SIZE,
BLOCK_CACHE_SIZE));
/**
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
index d6221e6..d901522 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
@@ -174,6 +174,15 @@
"The approximate size (in bytes) of user data packed per block. "
+ "RocksDB has default blocksize as '4KB'.");
+ /**
+ * Memory-size option keyed by {@code state.backend.rocksdb.block.metadata-blocksize}.
+ *
+ * <p>Declared without a default value. The description text below is user-facing
+ * documentation and is reproduced verbatim in the generated configuration reference.
+ */
+ public static final ConfigOption<MemorySize> METADATA_BLOCK_SIZE =
+ key("state.backend.rocksdb.block.metadata-blocksize")
+ .memoryType()
+ .noDefaultValue()
+ .withDescription(
+ "Approximate size of partitioned metadata packed per block. "
+ + "Currently applied to indexes block when partitioned index/filters option is enabled. "
+ + "RocksDB has default metadata blocksize as '4KB'.");
+
public static final ConfigOption<MemorySize> BLOCK_CACHE_SIZE =
key("state.backend.rocksdb.block.cache-size")
.memoryType()
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index ebe4966..a61079b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -477,6 +477,7 @@
.setMaxWriteBufferNumber(4)
.setMinWriteBufferNumberToMerge(3)
.setBlockSize("64KB")
+ .setMetadataBlockSize("16KB")
.setBlockCacheSize("512mb");
try (RocksDBResourceContainer optionsContainer =
@@ -497,6 +498,7 @@
BlockBasedTableConfig tableConfig =
(BlockBasedTableConfig) columnOptions.tableFormatConfig();
assertEquals(64 * SizeUnit.KB, tableConfig.blockSize());
+ assertEquals(16 * SizeUnit.KB, tableConfig.metadataBlockSize());
assertEquals(512 * SizeUnit.MB, tableConfig.blockCacheSize());
}
}
@@ -520,6 +522,7 @@
verifyIllegalArgument(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE, "1BB");
verifyIllegalArgument(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, "-1KB");
verifyIllegalArgument(RocksDBConfigurableOptions.BLOCK_SIZE, "0MB");
+ verifyIllegalArgument(RocksDBConfigurableOptions.METADATA_BLOCK_SIZE, "0MB");
verifyIllegalArgument(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, "0");
verifyIllegalArgument(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, "1");
@@ -541,6 +544,7 @@
RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key(), "2");
configuration.setString(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE.key(), "64 MB");
configuration.setString(RocksDBConfigurableOptions.BLOCK_SIZE.key(), "4 kb");
+ configuration.setString(RocksDBConfigurableOptions.METADATA_BLOCK_SIZE.key(), "8 kb");
configuration.setString(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE.key(), "512 mb");
DefaultConfigurableOptionsFactory optionsFactory =
@@ -566,6 +570,7 @@
BlockBasedTableConfig tableConfig =
(BlockBasedTableConfig) columnOptions.tableFormatConfig();
assertEquals(4 * SizeUnit.KB, tableConfig.blockSize());
+ assertEquals(8 * SizeUnit.KB, tableConfig.metadataBlockSize());
assertEquals(512 * SizeUnit.MB, tableConfig.blockCacheSize());
}
}