[FLINK-14926][state-backend-rocksdb] (follow-up) Replace OptionsFactory for RocksDBOptionsFactory

OptionsFactory was breaking existing implementations by adding a method to the interface without default method.
To solve this, we keep OptionsFactory but replace it in the RocksDB State Backend with the RocksDBOptionsFactory which has
the evolved signature. The OptionsFactory is still accepted and wrapped into a RocksDBOptionsFactory, for compatibility.
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java
index 80d1a1f..343eda8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableOptionsFactory.java
@@ -21,7 +21,7 @@
 import org.apache.flink.configuration.Configuration;
 
 /**
- * An interface for options factory that pick up additional parameters from a configuration.
+ * @deprecated Replaced by {@link ConfigurableRocksDBOptionsFactory}.
  */
 public interface ConfigurableOptionsFactory extends OptionsFactory {
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableRocksDBOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableRocksDBOptionsFactory.java
new file mode 100644
index 0000000..ac7b882
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/ConfigurableRocksDBOptionsFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * An interface for options factory that pick up additional parameters from a configuration.
+ */
+public interface ConfigurableRocksDBOptionsFactory extends RocksDBOptionsFactory {
+
+	/**
+	 * Creates a variant of the options factory that applies additional configuration parameters.
+	 *
+	 * <p>If no configuration is applied, or if the method directly applies configuration values to
+	 * the (mutable) options factory object, this method may return the original options factory object.
+	 * Otherwise it typically returns a modified copy.
+	 *
+	 * @param configuration The configuration to pick the values from.
+	 * @return A reconfigured options factory.
+	 */
+	RocksDBOptionsFactory configure(Configuration configuration);
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
index 93753b2..aaa9e83 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/DefaultConfigurableOptionsFactory.java
@@ -51,10 +51,10 @@
 import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.WRITE_BUFFER_SIZE;
 
 /**
- * An implementation of {@link ConfigurableOptionsFactory} using options provided by {@link RocksDBConfigurableOptions}.
- * It acts as the default options factory within {@link RocksDBStateBackend} if the user did not define a {@link OptionsFactory}.
+ * An implementation of {@link ConfigurableRocksDBOptionsFactory} using options provided by {@link RocksDBConfigurableOptions}.
+ * It acts as the default options factory within {@link RocksDBStateBackend} if the user did not define a {@link RocksDBOptionsFactory}.
  */
-public class DefaultConfigurableOptionsFactory implements ConfigurableOptionsFactory {
+public class DefaultConfigurableOptionsFactory implements ConfigurableRocksDBOptionsFactory {
 
 	private static final long serialVersionUID = 1L;
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
index 79815b9..9118351 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/OptionsFactory.java
@@ -21,28 +21,11 @@
 import org.rocksdb.ColumnFamilyOptions;
 import org.rocksdb.DBOptions;
 
-import java.util.Collection;
-
 /**
- * A factory for {@link DBOptions} and {@link ColumnFamilyOptions} to be passed to the {@link RocksDBStateBackend}.
- * Options have to be created lazily by this factory, because the {@code Options}
- * class is not serializable and holds pointers to native code.
- *
- * <p>A typical pattern to use this OptionsFactory is as follows:
- *
- * <pre>{@code
- * rocksDbBackend.setOptions(new OptionsFactory(){
- *
- *		public DBOptions createDBOptions(DBOptions currentOptions) {
- *			return  currentOptions.setMaxOpenFiles(1024);
- *		}
- *
- *		public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
- *			return currentOptions.setCompactionStyle(CompactionStyle.LEVEL);
- *		}
- * });
- * }</pre>
+ * @deprecated Use {@link RocksDBOptionsFactory} instead. This factory has no mechanism to register
+ *             native handles to be closed and is thus deprecated in favor or a new variant.
  */
+@Deprecated
 public interface OptionsFactory extends java.io.Serializable {
 
 	/**
@@ -54,21 +37,9 @@
 	 * the setter methods, otherwise the pre-defined options may get lost.
 	 *
 	 * @param currentOptions The options object with the pre-defined options.
-	 * @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s.
 	 * @return The options object on which the additional options are set.
 	 */
-	DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose);
-
-	/**
-	 * Set the additional options on top of the current options object.
-	 *
-	 * @param currentOptions The options object with the pre-defined options.
-	 * @return The options object on which the additional options are set.
-	 * @deprecated use {@link #createDBOptions(DBOptions, Collection)} instead.
-	 */
-	default DBOptions createDBOptions(DBOptions currentOptions) {
-		return createDBOptions(currentOptions, null);
-	}
+	DBOptions createDBOptions(DBOptions currentOptions);
 
 	/**
 	 * This method should set the additional options on top of the current options object.
@@ -79,21 +50,9 @@
 	 * the setter methods, otherwise the pre-defined options may get lost.
 	 *
 	 * @param currentOptions The options object with the pre-defined options.
-	 * @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s.
 	 * @return The options object on which the additional options are set.
 	 */
-	ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose);
-
-	/**
-	 * Set the additional options on top of the current options object.
-	 *
-	 * @param currentOptions The options object with the pre-defined options.
-	 * @return The options object on which the additional options are set.
-	 * @deprecated use {@link #createColumnOptions(ColumnFamilyOptions, Collection)} instead.
-	 */
-	default ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
-		return createColumnOptions(currentOptions, null);
-	}
+	ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions);
 
 	/**
 	 * This method should enable certain RocksDB metrics to be forwarded to
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
index 7e08f25..1f27c3f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBConfigurableOptions.java
@@ -35,10 +35,10 @@
 /**
  * This class contains the configuration options for the {@link DefaultConfigurableOptionsFactory}.
  *
- * <p>If nothing specified, RocksDB's options would be configured by {@link PredefinedOptions} and user-defined {@link OptionsFactory}.
+ * <p>If nothing specified, RocksDB's options would be configured by {@link PredefinedOptions} and user-defined {@link RocksDBOptionsFactory}.
  *
  * <p>If some options has been specifically configured, a corresponding {@link DefaultConfigurableOptionsFactory} would be created
- * and applied on top of {@link PredefinedOptions} except if a user-defined {@link OptionsFactory} overrides it.
+ * and applied on top of {@link PredefinedOptions} except if a user-defined {@link RocksDBOptionsFactory} overrides it.
  */
 public class RocksDBConfigurableOptions implements Serializable {
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
new file mode 100644
index 0000000..a5fd1e9
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactory.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * A factory for {@link DBOptions} and {@link ColumnFamilyOptions} to be passed to the {@link RocksDBStateBackend}.
+ * Options have to be created lazily by this factory, because the {@code Options}
+ * class is not serializable and holds pointers to native code.
+ *
+ * <p>A typical pattern to use this OptionsFactory is as follows:
+ *
+ * <pre>{@code
+ * rocksDbBackend.setOptions(new RocksDBOptionsFactory() {
+ *
+ *		public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+ *			return currentOptions.setMaxOpenFiles(1024);
+ *		}
+ *
+ *		public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+ *			BloomFilter bloomFilter = new BloomFilter();
+ * 			handlesToClose.add(bloomFilter);
+ *
+ * 			return currentOptions
+ * 					.setTableFormatConfig(
+ * 							new BlockBasedTableConfig().setFilter(bloomFilter));
+ *		}
+ * });
+ * }</pre>
+ */
+@SuppressWarnings("deprecation")
+public interface RocksDBOptionsFactory extends OptionsFactory, java.io.Serializable {
+
+	/**
+	 * This method should set the additional options on top of the current options object.
+	 * The current options object may contain pre-defined options based on flags that have
+	 * been configured on the state backend.
+	 *
+	 * <p>It is important to set the options on the current object and return the result from
+	 * the setter methods, otherwise the pre-defined options may get lost.
+	 *
+	 * @param currentOptions The options object with the pre-defined options.
+	 * @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s.
+	 * @return The options object on which the additional options are set.
+	 */
+	DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose);
+
+	/**
+	 * This method should set the additional options on top of the current options object.
+	 * The current options object may contain pre-defined options based on flags that have
+	 * been configured on the state backend.
+	 *
+	 * <p>It is important to set the options on the current object and return the result from
+	 * the setter methods, otherwise the pre-defined options may get lost.
+	 *
+	 * @param currentOptions The options object with the pre-defined options.
+	 * @param handlesToClose The collection to register newly created {@link org.rocksdb.RocksObject}s.
+	 * @return The options object on which the additional options are set.
+	 */
+	ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose);
+
+	/**
+	 * This method should enable certain RocksDB metrics to be forwarded to
+	 * Flink's metrics reporter.
+	 *
+	 * <p>Enabling these monitoring options may degrade RockDB performance
+	 * and should be set with care.
+	 * @param nativeMetricOptions The options object with the pre-defined options.
+	 * @return The options object on which the additional options are set.
+	 */
+	default RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
+		return nativeMetricOptions;
+	}
+
+	// ------------------------------------------------------------------------
+	//  for compatibility
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Do not override these methods, they are only to maintain interface compatibility with
+	 * prior versions. They will be removed in one of the next versions.
+	 */
+	@Override
+	default DBOptions createDBOptions(DBOptions currentOptions) {
+		return createDBOptions(currentOptions, new ArrayList<>());
+	}
+
+	/**
+	 * Do not override these methods, they are only to maintain interface compatibility with
+	 * prior versions. They will be removed in one of the next versions.
+	 */
+	@Override
+	default ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
+		return createColumnOptions(currentOptions, new ArrayList<>());
+	}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java
new file mode 100644
index 0000000..36e8c3f
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryAdapter.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+
+/**
+ * A conversion from {@link OptionsFactory} to {@link RocksDBOptionsFactory}.
+ */
+@SuppressWarnings("deprecation")
+final class RocksDBOptionsFactoryAdapter implements ConfigurableRocksDBOptionsFactory {
+
+	private static final long serialVersionUID = 1L;
+
+	private final OptionsFactory optionsFactory;
+
+	RocksDBOptionsFactoryAdapter(OptionsFactory optionsFactory) {
+		this.optionsFactory = optionsFactory;
+	}
+
+	@Override
+	public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+		return optionsFactory.createDBOptions(currentOptions);
+	}
+
+	@Override
+	public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
+		return optionsFactory.createColumnOptions(currentOptions);
+	}
+
+	@Override
+	public RocksDBNativeMetricOptions createNativeMetricsOptions(RocksDBNativeMetricOptions nativeMetricOptions) {
+		return optionsFactory.createNativeMetricsOptions(nativeMetricOptions);
+	}
+
+	@Override
+	public RocksDBOptionsFactory configure(Configuration configuration) {
+		if (optionsFactory instanceof ConfigurableOptionsFactory) {
+			final OptionsFactory reconfigured = ((ConfigurableOptionsFactory) optionsFactory).configure(configuration);
+			return reconfigured == optionsFactory ? this : new RocksDBOptionsFactoryAdapter(reconfigured);
+		}
+
+		return this;
+	}
+
+	@Nullable
+	public static OptionsFactory unwrapIfAdapter(RocksDBOptionsFactory factory) {
+		return factory instanceof RocksDBOptionsFactoryAdapter
+				? ((RocksDBOptionsFactoryAdapter) factory).optionsFactory
+				: factory;
+	}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
index fe2574a..0b81f6c 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java
@@ -44,7 +44,7 @@
 
 	/** The options factory to create the RocksDB options. */
 	@Nullable
