blob: 12b33e1fdfc0a43a35c0743d5412f62dc83bd016 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.contrib.streaming.state;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.WriteBufferManager;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.powermock.api.mockito.PowerMockito.when;
/**
 * Unit tests for {@link RocksDBMemoryControllerUtils}.
 *
 * <p>The suite runs under {@link PowerMockRunner} with {@link RocksDBMemoryControllerUtils}
 * prepared for static mocking, so that individual static methods can be intercepted while the
 * remaining ones still execute their real implementation.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest(RocksDBMemoryControllerUtils.class)
public class RocksDBMemoryControllerUtilsTest {

    @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();

    /**
     * Asks the RocksDB {@link NativeLibraryLoader} to load the native library before every test,
     * handing it the absolute path of a newly created temporary folder.
     *
     * @throws IOException if the temporary folder cannot be created or the library fails to load
     */
    @Before
    public void ensureRocksDbNativeLibraryLoaded() throws IOException {
        final NativeLibraryLoader loader = NativeLibraryLoader.getInstance();
        final String libraryDirectory = temporaryFolder.newFolder().getAbsolutePath();
        loader.loadLibrary(libraryDirectory);
    }

    /**
     * Verifies that {@code allocateRocksDBSharedResources} passes the capacities computed by
     * {@code calculateActualCacheCapacity} and {@code calculateWriteBufferManagerCapacity} on to
     * {@code createCache} and {@code createWriteBufferManager}, and that the returned shared
     * resources report the same write buffer manager capacity.
     */
    @Test
    public void testCreateSharedResourcesWithExpectedCapacity() {
        // Sinks for the first argument seen by the two intercepted factory methods.
        final AtomicLong observedCacheCapacity = new AtomicLong(0L);
        final AtomicLong observedWbmCapacity = new AtomicLong(0L);

        // Record the requested cache capacity, then delegate to the real factory method.
        final Answer<LRUCache> recordCacheCapacity =
                invocation -> {
                    observedCacheCapacity.set((Long) invocation.getArguments()[0]);
                    return (LRUCache) invocation.callRealMethod();
                };
        // Same idea for the write buffer manager factory method.
        final Answer<WriteBufferManager> recordWbmCapacity =
                invocation -> {
                    observedWbmCapacity.set((Long) invocation.getArguments()[0]);
                    return (WriteBufferManager) invocation.callRealMethod();
                };

        PowerMockito.mockStatic(RocksDBMemoryControllerUtils.class);

        // Once the class is statically mocked, every method that should keep its production
        // behaviour has to be re-enabled explicitly.
        when(RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(
                        anyLong(), anyDouble(), anyDouble(), anyBoolean()))
                .thenCallRealMethod();
        when(RocksDBMemoryControllerUtils.calculateActualCacheCapacity(anyLong(), anyDouble()))
                .thenCallRealMethod();
        when(RocksDBMemoryControllerUtils.calculateWriteBufferManagerCapacity(
                        anyLong(), anyDouble()))
                .thenCallRealMethod();

        // PowerMockito cannot easily mock native static methods, so the test intercepts the
        // `createCache` and `createWriteBufferManager` wrappers instead.
        when(RocksDBMemoryControllerUtils.createCache(anyLong(), anyDouble()))
                .thenAnswer(recordCacheCapacity);
        when(RocksDBMemoryControllerUtils.createWriteBufferManager(anyLong(), any(Cache.class)))
                .thenAnswer(recordWbmCapacity);

        final long totalMemory = 2048L;
        final double wbRatio = 0.5;
        final double highPriRatio = 0.1;

        final RocksDBSharedResources sharedResources =
                RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(
                        totalMemory, wbRatio, highPriRatio, false);

        final long cacheCapacityFromFormula =
                RocksDBMemoryControllerUtils.calculateActualCacheCapacity(totalMemory, wbRatio);
        final long wbmCapacityFromFormula =
                RocksDBMemoryControllerUtils.calculateWriteBufferManagerCapacity(
                        totalMemory, wbRatio);

        assertThat(observedCacheCapacity.get(), is(cacheCapacityFromFormula));
        assertThat(observedWbmCapacity.get(), is(wbmCapacityFromFormula));
        assertThat(sharedResources.getWriteBufferManagerCapacity(), is(wbmCapacityFromFormula));
    }

    /**
     * Verifies the default arena block size for a 64 MiB write buffer (expected to be one eighth
     * of it) and for two neighbouring sizes: one byte less is expected to give the same result,
     * eight bytes more is expected to give a result that is 4 KiB larger.
     */
    @Test
    public void testCalculateRocksDBDefaultArenaBlockSize() {
        final long sixtyFourMiB = 64 * 1024 * 1024;
        final long fourKiB = 4 * 1024;
        final long oneEighthOfBuffer = sixtyFourMiB / 8;

        // Normal case test
        final long blockSizeForExactBuffer =
                RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(sixtyFourMiB);
        assertThat(
                "Arena block size calculation error for normal case",
                blockSizeForExactBuffer,
                is(oneEighthOfBuffer));

        // Alignment tests
        final long blockSizeJustBelow =
                RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(
                        sixtyFourMiB - 1);
        assertThat(
                "Arena block size calculation error for alignment case",
                blockSizeJustBelow,
                is(oneEighthOfBuffer));

        final long blockSizeJustAbove =
                RocksDBMemoryControllerUtils.calculateRocksDBDefaultArenaBlockSize(
                        sixtyFourMiB + 8);
        assertThat(
                "Arena block size calculation error for alignment case2",
                blockSizeJustAbove,
                is(oneEighthOfBuffer + fourKiB));
    }

    /** Verifies that the mutable limit for a 64 MiB buffer equals seven eighths of that size. */
    @Test
    public void testCalculateRocksDBMutableLimit() {
        final long sixtyFourMiB = 64 * 1024 * 1024;
        final long sevenEighths = sixtyFourMiB * 7 / 8;

        final long computedLimit =
                RocksDBMemoryControllerUtils.calculateRocksDBMutableLimit(sixtyFourMiB);

        assertThat(computedLimit, is(sevenEighths));
    }

    /**
     * Verifies that {@code validateArenaBlockSize} rejects an 8 MiB arena block size when the
     * second argument is half of that value, and accepts it when the second argument is one and a
     * half times that value.
     */
    @Test
    public void testValidateArenaBlockSize() {
        final long eightMiB = 8 * 1024 * 1024;
        final long halfOfBlockSize = (long) (eightMiB * 0.5);
        final long oneAndAHalfBlockSizes = (long) (eightMiB * 1.5);

        final boolean acceptedWithHalf =
                RocksDBMemoryControllerUtils.validateArenaBlockSize(eightMiB, halfOfBlockSize);
        assertFalse(acceptedWithHalf);

        final boolean acceptedWithOneAndAHalf =
                RocksDBMemoryControllerUtils.validateArenaBlockSize(
                        eightMiB, oneAndAHalfBlockSizes);
        assertTrue(acceptedWithOneAndAHalf);
    }
}