[FLINK-19840] Do not allow rocksdb timerservice that uses the heap
This commit adds a validation that does not allows setting
state.backend.rocksdb.timer-service.factory to anything else but rocksdb.
This closes #169.
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
index c4f658c..8b464d9 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/StatefulFunctionsConfigValidator.java
@@ -24,6 +24,8 @@
import java.util.List;
import java.util.Locale;
import java.util.Set;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.statefun.flink.core.exceptions.StatefulFunctionsInvalidConfigException;
@@ -40,6 +42,7 @@
static void validate(Configuration configuration) {
validateParentFirstClassloaderPatterns(configuration);
+ validateNoHeapBackedTimers(configuration);
}
private static void validateParentFirstClassloaderPatterns(Configuration configuration) {
@@ -61,4 +64,18 @@
}
return parentFirstClassloaderPatterns;
}
+
+ private static final ConfigOption<String> TIMER_SERVICE_FACTORY =
+ ConfigOptions.key("state.backend.rocksdb.timer-service.factory")
+ .stringType()
+ .defaultValue("rocksdb");
+
+ private static void validateNoHeapBackedTimers(Configuration configuration) {
+ final String timerFactory = configuration.getString(TIMER_SERVICE_FACTORY);
+ if (!timerFactory.equalsIgnoreCase("rocksdb")) {
+ throw new StatefulFunctionsInvalidConfigException(
+ TIMER_SERVICE_FACTORY,
+ "StateFun only supports non-heap timers with a rocksdb state backend.");
+ }
+ }
}