-	private final OptionsFactory optionsFactory;
+	private final RocksDBOptionsFactory optionsFactory;
 
 	/** The shared resource among RocksDB instances. This resource is not part of the 'handlesToClose',
 	 * because the handles to close are closed quietly, whereas for this one, we want exceptions to be reported. */
@@ -58,13 +58,13 @@
 		this(PredefinedOptions.DEFAULT, null, null);
 	}
 
-	public RocksDBResourceContainer(PredefinedOptions predefinedOptions, @Nullable OptionsFactory optionsFactory) {
+	public RocksDBResourceContainer(PredefinedOptions predefinedOptions, @Nullable RocksDBOptionsFactory optionsFactory) {
 		this(predefinedOptions, optionsFactory, null);
 	}
 
 	public RocksDBResourceContainer(
 		PredefinedOptions predefinedOptions,
-		@Nullable OptionsFactory optionsFactory,
+		@Nullable RocksDBOptionsFactory optionsFactory,
 		@Nullable OpaqueMemoryResource<RocksDBSharedResources> sharedResources) {
 
 		this.predefinedOptions = checkNotNull(predefinedOptions);
@@ -119,7 +119,7 @@
 	}
 
 	@Nullable
-	OptionsFactory getOptionsFactory() {
+	RocksDBOptionsFactory getOptionsFactory() {
 		return optionsFactory;
 	}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index b572deb..1984062 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -93,7 +93,7 @@
  *
  * <p>The behavior of the RocksDB instances can be parametrized by setting RocksDB Options
  * using the methods {@link #setPredefinedOptions(PredefinedOptions)} and
- * {@link #setOptions(OptionsFactory)}.
+ * {@link #setRocksDBOptions(RocksDBOptionsFactory)}.
  */
 public class RocksDBStateBackend extends AbstractStateBackend implements ConfigurableStateBackend {
 
@@ -136,7 +136,7 @@
 
 	/** The options factory to create the RocksDB options in the cluster. */
 	@Nullable
-	private OptionsFactory optionsFactory;
+	private RocksDBOptionsFactory rocksDbOptionsFactory;
 
 	/** This determines if incremental checkpointing is enabled. */
 	private final TernaryBoolean enableIncrementalCheckpointing;
@@ -359,8 +359,8 @@
 
 		// configure RocksDB options factory
 		try {
-			optionsFactory = configureOptionsFactory(
-				original.optionsFactory,
+			rocksDbOptionsFactory = configureOptionsFactory(
+				original.rocksDbOptionsFactory,
 				config.getString(RocksDBOptions.OPTIONS_FACTORY),
 				config,
 				classLoader);
@@ -582,15 +582,15 @@
 			cancelStreamRegistry).build();
 	}
 
-	private OptionsFactory configureOptionsFactory(
-			@Nullable OptionsFactory originalOptionsFactory,
+	private RocksDBOptionsFactory configureOptionsFactory(
+			@Nullable RocksDBOptionsFactory originalOptionsFactory,
 			String factoryClassName,
 			Configuration config,
 			ClassLoader classLoader) throws DynamicCodeLoadingException {
 
 		if (originalOptionsFactory != null) {
-			if (originalOptionsFactory instanceof ConfigurableOptionsFactory) {
-				originalOptionsFactory = ((ConfigurableOptionsFactory) originalOptionsFactory).configure(config);
+			if (originalOptionsFactory instanceof ConfigurableRocksDBOptionsFactory) {
+				originalOptionsFactory = ((ConfigurableRocksDBOptionsFactory) originalOptionsFactory).configure(config);
 			}
 			LOG.info("Using application-defined options factory: {}.", originalOptionsFactory);
 
@@ -607,13 +607,13 @@
 		} else {
 			try {
 				@SuppressWarnings("rawtypes")
-				Class<? extends OptionsFactory> clazz =
+				Class<? extends RocksDBOptionsFactory> clazz =
 					Class.forName(factoryClassName, false, classLoader)
-						.asSubclass(OptionsFactory.class);
+						.asSubclass(RocksDBOptionsFactory.class);
 
-				OptionsFactory optionsFactory = clazz.newInstance();
-				if (optionsFactory instanceof ConfigurableOptionsFactory) {
-					optionsFactory = ((ConfigurableOptionsFactory) optionsFactory).configure(config);
+				RocksDBOptionsFactory optionsFactory = clazz.newInstance();
+				if (optionsFactory instanceof ConfigurableRocksDBOptionsFactory) {
+					optionsFactory = ((ConfigurableRocksDBOptionsFactory) optionsFactory).configure(config);
 				}
 				LOG.info("Using configured options factory: {}.", optionsFactory);
 
@@ -776,7 +776,7 @@
 	 * Sets the predefined options for RocksDB.
 	 *
 	 * <p>If user-configured options within {@link RocksDBConfigurableOptions} is set (through flink-conf.yaml)
-	 * or a user-defined options factory is set (via {@link #setOptions(OptionsFactory)}),
+	 * or a user-defined options factory is set (via {@link #setRocksDBOptions(RocksDBOptionsFactory)}),
 	 * then the options from the factory are applied on top of the here specified
 	 * predefined options and customized options.
 	 *
@@ -792,7 +792,7 @@
 	 * are {@link PredefinedOptions#DEFAULT}.
 	 *
 	 * <p>If user-configured options within {@link RocksDBConfigurableOptions} is set (through flink-conf.yaml)
-	 * of a user-defined options factory is set (via {@link #setOptions(OptionsFactory)}),
+	 * of a user-defined options factory is set (via {@link #setRocksDBOptions(RocksDBOptionsFactory)}),
 	 * then the options from the factory are applied on top of the predefined and customized options.
 	 *
 	 * @return The currently set predefined options for RocksDB.
@@ -818,18 +818,45 @@
 	 *
 	 * @param optionsFactory The options factory that lazily creates the RocksDB options.
 	 */
-	public void setOptions(OptionsFactory optionsFactory) {
-		this.optionsFactory = optionsFactory;
+	public void setRocksDBOptions(RocksDBOptionsFactory optionsFactory) {
+		this.rocksDbOptionsFactory = optionsFactory;
 	}
 
 	/**
-	 * Gets the options factory that lazily creates the RocksDB options.
+	 * Gets {@link org.rocksdb.Options} for the RocksDB instances.
 	 *
-	 * @return The options factory.
+	 * <p>The options created by the factory here are applied on top of the pre-defined
+	 * options profile selected via {@link #setPredefinedOptions(PredefinedOptions)}.
+	 * If the pre-defined options profile is the default
+	 * ({@link PredefinedOptions#DEFAULT}), then the factory fully controls the RocksDB options.
 	 */
 	@Nullable
+	public RocksDBOptionsFactory getRocksDBOptions() {
+		return rocksDbOptionsFactory;
+	}
+
+	/**
+	 * The options factory supplied here was prone to resource leaks, because it did not have a way
+	 * to register native handles / objects that need to be disposed when the state backend is closed.
+	 *
+	 * @deprecated Use {@link #setRocksDBOptions(RocksDBOptionsFactory)} instead.
+	 */
+	@Deprecated
+	public void setOptions(OptionsFactory optionsFactory) {
+		this.rocksDbOptionsFactory = optionsFactory instanceof RocksDBOptionsFactory
+				? (RocksDBOptionsFactory) optionsFactory
+				: new RocksDBOptionsFactoryAdapter(optionsFactory);
+	}
+
+	/**
+	 * The options factory supplied here was prone to resource leaks, because it did not have a way
+	 * to register native handles / objects that need to be disposed when the state backend is closed.
+	 *
+	 * @deprecated Use {@link #setRocksDBOptions(RocksDBOptionsFactory)} and {@link #getRocksDBOptions()} instead.
+	 */
+	@Deprecated
 	public OptionsFactory getOptions() {
-		return optionsFactory;
+		return RocksDBOptionsFactoryAdapter.unwrapIfAdapter(rocksDbOptionsFactory);
 	}
 
 	/**
@@ -882,7 +909,7 @@
 
 		return new RocksDBResourceContainer(
 			predefinedOptions != null ? predefinedOptions : PredefinedOptions.DEFAULT,
-			optionsFactory,
+			rocksDbOptionsFactory,
 			sharedResources);
 	}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryCompatibilityTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryCompatibilityTest.java
new file mode 100644
index 0000000..f96bed3
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBOptionsFactoryCompatibilityTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.configuration.Configuration;
+
+import org.junit.Test;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests that the changes introducing the {@link RocksDBOptionsFactory} are backwards compatible.
+ */
+public class RocksDBOptionsFactoryCompatibilityTest {
+
+	@Test
+	public void testInheritance() {
+		assertThat(new DefaultConfigurableOptionsFactory(), instanceOf(RocksDBOptionsFactory.class));
+	}
+
+	@Test
+	public void testSetAndGet() throws Exception {
+		final RocksDBStateBackend backend = new RocksDBStateBackend("file:///a/b/c");
+		final OptionsFactory testFactory = new TestOptionsFactory();
+
+		backend.setOptions(testFactory);
+
+		assertSame(testFactory, backend.getOptions());
+	}
+
+	@Test
+	public void testConfiguration() throws Exception {
+		final RocksDBStateBackend backend = new RocksDBStateBackend("file:///a/b/c");
+		final OptionsFactory testFactory = new TestOptionsFactory();
+
+		backend.setOptions(testFactory);
+
+		final TestOptionsFactory reconfigured = (TestOptionsFactory) backend
+				.configure(new Configuration(), getClass().getClassLoader())
+				.getOptions();
+
+		assertTrue(reconfigured.wasConfigured);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class TestOptionsFactory implements ConfigurableOptionsFactory {
+
+		boolean wasConfigured;
+
+		@Override
+		public OptionsFactory configure(Configuration configuration) {
+			wasConfigured = true;
+			return this;
+		}
+
+		@Override
+		public DBOptions createDBOptions(DBOptions currentOptions) {
+			return currentOptions;
+		}
+
+		@Override
+		public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
+			return currentOptions;
+		}
+	}
+}
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
index c73a3d6..d25baa7 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResource.java
@@ -49,7 +49,7 @@
 	private static final Logger LOG = LoggerFactory.getLogger(RocksDBResource.class);
 
 	/** Factory for {@link DBOptions} and {@link ColumnFamilyOptions}. */
-	private final OptionsFactory optionsFactory;
+	private final RocksDBOptionsFactory optionsFactory;
 
 	/** Temporary folder that provides the working directory for the RocksDB instance. */
 	private TemporaryFolder temporaryFolder;
@@ -79,7 +79,7 @@
 	private ArrayList<AutoCloseable> handlesToClose = new ArrayList<>();
 
 	public RocksDBResource() {
-		this(new OptionsFactory() {
+		this(new RocksDBOptionsFactory() {
 			@Override
 			public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
 				//close it before reuse the reference.
@@ -106,7 +106,7 @@
 		});
 	}
 
-	public RocksDBResource(@Nonnull OptionsFactory optionsFactory) {
+	public RocksDBResource(@Nonnull RocksDBOptionsFactory optionsFactory) {
 		this.optionsFactory = optionsFactory;
 	}
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index a8030a1..197c63b 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -521,6 +521,7 @@
 
 		rocksDbBackend = rocksDbBackend.configure(config, getClass().getClassLoader());
 
+		assertTrue(rocksDbBackend.getRocksDBOptions() instanceof TestOptionsFactory);
 		assertTrue(rocksDbBackend.getOptions() instanceof TestOptionsFactory);
 
 		try (RocksDBResourceContainer optionsContainer = rocksDbBackend.createOptionsAndResourceContainer()) {
@@ -529,7 +530,7 @@
 		}
 
 		// verify that user-defined options factory could be set programmatically and override pre-configured one.
-		rocksDbBackend.setOptions(new OptionsFactory() {
+		rocksDbBackend.setRocksDBOptions(new RocksDBOptionsFactory() {
 			@Override
 			public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
 				return currentOptions;
@@ -549,7 +550,7 @@
 
 	@Test
 	public void testPredefinedAndOptionsFactory() throws Exception {
-		final OptionsFactory optionsFactory = new OptionsFactory() {
+		final RocksDBOptionsFactory optionsFactory = new RocksDBOptionsFactory() {
 			@Override
 			public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
 				return currentOptions;
@@ -598,8 +599,8 @@
 		assertNotEquals(predOptions, original.getPredefinedOptions());
 		original.setPredefinedOptions(predOptions);
 
-		final OptionsFactory optionsFactory = mock(OptionsFactory.class);
-		original.setOptions(optionsFactory);
+		final RocksDBOptionsFactory optionsFactory = mock(RocksDBOptionsFactory.class);
+		original.setRocksDBOptions(optionsFactory);
 
 		final String[] localDirs = new String[] {
 				tempFolder.newFolder().getAbsolutePath(), tempFolder.newFolder().getAbsolutePath() };
@@ -744,7 +745,7 @@
 	/**
 	 * An implementation of options factory for testing.
 	 */
-	public static class TestOptionsFactory implements ConfigurableOptionsFactory {
+	public static class TestOptionsFactory implements ConfigurableRocksDBOptionsFactory {
 		public static final String BACKGROUND_JOBS_OPTION = "my.custom.rocksdb.backgroundJobs";
 
 		private static final int DEFAULT_BACKGROUND_JOBS = 2;
@@ -761,7 +762,7 @@
 		}
 
 		@Override
-		public OptionsFactory configure(Configuration configuration) {
+		public RocksDBOptionsFactory configure(Configuration configuration) {
 			this.backgroundJobs = configuration.getInteger(BACKGROUND_JOBS_OPTION, DEFAULT_BACKGROUND_JOBS);
 			return this;
 		}