blob: a61079b32927d47d34da28ddbf45935926cc4b63 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.IOUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
import org.rocksdb.DBOptions;
import org.rocksdb.InfoLogLevel;
import org.rocksdb.util.SizeUnit;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import static org.apache.flink.contrib.streaming.state.RocksDBTestUtils.createKeyedStateBackend;
import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/** Tests for configuring the RocksDB State Backend. */
@SuppressWarnings("serial")
public class RocksDBStateBackendConfigTest {
@Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
// ------------------------------------------------------------------------
// default values
// ------------------------------------------------------------------------
@Test
public void testDefaultsInSync() throws Exception {
final boolean defaultIncremental =
CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
assertEquals(defaultIncremental, backend.isIncrementalCheckpointsEnabled());
}
// ------------------------------------------------------------------------
// RocksDB local file directory
// ------------------------------------------------------------------------
/** This test checks the behavior for basic setting of local DB directories. */
@Test
public void testSetDbPath() throws Exception {
final EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend();
final String testDir1 = tempFolder.newFolder().getAbsolutePath();
final String testDir2 = tempFolder.newFolder().getAbsolutePath();
assertNull(rocksDbBackend.getDbStoragePaths());
rocksDbBackend.setDbStoragePath(testDir1);
assertArrayEquals(new String[] {testDir1}, rocksDbBackend.getDbStoragePaths());
rocksDbBackend.setDbStoragePath(null);
assertNull(rocksDbBackend.getDbStoragePaths());
rocksDbBackend.setDbStoragePaths(testDir1, testDir2);
assertArrayEquals(new String[] {testDir1, testDir2}, rocksDbBackend.getDbStoragePaths());
final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
final RocksDBKeyedStateBackend<Integer> keyedBackend =
createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
try {
File instanceBasePath = keyedBackend.getInstanceBasePath();
assertThat(
instanceBasePath.getAbsolutePath(),
anyOf(startsWith(testDir1), startsWith(testDir2)));
//noinspection NullArgumentToVariableArgMethod
rocksDbBackend.setDbStoragePaths(null);
assertNull(rocksDbBackend.getDbStoragePaths());
} finally {
IOUtils.closeQuietly(keyedBackend);
keyedBackend.dispose();
env.close();
}
}
@Test
public void testConfigureTimerService() throws Exception {
final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
// Fix the option key string
Assert.assertEquals(
"state.backend.rocksdb.timer-service.factory",
RocksDBOptions.TIMER_SERVICE_FACTORY.key());
// Fix the option value string and ensure all are covered
Assert.assertEquals(2, EmbeddedRocksDBStateBackend.PriorityQueueStateType.values().length);
Assert.assertEquals(
"ROCKSDB", EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString());
Assert.assertEquals(
"HEAP", EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP.toString());
// Fix the default
Assert.assertEquals(
EmbeddedRocksDBStateBackend.PriorityQueueStateType.ROCKSDB,
RocksDBOptions.TIMER_SERVICE_FACTORY.defaultValue());
EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend();
RocksDBKeyedStateBackend<Integer> keyedBackend =
createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
Assert.assertEquals(
RocksDBPriorityQueueSetFactory.class,
keyedBackend.getPriorityQueueFactory().getClass());
keyedBackend.dispose();
Configuration conf = new Configuration();
conf.set(
RocksDBOptions.TIMER_SERVICE_FACTORY,
EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP);
rocksDbBackend =
rocksDbBackend.configure(conf, Thread.currentThread().getContextClassLoader());
keyedBackend = createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
Assert.assertEquals(
HeapPriorityQueueSetFactory.class,
keyedBackend.getPriorityQueueFactory().getClass());
keyedBackend.dispose();
env.close();
}
/** Validates that user custom configuration from code should override the flink-conf.yaml. */
@Test
public void testConfigureTimerServiceLoadingFromApplication() throws Exception {
final MockEnvironment env = new MockEnvironmentBuilder().build();
// priorityQueueStateType of the job backend
final EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend();
backend.setPriorityQueueStateType(EmbeddedRocksDBStateBackend.PriorityQueueStateType.HEAP);
// priorityQueueStateType in the cluster config
final Configuration configFromConfFile = new Configuration();
configFromConfFile.setString(
RocksDBOptions.TIMER_SERVICE_FACTORY.key(),
RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString());
// configure final backend from job and cluster config
final EmbeddedRocksDBStateBackend configuredRocksDBStateBackend =
backend.configure(
configFromConfFile, Thread.currentThread().getContextClassLoader());
final RocksDBKeyedStateBackend<Integer> keyedBackend =
createKeyedStateBackend(configuredRocksDBStateBackend, env, IntSerializer.INSTANCE);
// priorityQueueStateType of the job backend should be preserved
assertThat(
keyedBackend.getPriorityQueueFactory(),
instanceOf(HeapPriorityQueueSetFactory.class));
keyedBackend.close();
keyedBackend.dispose();
env.close();
}
@Test
public void testStoragePathWithFilePrefix() throws Exception {
final File folder = tempFolder.newFolder();
final String dbStoragePath = new Path(folder.toURI().toString()).toString();
assertTrue(dbStoragePath.startsWith("file:"));
testLocalDbPaths(dbStoragePath, folder);
}
@Test
public void testWithDefaultFsSchemeNoStoragePath() throws Exception {
try {
// set the default file system scheme
Configuration config = new Configuration();
config.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "s3://mydomain.com:8020/flink");
FileSystem.initialize(config);
testLocalDbPaths(null, tempFolder.getRoot());
} finally {
FileSystem.initialize(new Configuration());
}
}
@Test
public void testWithDefaultFsSchemeAbsoluteStoragePath() throws Exception {
final File folder = tempFolder.newFolder();
final String dbStoragePath = folder.getAbsolutePath();
try {
// set the default file system scheme
Configuration config = new Configuration();
config.setString(CoreOptions.DEFAULT_FILESYSTEM_SCHEME, "s3://mydomain.com:8020/flink");
FileSystem.initialize(config);
testLocalDbPaths(dbStoragePath, folder);
} finally {
FileSystem.initialize(new Configuration());
}
}
private void testLocalDbPaths(String configuredPath, File expectedPath) throws Exception {
final EmbeddedRocksDBStateBackend rocksDbBackend = new EmbeddedRocksDBStateBackend();
rocksDbBackend.setDbStoragePath(configuredPath);
final MockEnvironment env = getMockEnvironment(tempFolder.newFolder());
RocksDBKeyedStateBackend<Integer> keyedBackend =
createKeyedStateBackend(rocksDbBackend, env, IntSerializer.INSTANCE);
try {
File instanceBasePath = keyedBackend.getInstanceBasePath();
assertThat(
instanceBasePath.getAbsolutePath(), startsWith(expectedPath.getAbsolutePath()));
//noinspection NullArgumentToVariableArgMethod
rocksDbBackend.setDbStoragePaths(null);
assertNull(rocksDbBackend.getDbStoragePaths());
} finally {
IOUtils.closeQuietly(keyedBackend);
keyedBackend.dispose();
env.close();
}
}
/** Validates that empty arguments for the local DB path are invalid. */
@Test(expected = IllegalArgumentException.class)
public void testSetEmptyPaths() throws Exception {
String checkpointPath = tempFolder.newFolder().toURI().toString();
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
rocksDbBackend.setDbStoragePaths();
}
/** Validates that schemes other than 'file:/' are not allowed. */
@Test(expected = IllegalArgumentException.class)
public void testNonFileSchemePath() throws Exception {
String checkpointPath = tempFolder.newFolder().toURI().toString();
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
rocksDbBackend.setDbStoragePath("hdfs:///some/path/to/perdition");
}
@Test(expected = IllegalArgumentException.class)
public void testDbPathRelativePaths() throws Exception {
RocksDBStateBackend rocksDbBackend =
new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());
rocksDbBackend.setDbStoragePath("relative/path");
}
// ------------------------------------------------------------------------
// RocksDB local file automatic from temp directories
// ------------------------------------------------------------------------
/**
* This tests whether the RocksDB backends uses the temp directories that are provided from the
* {@link Environment} when no db storage path is set.
*/
@Test
public void testUseTempDirectories() throws Exception {
String checkpointPath = tempFolder.newFolder().toURI().toString();
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
File dir1 = tempFolder.newFolder();
File dir2 = tempFolder.newFolder();
assertNull(rocksDbBackend.getDbStoragePaths());
final MockEnvironment env = getMockEnvironment(dir1, dir2);
RocksDBKeyedStateBackend<Integer> keyedBackend =
(RocksDBKeyedStateBackend<Integer>)
rocksDbBackend.createKeyedStateBackend(
env,
env.getJobID(),
"test_op",
IntSerializer.INSTANCE,
1,
new KeyGroupRange(0, 0),
env.getTaskKvStateRegistry(),
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
Collections.emptyList(),
new CloseableRegistry());
try {
File instanceBasePath = keyedBackend.getInstanceBasePath();
assertThat(
instanceBasePath.getAbsolutePath(),
anyOf(startsWith(dir1.getAbsolutePath()), startsWith(dir2.getAbsolutePath())));
} finally {
IOUtils.closeQuietly(keyedBackend);
keyedBackend.dispose();
env.close();
}
}
// ------------------------------------------------------------------------
// RocksDB local file directory initialization
// ------------------------------------------------------------------------
@Test
public void testFailWhenNoLocalStorageDir() throws Exception {
final File targetDir = tempFolder.newFolder();
Assume.assumeTrue(
"Cannot mark directory non-writable", targetDir.setWritable(false, false));
String checkpointPath = tempFolder.newFolder().toURI().toString();
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
try (MockEnvironment env = getMockEnvironment(tempFolder.newFolder())) {
rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath());
boolean hasFailure = false;
try {
rocksDbBackend.createKeyedStateBackend(
env,
env.getJobID(),
"foobar",
IntSerializer.INSTANCE,
1,
new KeyGroupRange(0, 0),
new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()),
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
Collections.emptyList(),
new CloseableRegistry());
} catch (Exception e) {
assertTrue(e.getMessage().contains("No local storage directories available"));
assertTrue(e.getMessage().contains(targetDir.getAbsolutePath()));
hasFailure = true;
}
assertTrue(
"We must see a failure because no storaged directory is feasible.", hasFailure);
} finally {
//noinspection ResultOfMethodCallIgnored
targetDir.setWritable(true, false);
}
}
@Test
public void testContinueOnSomeDbDirectoriesMissing() throws Exception {
final File targetDir1 = tempFolder.newFolder();
final File targetDir2 = tempFolder.newFolder();
Assume.assumeTrue(
"Cannot mark directory non-writable", targetDir1.setWritable(false, false));
String checkpointPath = tempFolder.newFolder().toURI().toString();
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
try (MockEnvironment env = getMockEnvironment(tempFolder.newFolder())) {
rocksDbBackend.setDbStoragePaths(
targetDir1.getAbsolutePath(), targetDir2.getAbsolutePath());
try {
AbstractKeyedStateBackend<Integer> keyedStateBackend =
rocksDbBackend.createKeyedStateBackend(
env,
env.getJobID(),
"foobar",
IntSerializer.INSTANCE,
1,
new KeyGroupRange(0, 0),
new KvStateRegistry()
.createTaskRegistry(env.getJobID(), new JobVertexID()),
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
Collections.emptyList(),
new CloseableRegistry());
IOUtils.closeQuietly(keyedStateBackend);
keyedStateBackend.dispose();
} catch (Exception e) {
e.printStackTrace();
fail("Backend initialization failed even though some paths were available");
}
} finally {
//noinspection ResultOfMethodCallIgnored
targetDir1.setWritable(true, false);
}
}
// ------------------------------------------------------------------------
// RocksDB Options
// ------------------------------------------------------------------------
@Test
public void testPredefinedOptions() throws Exception {
String checkpointPath = tempFolder.newFolder().toURI().toString();
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
// verify that we would use PredefinedOptions.DEFAULT by default.
assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions());
// verify that user could configure predefined options via flink-conf.yaml
Configuration configuration = new Configuration();
configuration.setString(
RocksDBOptions.PREDEFINED_OPTIONS, PredefinedOptions.FLASH_SSD_OPTIMIZED.name());
rocksDbBackend = new RocksDBStateBackend(checkpointPath);
rocksDbBackend = rocksDbBackend.configure(configuration, getClass().getClassLoader());
assertEquals(PredefinedOptions.FLASH_SSD_OPTIMIZED, rocksDbBackend.getPredefinedOptions());
// verify that predefined options could be set programmatically and override pre-configured
// one.
rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
assertEquals(
PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());
}
@Test
public void testSetConfigurableOptions() throws Exception {
DefaultConfigurableOptionsFactory customizedOptions =
new DefaultConfigurableOptionsFactory()
.setMaxBackgroundThreads(4)
.setMaxOpenFiles(-1)
.setLogLevel(InfoLogLevel.DEBUG_LEVEL)
.setCompactionStyle(CompactionStyle.LEVEL)
.setUseDynamicLevelSize(true)
.setTargetFileSizeBase("4MB")
.setMaxSizeLevelBase("128 mb")
.setWriteBufferSize("128 MB")
.setMaxWriteBufferNumber(4)
.setMinWriteBufferNumberToMerge(3)
.setBlockSize("64KB")
.setMetadataBlockSize("16KB")
.setBlockCacheSize("512mb");
try (RocksDBResourceContainer optionsContainer =
new RocksDBResourceContainer(PredefinedOptions.DEFAULT, customizedOptions)) {
DBOptions dbOptions = optionsContainer.getDbOptions();
assertEquals(-1, dbOptions.maxOpenFiles());
assertEquals(InfoLogLevel.DEBUG_LEVEL, dbOptions.infoLogLevel());
ColumnFamilyOptions columnOptions = optionsContainer.getColumnOptions();
assertEquals(CompactionStyle.LEVEL, columnOptions.compactionStyle());
assertTrue(columnOptions.levelCompactionDynamicLevelBytes());
assertEquals(4 * SizeUnit.MB, columnOptions.targetFileSizeBase());
assertEquals(128 * SizeUnit.MB, columnOptions.maxBytesForLevelBase());
assertEquals(4, columnOptions.maxWriteBufferNumber());
assertEquals(3, columnOptions.minWriteBufferNumberToMerge());
BlockBasedTableConfig tableConfig =
(BlockBasedTableConfig) columnOptions.tableFormatConfig();
assertEquals(64 * SizeUnit.KB, tableConfig.blockSize());
assertEquals(16 * SizeUnit.KB, tableConfig.metadataBlockSize());
assertEquals(512 * SizeUnit.MB, tableConfig.blockCacheSize());
}
}
@Test
public void testConfigurableOptionsFromConfig() throws Exception {
Configuration configuration = new Configuration();
DefaultConfigurableOptionsFactory defaultOptionsFactory =
new DefaultConfigurableOptionsFactory();
assertTrue(defaultOptionsFactory.configure(configuration).getConfiguredOptions().isEmpty());
// verify illegal configuration
{
verifyIllegalArgument(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS, "-1");
verifyIllegalArgument(RocksDBConfigurableOptions.LOG_LEVEL, "DEBUG");
verifyIllegalArgument(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER, "-1");
verifyIllegalArgument(
RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE, "-1");
verifyIllegalArgument(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE, "0KB");
verifyIllegalArgument(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE, "1BB");
verifyIllegalArgument(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE, "-1KB");
verifyIllegalArgument(RocksDBConfigurableOptions.BLOCK_SIZE, "0MB");
verifyIllegalArgument(RocksDBConfigurableOptions.METADATA_BLOCK_SIZE, "0MB");
verifyIllegalArgument(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE, "0");
verifyIllegalArgument(RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE, "1");
verifyIllegalArgument(RocksDBConfigurableOptions.COMPACTION_STYLE, "LEV");
}
// verify legal configuration
{
configuration.setString(RocksDBConfigurableOptions.LOG_LEVEL.key(), "DEBUG_LEVEL");
configuration.setString(RocksDBConfigurableOptions.COMPACTION_STYLE.key(), "level");
configuration.setString(
RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE.key(), "TRUE");
configuration.setString(RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE.key(), "8 mb");
configuration.setString(RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE.key(), "128MB");
configuration.setString(RocksDBConfigurableOptions.MAX_BACKGROUND_THREADS.key(), "4");
configuration.setString(RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER.key(), "4");
configuration.setString(
RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE.key(), "2");
configuration.setString(RocksDBConfigurableOptions.WRITE_BUFFER_SIZE.key(), "64 MB");
configuration.setString(RocksDBConfigurableOptions.BLOCK_SIZE.key(), "4 kb");
configuration.setString(RocksDBConfigurableOptions.METADATA_BLOCK_SIZE.key(), "8 kb");
configuration.setString(RocksDBConfigurableOptions.BLOCK_CACHE_SIZE.key(), "512 mb");
DefaultConfigurableOptionsFactory optionsFactory =
new DefaultConfigurableOptionsFactory();
optionsFactory.configure(configuration);
try (RocksDBResourceContainer optionsContainer =
new RocksDBResourceContainer(PredefinedOptions.DEFAULT, optionsFactory)) {
DBOptions dbOptions = optionsContainer.getDbOptions();
assertEquals(-1, dbOptions.maxOpenFiles());
assertEquals(InfoLogLevel.DEBUG_LEVEL, dbOptions.infoLogLevel());
ColumnFamilyOptions columnOptions = optionsContainer.getColumnOptions();
assertEquals(CompactionStyle.LEVEL, columnOptions.compactionStyle());
assertTrue(columnOptions.levelCompactionDynamicLevelBytes());
assertEquals(8 * SizeUnit.MB, columnOptions.targetFileSizeBase());
assertEquals(128 * SizeUnit.MB, columnOptions.maxBytesForLevelBase());
assertEquals(4, columnOptions.maxWriteBufferNumber());
assertEquals(2, columnOptions.minWriteBufferNumberToMerge());
assertEquals(64 * SizeUnit.MB, columnOptions.writeBufferSize());
BlockBasedTableConfig tableConfig =
(BlockBasedTableConfig) columnOptions.tableFormatConfig();
assertEquals(4 * SizeUnit.KB, tableConfig.blockSize());
assertEquals(8 * SizeUnit.KB, tableConfig.metadataBlockSize());
assertEquals(512 * SizeUnit.MB, tableConfig.blockCacheSize());
}
}
}
@Test
public void testOptionsFactory() throws Exception {
String checkpointPath = tempFolder.newFolder().toURI().toString();
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
// verify that user-defined options factory could be configured via flink-conf.yaml
Configuration config = new Configuration();
config.setString(RocksDBOptions.OPTIONS_FACTORY.key(), TestOptionsFactory.class.getName());
config.setString(TestOptionsFactory.BACKGROUND_JOBS_OPTION.key(), "4");
rocksDbBackend = rocksDbBackend.configure(config, getClass().getClassLoader());
assertTrue(rocksDbBackend.getRocksDBOptions() instanceof TestOptionsFactory);
try (RocksDBResourceContainer optionsContainer =
rocksDbBackend.createOptionsAndResourceContainer()) {
DBOptions dbOptions = optionsContainer.getDbOptions();
assertEquals(4, dbOptions.maxBackgroundJobs());
}
// verify that user-defined options factory could be set programmatically and override
// pre-configured one.
rocksDbBackend.setRocksDBOptions(
new RocksDBOptionsFactory() {
@Override
public DBOptions createDBOptions(
DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
return currentOptions;
}
@Override
public ColumnFamilyOptions createColumnOptions(
ColumnFamilyOptions currentOptions,
Collection<AutoCloseable> handlesToClose) {
return currentOptions.setCompactionStyle(CompactionStyle.FIFO);
}
});
try (RocksDBResourceContainer optionsContainer =
rocksDbBackend.createOptionsAndResourceContainer()) {
ColumnFamilyOptions colCreated = optionsContainer.getColumnOptions();
assertEquals(CompactionStyle.FIFO, colCreated.compactionStyle());
}
}
@Test
public void testPredefinedAndOptionsFactory() throws Exception {
final RocksDBOptionsFactory optionsFactory =
new RocksDBOptionsFactory() {
@Override
public DBOptions createDBOptions(
DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
return currentOptions;
}
@Override
public ColumnFamilyOptions createColumnOptions(
ColumnFamilyOptions currentOptions,
Collection<AutoCloseable> handlesToClose) {
return currentOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);
}
};
try (final RocksDBResourceContainer optionsContainer =
new RocksDBResourceContainer(
PredefinedOptions.SPINNING_DISK_OPTIMIZED, optionsFactory)) {
final ColumnFamilyOptions columnFamilyOptions = optionsContainer.getColumnOptions();
assertNotNull(columnFamilyOptions);
assertEquals(CompactionStyle.UNIVERSAL, columnFamilyOptions.compactionStyle());
}
}
@Test
public void testPredefinedOptionsEnum() {
ArrayList<AutoCloseable> handlesToClose = new ArrayList<>();
for (PredefinedOptions o : PredefinedOptions.values()) {
try (DBOptions opt = o.createDBOptions(handlesToClose)) {
assertNotNull(opt);
}
}
handlesToClose.forEach(IOUtils::closeQuietly);
handlesToClose.clear();
}
// ------------------------------------------------------------------------
// Reconfiguration
// ------------------------------------------------------------------------
@Test
public void testRocksDbReconfigurationCopiesExistingValues() throws Exception {
final FsStateBackend checkpointBackend =
new FsStateBackend(tempFolder.newFolder().toURI().toString());
final boolean incremental = !CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
final RocksDBStateBackend original =
new RocksDBStateBackend(checkpointBackend, incremental);
// these must not be the default options
final PredefinedOptions predOptions = PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM;
assertNotEquals(predOptions, original.getPredefinedOptions());
original.setPredefinedOptions(predOptions);
final RocksDBOptionsFactory optionsFactory = mock(RocksDBOptionsFactory.class);
original.setRocksDBOptions(optionsFactory);
final String[] localDirs =
new String[] {
tempFolder.newFolder().getAbsolutePath(),
tempFolder.newFolder().getAbsolutePath()
};
original.setDbStoragePaths(localDirs);
RocksDBStateBackend copy =
original.configure(
new Configuration(), Thread.currentThread().getContextClassLoader());
assertEquals(
original.isIncrementalCheckpointsEnabled(), copy.isIncrementalCheckpointsEnabled());
assertArrayEquals(original.getDbStoragePaths(), copy.getDbStoragePaths());
assertEquals(original.getRocksDBOptions(), copy.getRocksDBOptions());
assertEquals(original.getPredefinedOptions(), copy.getPredefinedOptions());
FsStateBackend copyCheckpointBackend = (FsStateBackend) copy.getCheckpointBackend();
assertEquals(
checkpointBackend.getCheckpointPath(), copyCheckpointBackend.getCheckpointPath());
assertEquals(
checkpointBackend.getSavepointPath(), copyCheckpointBackend.getSavepointPath());
}
// ------------------------------------------------------------------------
// RocksDB Memory Control
// ------------------------------------------------------------------------
@Test
public void testDefaultMemoryControlParameters() {
RocksDBMemoryConfiguration memSettings = new RocksDBMemoryConfiguration();
assertTrue(memSettings.isUsingManagedMemory());
assertFalse(memSettings.isUsingFixedMemoryPerSlot());
assertEquals(
RocksDBOptions.HIGH_PRIORITY_POOL_RATIO.defaultValue(),
memSettings.getHighPriorityPoolRatio(),
0.0);
assertEquals(
RocksDBOptions.WRITE_BUFFER_RATIO.defaultValue(),
memSettings.getWriteBufferRatio(),
0.0);
RocksDBMemoryConfiguration configured =
RocksDBMemoryConfiguration.fromOtherAndConfiguration(
memSettings, new Configuration());
assertTrue(configured.isUsingManagedMemory());
assertFalse(configured.isUsingFixedMemoryPerSlot());
assertEquals(
RocksDBOptions.HIGH_PRIORITY_POOL_RATIO.defaultValue(),
configured.getHighPriorityPoolRatio(),
0.0);
assertEquals(
RocksDBOptions.WRITE_BUFFER_RATIO.defaultValue(),
configured.getWriteBufferRatio(),
0.0);
}
@Test
public void testConfigureManagedMemory() {
final Configuration config = new Configuration();
config.setBoolean(RocksDBOptions.USE_MANAGED_MEMORY, true);
final RocksDBMemoryConfiguration memSettings =
RocksDBMemoryConfiguration.fromOtherAndConfiguration(
new RocksDBMemoryConfiguration(), config);
assertTrue(memSettings.isUsingManagedMemory());
}
@Test
public void testConfigureIllegalMemoryControlParameters() {
RocksDBMemoryConfiguration memSettings = new RocksDBMemoryConfiguration();
verifySetParameter(() -> memSettings.setFixedMemoryPerSlot("-1B"));
verifySetParameter(() -> memSettings.setHighPriorityPoolRatio(-0.1));
verifySetParameter(() -> memSettings.setHighPriorityPoolRatio(1.1));
verifySetParameter(() -> memSettings.setWriteBufferRatio(-0.1));
verifySetParameter(() -> memSettings.setWriteBufferRatio(1.1));
memSettings.setFixedMemoryPerSlot("128MB");
memSettings.setWriteBufferRatio(0.6);
memSettings.setHighPriorityPoolRatio(0.6);
try {
// sum of writeBufferRatio and highPriPoolRatio larger than 1.0
memSettings.validate();
fail("Expected an IllegalArgumentException.");
} catch (IllegalArgumentException expected) {
// expected exception
}
}
private void verifySetParameter(Runnable setter) {
try {
setter.run();
fail("No expected IllegalArgumentException.");
} catch (IllegalArgumentException expected) {
// expected exception
}
}
// ------------------------------------------------------------------------
// Contained Non-partitioned State Backend
// ------------------------------------------------------------------------
@Test
public void testCallsForwardedToNonPartitionedBackend() throws Exception {
StateBackend storageBackend = new MemoryStateBackend();
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(storageBackend);
assertEquals(storageBackend, rocksDbBackend.getCheckpointBackend());
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
static MockEnvironment getMockEnvironment(File... tempDirs) {
final String[] tempDirStrings = new String[tempDirs.length];
for (int i = 0; i < tempDirs.length; i++) {
tempDirStrings[i] = tempDirs[i].getAbsolutePath();
}
IOManager ioMan = mock(IOManager.class);
when(ioMan.getSpillingDirectories()).thenReturn(tempDirs);
return MockEnvironment.builder()
.setUserCodeClassLoader(RocksDBStateBackendConfigTest.class.getClassLoader())
.setTaskManagerRuntimeInfo(
new TestingTaskManagerRuntimeInfo(new Configuration(), tempDirStrings))
.setIOManager(ioMan)
.build();
}
private void verifyIllegalArgument(ConfigOption<?> configOption, String configValue) {
Configuration configuration = new Configuration();
configuration.setString(configOption.key(), configValue);
DefaultConfigurableOptionsFactory optionsFactory = new DefaultConfigurableOptionsFactory();
try {
optionsFactory.configure(configuration);
fail("Not throwing expected IllegalArgumentException.");
} catch (IllegalArgumentException e) {
// ignored
}
}
/** An implementation of options factory for testing. */
public static class TestOptionsFactory implements ConfigurableRocksDBOptionsFactory {
public static final ConfigOption<Integer> BACKGROUND_JOBS_OPTION =
ConfigOptions.key("my.custom.rocksdb.backgroundJobs").intType().defaultValue(2);
private int backgroundJobs = BACKGROUND_JOBS_OPTION.defaultValue();
@Override
public DBOptions createDBOptions(
DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
return currentOptions.setMaxBackgroundJobs(backgroundJobs);
}
@Override
public ColumnFamilyOptions createColumnOptions(
ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
return currentOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);
}
@Override
public RocksDBOptionsFactory configure(ReadableConfig configuration) {
this.backgroundJobs = configuration.get(BACKGROUND_JOBS_OPTION);
return this;
}
}
}