[FLINK-14926][state-backend-rocksdb] (follow-up) Simplify collecton the option objects to close
- rather than letting each option factory method add the option to the list itself, add it in one place in the Resource Container.
- assume the list passed to the factory methods is always non-null
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
index a629756..47c7d12 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/PredefinedOptions.java
@@ -25,6 +25,7 @@
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;
+import java.util.ArrayList;
import java.util.Collection;
/**
@@ -58,24 +59,15 @@
@Override
public DBOptions createDBOptions(Collection<AutoCloseable> handlesToClose) {
- DBOptions dbOptions =
- new DBOptions()
+ return new DBOptions()
.setUseFsync(false)
.setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
.setStatsDumpPeriodSec(0);
- if (handlesToClose != null) {
- handlesToClose.add(dbOptions);
- }
- return dbOptions;
}
@Override
public ColumnFamilyOptions createColumnOptions(Collection<AutoCloseable> handlesToClose) {
- ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
- if (handlesToClose != null) {
- handlesToClose.add(columnFamilyOptions);
- }
- return columnFamilyOptions;
+ return new ColumnFamilyOptions();
}
},
@@ -106,29 +98,19 @@
@Override
public DBOptions createDBOptions(Collection<AutoCloseable> handlesToClose) {
- DBOptions dbOptions =
- new DBOptions()
+ return new DBOptions()
.setIncreaseParallelism(4)
.setUseFsync(false)
.setMaxOpenFiles(-1)
.setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
.setStatsDumpPeriodSec(0);
- if (handlesToClose != null) {
- handlesToClose.add(dbOptions);
- }
- return dbOptions;
}
@Override
public ColumnFamilyOptions createColumnOptions(Collection<AutoCloseable> handlesToClose) {
- ColumnFamilyOptions columnFamilyOptions =
- new ColumnFamilyOptions()
+ return new ColumnFamilyOptions()
.setCompactionStyle(CompactionStyle.LEVEL)
.setLevelCompactionDynamicLevelBytes(true);
- if (handlesToClose != null) {
- handlesToClose.add(columnFamilyOptions);
- }
- return columnFamilyOptions;
}
},
@@ -164,18 +146,12 @@
@Override
public DBOptions createDBOptions(Collection<AutoCloseable> handlesToClose) {
-
- DBOptions dbOptions =
- new DBOptions()
+ return new DBOptions()
.setIncreaseParallelism(4)
.setUseFsync(false)
.setMaxOpenFiles(-1)
.setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
.setStatsDumpPeriodSec(0);
- if (handlesToClose != null) {
- handlesToClose.add(dbOptions);
- }
- return dbOptions;
}
@Override
@@ -187,8 +163,9 @@
final long writeBufferSize = 64 * 1024 * 1024;
BloomFilter bloomFilter = new BloomFilter();
- ColumnFamilyOptions columnFamilyOptions =
- new ColumnFamilyOptions()
+ handlesToClose.add(bloomFilter);
+
+ return new ColumnFamilyOptions()
.setCompactionStyle(CompactionStyle.LEVEL)
.setLevelCompactionDynamicLevelBytes(true)
.setTargetFileSizeBase(targetFileSize)
@@ -202,11 +179,6 @@
.setBlockSize(blockSize)
.setFilter(bloomFilter)
);
- if (handlesToClose != null) {
- handlesToClose.add(bloomFilter);
- handlesToClose.add(columnFamilyOptions);
- }
- return columnFamilyOptions;
}
},
@@ -233,26 +205,17 @@
@Override
public DBOptions createDBOptions(Collection<AutoCloseable> handlesToClose) {
- DBOptions dbOptions =
- new DBOptions()
+ return new DBOptions()
.setIncreaseParallelism(4)
.setUseFsync(false)
.setMaxOpenFiles(-1)
.setInfoLogLevel(InfoLogLevel.HEADER_LEVEL)
.setStatsDumpPeriodSec(0);
- if (handlesToClose != null) {
- handlesToClose.add(dbOptions);
- }
- return dbOptions;
}
@Override
public ColumnFamilyOptions createColumnOptions(Collection<AutoCloseable> handlesToClose) {
- ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
- if (handlesToClose != null) {
- handlesToClose.add(columnFamilyOptions);
- }
- return columnFamilyOptions;
+ return new ColumnFamilyOptions();
}
};
@@ -271,7 +234,7 @@
* @deprecated use {@link #createColumnOptions(Collection)} instead.
*/
public DBOptions createDBOptions() {
- return createDBOptions(null);
+ return createDBOptions(new ArrayList<>());
}
/**
@@ -287,7 +250,7 @@
* @deprecated use {@link #createColumnOptions(Collection)} instead.
*/
public ColumnFamilyOptions createColumnOptions() {
- return createColumnOptions(null);
+ return createColumnOptions(new ArrayList<>());
}
}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index 199cd84..fe2574a 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -79,6 +79,7 @@
DBOptions getDbOptions() {
// initial options from pre-defined profile
DBOptions opt = predefinedOptions.createDBOptions(handlesToClose);
+ handlesToClose.add(opt);
// add user-defined options factory, if specified
if (optionsFactory != null) {
@@ -97,6 +98,7 @@
ColumnFamilyOptions getColumnOptions() {
// initial options from pre-defined profile
ColumnFamilyOptions opt = predefinedOptions.createColumnOptions(handlesToClose);
+ handlesToClose.add(opt);
// add user-defined options, if specified
if (optionsFactory != null) {