blob: ad8eb8a02d2b8d178d1f1a7796b1bc2e091fecb3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.contrib.streaming.state;
import org.apache.flink.runtime.memory.OpaqueMemoryResource;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.IndexType;
import org.rocksdb.LRUCache;
import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.ReadOptions;
import org.rocksdb.TableFormatConfig;
import org.rocksdb.WriteBufferManager;
import org.rocksdb.WriteOptions;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import static org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getFilterFromBlockBasedTableConfig;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
/** Tests to guard {@link RocksDBResourceContainer}. */
public class RocksDBResourceContainerTest {
@ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
@BeforeClass
public static void ensureRocksDbNativeLibraryLoaded() throws IOException {
NativeLibraryLoader.getInstance().loadLibrary(TMP_FOLDER.newFolder().getAbsolutePath());
}
// ------------------------------------------------------------------------
@Test
public void testFreeDBOptionsAfterClose() throws Exception {
RocksDBResourceContainer container = new RocksDBResourceContainer();
DBOptions dbOptions = container.getDbOptions();
assertThat(dbOptions.isOwningHandle(), is(true));
container.close();
assertThat(dbOptions.isOwningHandle(), is(false));
}
@Test
public void testFreeMultipleDBOptionsAfterClose() throws Exception {
RocksDBResourceContainer container = new RocksDBResourceContainer();
final int optionNumber = 20;
ArrayList<DBOptions> dbOptions = new ArrayList<>(optionNumber);
for (int i = 0; i < optionNumber; i++) {
dbOptions.add(container.getDbOptions());
}
container.close();
for (DBOptions dbOption : dbOptions) {
assertThat(dbOption.isOwningHandle(), is(false));
}
}
/**
* Guard the shared resources will be released after {@link RocksDBResourceContainer#close()}
* when the {@link RocksDBResourceContainer} instance is initiated with {@link
* OpaqueMemoryResource}.
*
* @throws Exception if unexpected error happened.
*/
@Test
public void testSharedResourcesAfterClose() throws Exception {
OpaqueMemoryResource<RocksDBSharedResources> sharedResources = getSharedResources();
RocksDBResourceContainer container =
new RocksDBResourceContainer(PredefinedOptions.DEFAULT, null, sharedResources);
container.close();
RocksDBSharedResources rocksDBSharedResources = sharedResources.getResourceHandle();
assertThat(rocksDBSharedResources.getCache().isOwningHandle(), is(false));
assertThat(rocksDBSharedResources.getWriteBufferManager().isOwningHandle(), is(false));
}
/**
* Guard that {@link RocksDBResourceContainer#getDbOptions()} shares the same {@link
* WriteBufferManager} instance if the {@link RocksDBResourceContainer} instance is initiated
* with {@link OpaqueMemoryResource}.
*
* @throws Exception if unexpected error happened.
*/
@Test
public void testGetDbOptionsWithSharedResources() throws Exception {
final int optionNumber = 20;
OpaqueMemoryResource<RocksDBSharedResources> sharedResources = getSharedResources();
RocksDBResourceContainer container =
new RocksDBResourceContainer(PredefinedOptions.DEFAULT, null, sharedResources);
HashSet<WriteBufferManager> writeBufferManagers = new HashSet<>();
for (int i = 0; i < optionNumber; i++) {
DBOptions dbOptions = container.getDbOptions();
WriteBufferManager writeBufferManager = getWriteBufferManager(dbOptions);
writeBufferManagers.add(writeBufferManager);
}
assertThat(writeBufferManagers.size(), is(1));
assertThat(
writeBufferManagers.iterator().next(),
is(sharedResources.getResourceHandle().getWriteBufferManager()));
container.close();
}
/**
* Guard that {@link RocksDBResourceContainer#getColumnOptions()} shares the same {@link Cache}
* instance if the {@link RocksDBResourceContainer} instance is initiated with {@link
* OpaqueMemoryResource}.
*
* @throws Exception if unexpected error happened.
*/
@Test
public void testGetColumnFamilyOptionsWithSharedResources() throws Exception {
final int optionNumber = 20;
OpaqueMemoryResource<RocksDBSharedResources> sharedResources = getSharedResources();
RocksDBResourceContainer container =
new RocksDBResourceContainer(PredefinedOptions.DEFAULT, null, sharedResources);
HashSet<Cache> caches = new HashSet<>();
for (int i = 0; i < optionNumber; i++) {
ColumnFamilyOptions columnOptions = container.getColumnOptions();
Cache cache = getBlockCache(columnOptions);
caches.add(cache);
}
assertThat(caches.size(), is(1));
assertThat(caches.iterator().next(), is(sharedResources.getResourceHandle().getCache()));
container.close();
}
private OpaqueMemoryResource<RocksDBSharedResources> getSharedResources() {
final long cacheSize = 1024L, writeBufferSize = 512L;
final LRUCache cache = new LRUCache(cacheSize, -1, false, 0.1);
final WriteBufferManager wbm = new WriteBufferManager(writeBufferSize, cache);
RocksDBSharedResources rocksDBSharedResources =
new RocksDBSharedResources(cache, wbm, writeBufferSize, false);
return new OpaqueMemoryResource<>(
rocksDBSharedResources, cacheSize, rocksDBSharedResources::close);
}
private Cache getBlockCache(ColumnFamilyOptions columnOptions) {
BlockBasedTableConfig blockBasedTableConfig = null;
try {
blockBasedTableConfig = (BlockBasedTableConfig) columnOptions.tableFormatConfig();
} catch (ClassCastException e) {
fail("Table config got from ColumnFamilyOptions is not BlockBasedTableConfig");
}
Field cacheField = null;
try {
cacheField = BlockBasedTableConfig.class.getDeclaredField("blockCache_");
} catch (NoSuchFieldException e) {
fail("blockCache_ is not defined");
}
cacheField.setAccessible(true);
try {
return (Cache) cacheField.get(blockBasedTableConfig);
} catch (IllegalAccessException e) {
fail("Cannot access blockCache_ field.");
return null;
}
}
private WriteBufferManager getWriteBufferManager(DBOptions dbOptions) {
Field writeBufferManagerField = null;
try {
writeBufferManagerField = DBOptions.class.getDeclaredField("writeBufferManager_");
} catch (NoSuchFieldException e) {
fail("writeBufferManager_ is not defined.");
}
writeBufferManagerField.setAccessible(true);
try {
return (WriteBufferManager) writeBufferManagerField.get(dbOptions);
} catch (IllegalAccessException e) {
fail("Cannot access writeBufferManager_ field.");
return null;
}
}
@Test
public void testFreeColumnOptionsAfterClose() throws Exception {
RocksDBResourceContainer container = new RocksDBResourceContainer();
ColumnFamilyOptions columnFamilyOptions = container.getColumnOptions();
assertThat(columnFamilyOptions.isOwningHandle(), is(true));
container.close();
assertThat(columnFamilyOptions.isOwningHandle(), is(false));
}
@Test
public void testFreeMultipleColumnOptionsAfterClose() throws Exception {
RocksDBResourceContainer container = new RocksDBResourceContainer();
final int optionNumber = 20;
ArrayList<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<>(optionNumber);
for (int i = 0; i < optionNumber; i++) {
columnFamilyOptions.add(container.getColumnOptions());
}
container.close();
for (ColumnFamilyOptions columnFamilyOption : columnFamilyOptions) {
assertThat(columnFamilyOption.isOwningHandle(), is(false));
}
}
@Test
public void testFreeMultipleColumnOptionsWithPredefinedOptions() throws Exception {
for (PredefinedOptions predefinedOptions : PredefinedOptions.values()) {
RocksDBResourceContainer container =
new RocksDBResourceContainer(predefinedOptions, null);
final int optionNumber = 20;
ArrayList<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<>(optionNumber);
for (int i = 0; i < optionNumber; i++) {
columnFamilyOptions.add(container.getColumnOptions());
}
container.close();
for (ColumnFamilyOptions columnFamilyOption : columnFamilyOptions) {
assertThat(columnFamilyOption.isOwningHandle(), is(false));
}
}
}
@Test
public void testFreeSharedResourcesAfterClose() throws Exception {
LRUCache cache = new LRUCache(1024L);
WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
RocksDBSharedResources sharedResources =
new RocksDBSharedResources(cache, wbm, 1024L, false);
final ThrowingRunnable<Exception> disposer = sharedResources::close;
OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
RocksDBResourceContainer container =
new RocksDBResourceContainer(PredefinedOptions.DEFAULT, null, opaqueResource);
container.close();
assertThat(cache.isOwningHandle(), is(false));
assertThat(wbm.isOwningHandle(), is(false));
}
@Test
public void testFreeWriteReadOptionsAfterClose() throws Exception {
RocksDBResourceContainer container = new RocksDBResourceContainer();
WriteOptions writeOptions = container.getWriteOptions();
ReadOptions readOptions = container.getReadOptions();
assertThat(writeOptions.isOwningHandle(), is(true));
assertThat(readOptions.isOwningHandle(), is(true));
container.close();
assertThat(writeOptions.isOwningHandle(), is(false));
assertThat(readOptions.isOwningHandle(), is(false));
}
@Test
public void testGetColumnFamilyOptionsWithPartitionedIndex() throws Exception {
LRUCache cache = new LRUCache(1024L);
WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
RocksDBSharedResources sharedResources =
new RocksDBSharedResources(cache, wbm, 1024L, true);
final ThrowingRunnable<Exception> disposer = sharedResources::close;
OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
BloomFilter blockBasedFilter = new BloomFilter();
RocksDBOptionsFactory blockBasedBloomFilterOptionFactory =
new RocksDBOptionsFactory() {
@Override
public DBOptions createDBOptions(
DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
return currentOptions;
}
@Override
public ColumnFamilyOptions createColumnOptions(
ColumnFamilyOptions currentOptions,
Collection<AutoCloseable> handlesToClose) {
TableFormatConfig tableFormatConfig = currentOptions.tableFormatConfig();
BlockBasedTableConfig blockBasedTableConfig =
tableFormatConfig == null
? new BlockBasedTableConfig()
: (BlockBasedTableConfig) tableFormatConfig;
blockBasedTableConfig.setFilter(blockBasedFilter);
handlesToClose.add(blockBasedFilter);
currentOptions.setTableFormatConfig(blockBasedTableConfig);
return currentOptions;
}
};
try (RocksDBResourceContainer container =
new RocksDBResourceContainer(
PredefinedOptions.DEFAULT,
blockBasedBloomFilterOptionFactory,
opaqueResource)) {
ColumnFamilyOptions columnOptions = container.getColumnOptions();
BlockBasedTableConfig actual =
(BlockBasedTableConfig) columnOptions.tableFormatConfig();
assertThat(actual.indexType(), is(IndexType.kTwoLevelIndexSearch));
assertThat(actual.partitionFilters(), is(true));
assertThat(actual.pinTopLevelIndexAndFilter(), is(true));
assertThat(getFilterFromBlockBasedTableConfig(actual), not(blockBasedFilter));
}
assertFalse("Block based filter is left unclosed.", blockBasedFilter.isOwningHandle());
}
}