[FLINK-5352] [rocksdb] Restore 1.1.3 RocksDB memory footprint
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
index 93aac85..af85fa7 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
@@ -35,14 +35,13 @@
public enum PredefinedOptions {
/**
- * Default options for all settings, except that writes are not forced to the
- * disk.
+ * Default options for all settings, except that writes are not forced to the disk.
*
* <p>Note: Because Flink does not rely on RocksDB data on disk for recovery,
* there is no need to sync data to stable storage.
*/
DEFAULT {
-
+
@Override
public DBOptions createDBOptions() {
return new DBOptions()
@@ -55,7 +54,34 @@
return new ColumnFamilyOptions()
.setMergeOperator(new StringAppendOperator());
}
+ },
+ /**
+ * Default options as defined by RocksDB version 4.5.1. This options are present to allow
+ * reproducing the memory usage behavior from Flink versions 1.0.x and 1.1.x.
+ *
+ * @deprecated These options are only used to make Flink 1.1.4 (updated RocksDB dependency due
+ * to critical bug fixes in RocksDB) behave similar to Flink 1.1.3 and prior versions.
+ * This option will not be present in Flink 1.2.0 and upwards.
+ */
+ @Deprecated
+ DEFAULT_ROCKS_4_5_1 {
+
+ @Override
+ public DBOptions createDBOptions() {
+ return new DBOptions()
+ .setUseFsync(false)
+ .setDisableDataSync(true);
+ }
+
+ @Override
+ public ColumnFamilyOptions createColumnOptions() {
+ return new ColumnFamilyOptions()
+ .setMergeOperator(new StringAppendOperator())
+ .setWriteBufferSize(4194304)
+ .setTargetFileSizeBase(2097152)
+ .setMaxBytesForLevelBase(10485760);
+ }
},
/**
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 3d75bde..7c8b7b6 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -132,8 +132,10 @@
// RocksDB options
- /** The pre-configured option settings */
- private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;
+ /** The pre-configured option settings - currently set to use the old RocksDB settings for
+ * backwards compatible memory footprints */
+ @SuppressWarnings("deprecated")
+ private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT_ROCKS_4_5_1;
/** The options factory to create the RocksDB options in the cluster */
private OptionsFactory optionsFactory;
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index fca5773..5cc7182 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -197,15 +197,15 @@
@Test
public void testPredefinedOptions() throws Exception {
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-
- assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions());
-
+
+ assertEquals(PredefinedOptions.DEFAULT_ROCKS_4_5_1, rocksDbBackend.getPredefinedOptions());
+
rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());
DBOptions opt1 = rocksDbBackend.getDbOptions();
DBOptions opt2 = rocksDbBackend.getDbOptions();
-
+
assertEquals(opt1, opt2);
ColumnFamilyOptions columnOpt1 = rocksDbBackend.getColumnOptions();
@@ -240,7 +240,7 @@
public void testPredefinedAndOptionsFactory() throws Exception {
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
- assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions());
+ assertEquals(PredefinedOptions.DEFAULT_ROCKS_4_5_1, rocksDbBackend.getPredefinedOptions());
rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
rocksDbBackend.setOptions(new OptionsFactory() {