blob: af3fbc0ec45158d2fa9d61ea449d89171ff623a5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.pagememory.persistence;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.MUST_TRIGGER;
import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.NOT_REQUIRED;
import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.SHOULD_TRIGGER;
import static org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory.PAGE_OVERHEAD;
import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.PAGES_SORTED;
import static org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTestUtils.mockCheckpointTimeoutLock;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.Constants.MiB;
import static org.apache.ignite.internal.util.GridUnsafe.allocateBuffer;
import static org.apache.ignite.internal.util.GridUnsafe.freeBuffer;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.LongStream;
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.fileio.RandomAccessFileIoFactory;
import org.apache.ignite.internal.pagememory.AbstractPageMemoryNoLoadSelfTest;
import org.apache.ignite.internal.pagememory.DataRegion;
import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
import org.apache.ignite.internal.pagememory.configuration.schema.PersistentPageMemoryDataRegionConfiguration;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.persistence.PartitionMeta.PartitionMetaSnapshot;
import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointManager;
import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointProgress;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
/**
* Tests {@link PersistentPageMemory}.
*/
@ExtendWith({WorkDirectoryExtension.class, ConfigurationExtension.class})
public class PersistentPageMemoryNoLoadTest extends AbstractPageMemoryNoLoadSelfTest {
private static PageIoRegistry ioRegistry;
@InjectConfiguration
private PersistentPageMemoryDataRegionConfiguration dataRegionCfg;
@BeforeAll
static void beforeAll() {
ioRegistry = new PageIoRegistry();
ioRegistry.loadFromServiceLoader();
}
@BeforeEach
void setUp() throws Exception {
dataRegionCfg.change(c -> c.changeSize(MAX_MEMORY_SIZE)).get(1, SECONDS);
}
@AfterAll
static void afterAll() {
ioRegistry = null;
}
/** {@inheritDoc} */
@Override
protected PageMemory memory() {
return createPageMemory(
defaultSegmentSizes(),
defaultCheckpointBufferSize(),
null,
null,
shouldNotHappenFlushDirtyPageForReplacement()
);
}
/** {@inheritDoc} */
@Test
@Override
public void testPageHandleDeallocation() {
// No-op.
}
@Test
void testDirtyPages(
@InjectConfiguration PageMemoryCheckpointConfiguration checkpointConfig,
@WorkDirectory Path workDir
) throws Exception {
FilePageStoreManager filePageStoreManager = createFilePageStoreManager(workDir);
PartitionMetaManager partitionMetaManager = new PartitionMetaManager(ioRegistry, PAGE_SIZE);
Collection<DataRegion<PersistentPageMemory>> dataRegions = new ArrayList<>();
CheckpointManager checkpointManager = createCheckpointManager(
checkpointConfig,
filePageStoreManager,
partitionMetaManager,
dataRegions
);
PersistentPageMemory pageMemory = createPageMemory(
defaultSegmentSizes(),
defaultCheckpointBufferSize(),
filePageStoreManager,
checkpointManager,
shouldNotHappenFlushDirtyPageForReplacement()
);
dataRegions.add(() -> pageMemory);
filePageStoreManager.start();
checkpointManager.start();
pageMemory.start();
try {
initGroupFilePageStores(filePageStoreManager, partitionMetaManager, checkpointManager);
checkpointManager.checkpointTimeoutLock().checkpointReadLock();
try {
Set<FullPageId> dirtyPages = Set.of(createDirtyPage(pageMemory), createDirtyPage(pageMemory));
assertThat(pageMemory.dirtyPages(), equalTo(dirtyPages));
} finally {
checkpointManager.checkpointTimeoutLock().checkpointReadUnlock();
}
checkpointManager
.forceCheckpoint("for_test_flash_dirty_pages")
.futureFor(FINISHED)
.get(1, SECONDS);
assertThat(pageMemory.dirtyPages(), empty());
} finally {
closeAll(
() -> pageMemory.stop(true),
checkpointManager::stop,
filePageStoreManager::stop
);
}
}
@Test
void testCheckpointUrgency(
@InjectConfiguration PageMemoryCheckpointConfiguration checkpointConfig,
@WorkDirectory Path workDir
) throws Exception {
FilePageStoreManager filePageStoreManager = createFilePageStoreManager(workDir);
PartitionMetaManager partitionMetaManager = new PartitionMetaManager(ioRegistry, PAGE_SIZE);
Collection<DataRegion<PersistentPageMemory>> dataRegions = new ArrayList<>();
CheckpointManager checkpointManager = createCheckpointManager(
checkpointConfig,
filePageStoreManager,
partitionMetaManager,
dataRegions
);
long systemPageSize = PAGE_SIZE + PAGE_OVERHEAD;
dataRegionCfg.change(c -> c.changeSize(128 * systemPageSize)).get(1, SECONDS);
PersistentPageMemory pageMemory = createPageMemory(
new long[]{100 * systemPageSize},
28 * systemPageSize,
filePageStoreManager,
checkpointManager,
shouldNotHappenFlushDirtyPageForReplacement()
);
dataRegions.add(() -> pageMemory);
filePageStoreManager.start();
checkpointManager.start();
pageMemory.start();
try {
initGroupFilePageStores(filePageStoreManager, partitionMetaManager, checkpointManager);
long maxPages = pageMemory.totalPages();
long dirtyPagesSoftThreshold = (maxPages * 3 / 4);
long dirtyPagesHardThreshold = (maxPages * 9 / 10);
assertThat(dirtyPagesSoftThreshold, greaterThanOrEqualTo(70L));
assertThat(dirtyPagesHardThreshold, greaterThanOrEqualTo(80L));
checkpointManager.checkpointTimeoutLock().checkpointReadLock();
try {
int i = 0;
for (; i < dirtyPagesSoftThreshold - 1; i++) {
createDirtyPage(pageMemory);
assertEquals(NOT_REQUIRED, pageMemory.checkpointUrgency(), "i=" + i);
}
for (; i < dirtyPagesHardThreshold - 1; i++) {
createDirtyPage(pageMemory);
assertEquals(SHOULD_TRIGGER, pageMemory.checkpointUrgency(), "i=" + i);
}
for (; i < maxPages; i++) {
createDirtyPage(pageMemory);
assertEquals(MUST_TRIGGER, pageMemory.checkpointUrgency(), "i=" + i);
}
} finally {
checkpointManager.checkpointTimeoutLock().checkpointReadUnlock();
}
checkpointManager
.forceCheckpoint("for_test_safe_to_update")
.futureFor(FINISHED)
.get(1, SECONDS);
assertEquals(NOT_REQUIRED, pageMemory.checkpointUrgency());
} finally {
closeAll(
() -> pageMemory.stop(true),
checkpointManager::stop,
filePageStoreManager::stop
);
}
}
@Test
void testDeltaFilePageStore(
@InjectConfiguration PageMemoryCheckpointConfiguration checkpointConfig,
@WorkDirectory Path workDir
) throws Exception {
FilePageStoreManager filePageStoreManager = spy(createFilePageStoreManager(workDir));
PartitionMetaManager partitionMetaManager = new PartitionMetaManager(ioRegistry, PAGE_SIZE);
Collection<DataRegion<PersistentPageMemory>> dataRegions = new ArrayList<>();
CheckpointManager checkpointManager = createCheckpointManager(
checkpointConfig,
filePageStoreManager,
partitionMetaManager,
dataRegions
);
PersistentPageMemory pageMemory = createPageMemory(
defaultSegmentSizes(),
defaultCheckpointBufferSize(),
filePageStoreManager,
checkpointManager,
shouldNotHappenFlushDirtyPageForReplacement()
);
dataRegions.add(() -> pageMemory);
filePageStoreManager.start();
checkpointManager.start();
pageMemory.start();
try {
initGroupFilePageStores(filePageStoreManager, partitionMetaManager, checkpointManager);
checkpointManager.checkpointTimeoutLock().checkpointReadLock();
try {
createDirtyPage(pageMemory);
createDirtyPage(pageMemory);
createDirtyPage(pageMemory);
} finally {
checkpointManager.checkpointTimeoutLock().checkpointReadUnlock();
}
checkpointManager
.forceCheckpoint("for_test_delta_file_page_store")
.futureFor(FINISHED)
.get(1, SECONDS);
verify(filePageStoreManager, times(1)).tmpDeltaFilePageStorePath(eq(GRP_ID), eq(PARTITION_ID), eq(0));
verify(filePageStoreManager, times(1)).deltaFilePageStorePath(eq(GRP_ID), eq(PARTITION_ID), eq(0));
} finally {
closeAll(
() -> pageMemory.stop(true),
checkpointManager::stop,
filePageStoreManager::stop
);
}
}
@Test
void testPageReplacement(
@InjectConfiguration("mock.checkpointThreads=1") PageMemoryCheckpointConfiguration checkpointConfig,
@WorkDirectory Path workDir
) throws Exception {
FilePageStoreManager filePageStoreManager = createFilePageStoreManager(workDir);
PartitionMetaManager partitionMetaManager = spy(new PartitionMetaManager(ioRegistry, PAGE_SIZE));
Collection<DataRegion<PersistentPageMemory>> dataRegions = new ArrayList<>();
CheckpointManager checkpointManager = createCheckpointManager(
checkpointConfig,
filePageStoreManager,
partitionMetaManager,
dataRegions
);
CompletableFuture<?> flushDirtyPageForReplacementFuture = new CompletableFuture<>();
PersistentPageMemory pageMemory = createPageMemory(
defaultSegmentSizes(),
defaultCheckpointBufferSize(),
filePageStoreManager,
checkpointManager,
(pageMemory0, fullPageId, buffer) -> flushDirtyPageForReplacementFuture.complete(null)
);
dataRegions.add(() -> pageMemory);
filePageStoreManager.start();
checkpointManager.start();
pageMemory.start();
CompletableFuture<?> startWriteMetaToBufferFuture = new CompletableFuture<>();
CompletableFuture<?> finishWaitWriteMetaToBufferFuture = new CompletableFuture<>();
// Mock to pause writing to the disk (complete the checkpoint) and the replacement could happen.
doAnswer(answer -> {
startWriteMetaToBufferFuture.complete(null);
assertThat(finishWaitWriteMetaToBufferFuture, willCompleteSuccessfully());
return answer.callRealMethod();
})
.when(partitionMetaManager)
.writeMetaToBuffer(any(GroupPartitionId.class), any(PartitionMetaSnapshot.class), any(ByteBuffer.class));
try {
initGroupFilePageStores(filePageStoreManager, partitionMetaManager, checkpointManager);
checkpointManager.checkpointTimeoutLock().checkpointReadLock();
try {
for (int i = 0; i < 1_000; i++) {
createDirtyPage(pageMemory);
}
} finally {
checkpointManager.checkpointTimeoutLock().checkpointReadUnlock();
}
CheckpointProgress checkpointProgress = checkpointManager.forceCheckpoint("for_test_page_replacement");
// Replacement will not happen until the pages are sorted.
checkpointProgress.futureFor(PAGES_SORTED).get(1, SECONDS);
checkpointManager.checkpointTimeoutLock().checkpointReadLock();
try {
// We are waiting for the start of writing dirty pages to disk.
startWriteMetaToBufferFuture.get(1, SECONDS);
do {
// We create new dirty pages so that we get to the end of the data region and start page replacing.
createDirtyPage(pageMemory);
} while (!flushDirtyPageForReplacementFuture.isDone());
// Let's write the dirty pages to disk and complete the checkpoint.
finishWaitWriteMetaToBufferFuture.complete(null);
} finally {
checkpointManager.checkpointTimeoutLock().checkpointReadUnlock();
}
checkpointProgress.futureFor(FINISHED).get(1, SECONDS);
} finally {
finishWaitWriteMetaToBufferFuture.complete(null);
closeAll(
() -> pageMemory.stop(true),
checkpointManager::stop,
filePageStoreManager::stop
);
}
}
protected PersistentPageMemory createPageMemory(
long[] segmentSizes,
long checkpointBufferSize,
@Nullable FilePageStoreManager filePageStoreManager,
@Nullable CheckpointManager checkpointManager,
WriteDirtyPage flushDirtyPageForReplacement
) {
return new PersistentPageMemory(
dataRegionCfg,
ioRegistry,
segmentSizes,
checkpointBufferSize,
filePageStoreManager == null ? new TestPageReadWriteManager() : filePageStoreManager,
null,
flushDirtyPageForReplacement,
checkpointManager == null ? mockCheckpointTimeoutLock(true) : checkpointManager.checkpointTimeoutLock(),
PAGE_SIZE
);
}
protected FullPageId createDirtyPage(PersistentPageMemory pageMemory) throws Exception {
FullPageId fullPageId = allocatePage(pageMemory);
long page = pageMemory.acquirePage(fullPageId.groupId(), fullPageId.pageId());
try {
writePage(pageMemory, fullPageId, page, 100);
} finally {
pageMemory.releasePage(fullPageId.groupId(), fullPageId.pageId(), page);
}
return fullPageId;
}
private static long[] defaultSegmentSizes() {
return LongStream.range(0, 9).map(i -> 5 * MiB).toArray();
}
private static long defaultCheckpointBufferSize() {
return 5 * MiB;
}
private static WriteDirtyPage shouldNotHappenFlushDirtyPageForReplacement() {
return (fullPageId, buf, tag) -> fail("Should not happen");
}
private static CheckpointManager createCheckpointManager(
PageMemoryCheckpointConfiguration checkpointConfig,
FilePageStoreManager filePageStoreManager,
PartitionMetaManager partitionMetaManager,
Collection<DataRegion<PersistentPageMemory>> dataRegions
) throws Exception {
return new CheckpointManager(
"test",
null,
null,
mock(FailureProcessor.class),
checkpointConfig,
filePageStoreManager,
partitionMetaManager,
dataRegions,
ioRegistry,
mock(LogSyncer.class),
PAGE_SIZE
);
}
private static FilePageStoreManager createFilePageStoreManager(Path storagePath) throws Exception {
return new FilePageStoreManager(
"test",
storagePath,
new RandomAccessFileIoFactory(),
PAGE_SIZE,
mock(FailureProcessor.class));
}
private static void initGroupFilePageStores(
FilePageStoreManager filePageStoreManager,
PartitionMetaManager partitionMetaManager,
CheckpointManager checkpointManager
) throws Exception {
int partitions = PARTITION_ID + 1;
checkpointManager.checkpointTimeoutLock().checkpointReadLock();
ByteBuffer buffer = null;
try {
buffer = allocateBuffer(PAGE_SIZE);
for (int partition = 0; partition < partitions; partition++) {
GroupPartitionId groupPartitionId = new GroupPartitionId(GRP_ID, partition);
FilePageStore filePageStore = filePageStoreManager.readOrCreateStore(groupPartitionId, buffer.rewind());
filePageStore.ensure();
CheckpointProgress lastCheckpointProgress = checkpointManager.lastCheckpointProgress();
PartitionMeta partitionMeta = partitionMetaManager.readOrCreateMeta(
lastCheckpointProgress == null ? null : lastCheckpointProgress.id(),
groupPartitionId,
filePageStore,
buffer.rewind()
);
filePageStore.setPageAllocationListener(pageIdx -> {
assert checkpointManager.checkpointTimeoutLock().checkpointLockIsHeldByThread();
CheckpointProgress last = checkpointManager.lastCheckpointProgress();
partitionMeta.incrementPageCount(last == null ? null : last.id());
});
filePageStore.pages(partitionMeta.pageCount());
filePageStoreManager.addStore(groupPartitionId, filePageStore);
partitionMetaManager.addMeta(groupPartitionId, partitionMeta);
}
} finally {
ByteBuffer bufferToClose = buffer;
closeAll(
bufferToClose == null ? null : () -> freeBuffer(bufferToClose),
() -> checkpointManager.checkpointTimeoutLock().checkpointReadUnlock()
);
}
}
}