blob: abe182b6da18c9277ba1a84920c10e980ecc6a29 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.db;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static org.apache.ignite.testframework.GridTestUtils.SF;
/**
 * Starts a persistent node with a small checkpoint page buffer, fills a single-partition cache, then runs
 * several loader threads that read- and write-lock random pages under the checkpoint read lock while
 * checkpoint file writes are artificially slowed down. The scenario passes when the forced checkpoint,
 * the loader threads and the cache destroy all complete and no {@code AssertionError} was logged.
 */
public class CheckpointBufferDeadlockTest extends GridCommonAbstractTest {
    /** Max size. */
    private static final int MAX_SIZE = 500 * 1024 * 1024;

    /** CP buffer size. */
    private static final int CP_BUF_SIZE = 20 * 1024 * 1024;

    /** Slow checkpoint enabled. */
    private static final AtomicBoolean slowCheckpointEnabled = new AtomicBoolean(false);

    /** Checkpoint park nanos. */
    private static final int CHECKPOINT_PARK_NANOS = 50_000_000;

    /** Entry byte chunk size. */
    private static final int ENTRY_BYTE_CHUNK_SIZE = 900;

    /** Pages touched under CP lock. */
    private static final int PAGES_TOUCHED_UNDER_CP_LOCK = 20;

    /** Stop load flag. */
    private static final AtomicBoolean stop = new AtomicBoolean(false);

    /** Checkpoint threads. */
    private int checkpointThreads;

    /** Test logger. */
    private final ListeningTestLogger log = new ListeningTestLogger(false, super.log);

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

        cfg.setDataStorageConfiguration(
            new DataStorageConfiguration()
                .setFileIOFactory(new SlowCheckpointFileIOFactory(slowCheckpointEnabled, CHECKPOINT_PARK_NANOS))
                .setCheckpointThreads(checkpointThreads)
                .setDefaultDataRegionConfiguration(
                    new DataRegionConfiguration()
                        .setPersistenceEnabled(true)
                        .setMaxSize(MAX_SIZE)
                        .setCheckpointPageBufferSize(CP_BUF_SIZE)
                )
        );

        cfg.setFailureHandler(new StopNodeFailureHandler());
        cfg.setGridLogger(log);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        stop.set(false);

        slowCheckpointEnabled.set(false);

        stopAllGrids();

        cleanPersistenceDir();
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        stop.set(true);

        slowCheckpointEnabled.set(false);

        stopAllGrids();

        cleanPersistenceDir();
    }

    /**
     * Repeats the scenario several times with four checkpoint threads. Every repetition is wrapped in
     * {@link #beforeTest()} / {@link #afterTest()} so that it starts from a clean persistence directory.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testFourCheckpointThreads() throws Exception {
        checkpointThreads = 4;

        for (int i = 0; i < SF.applyLB(10, 3); i++) {
            beforeTest();

            try {
                runDeadlockScenario();
            }
            finally {
                afterTest();
            }
        }
    }

    /**
     * Runs the scenario once with a single checkpoint thread.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testOneCheckpointThread() throws Exception {
        checkpointThreads = 1;

        runDeadlockScenario();
    }

    /**
     * Fills a single-partition cache with checkpoints disabled, re-enables (slowed down) checkpoints and
     * starts loader threads that lock pages under the checkpoint read lock. After a delay the slowdown is
     * switched off, a checkpoint is forced, the loaders are stopped and the cache is destroyed.
     *
     * @throws Exception If failed.
     */
    private void runDeadlockScenario() throws Exception {
        LogListener lsnr = LogListener.matches(s -> s.contains("AssertionError")).build();

        log.registerListener(lsnr);

        try {
            IgniteEx ig = startGrid(0);

            ig.cluster().active(true);

            GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig.context().cache().context().database();

            FilePageStoreManager pageStoreMgr = (FilePageStoreManager)ig.context().cache().context().pageStore();

            final String cacheName = "single-part";

            CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>()
                .setName(cacheName)
                .setAffinity(new RendezvousAffinityFunction(false, 1));

            IgniteCache<Object, Object> singlePartCache = ig.getOrCreateCache(cacheCfg);

            db.enableCheckpoints(false).get();

            Thread.sleep(1_000);

            try (IgniteDataStreamer<Object, Object> streamer = ig.dataStreamer(singlePartCache.getName())) {
                int entries = MAX_SIZE / ENTRY_BYTE_CHUNK_SIZE / 4;

                for (int i = 0; i < entries; i++)
                    streamer.addData(i, new byte[ENTRY_BYTE_CHUNK_SIZE]);

                streamer.flush();
            }

            slowCheckpointEnabled.set(true);

            log.info(">>> Slow checkpoints enabled");

            db.enableCheckpoints(true).get();

            AtomicBoolean fail = new AtomicBoolean(false);

            IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
                @Override public void run() {
                    int loops = 0;

                    while (!stop.get()) {
                        // Report every 10th loop up to 500 and every 500th loop afterwards.
                        if ((loops % 10 == 0 && loops > 0 && loops < 500) || (loops % 500 == 0 && loops >= 500))
                            log.info("Successfully completed " + loops + " loops");

                        db.checkpointReadLock();

                        try {
                            touchPagesUnderCheckpointLock(db, pageStoreMgr, cacheName);
                        }
                        catch (Throwable e) {
                            log.error("Error in loader thread", e);

                            fail.set(true);
                        }
                        finally {
                            db.checkpointReadUnlock();
                        }

                        loops++;
                    }
                }
            }, 10, "load-runner");

            Thread.sleep(10_000); // Await for the start of throttling.

            slowCheckpointEnabled.set(false);

            log.info(">>> Slow checkpoints disabled");

            assertFalse(fail.get());

            forceCheckpoint(); // Previous checkpoint should eventually finish.

            stop.set(true);

            fut.get();

            // Loaders kept running while the checkpoint was forced, so check for late failures as well.
            assertFalse(fail.get());

            db.enableCheckpoints(true).get();

            // Check that there is no problem with pinned pages.
            ig.destroyCache(cacheName);

            assertFalse(lsnr.check());
        }
        finally {
            // Let loader threads leave their loop even if the scenario failed half-way.
            stop.set(true);

            log.unregisterListener(lsnr);
        }
    }

    /**
     * Single loader iteration, expected to be called with the checkpoint read lock held by the caller:
     * picks {@link #PAGES_TOUCHED_UNDER_CP_LOCK} pages of partition 0, read-locks the first half, write-locks
     * and dirties the second half one by one, then releases the read locks.
     *
     * @param db Database manager.
     * @param pageStoreMgr Page store manager.
     * @param cacheName Cache name.
     * @throws Exception If failed.
     */
    private void touchPagesUnderCheckpointLock(
        GridCacheDatabaseSharedManager db,
        FilePageStoreManager pageStoreMgr,
        String cacheName
    ) throws Exception {
        int grpId = CU.cacheId(cacheName);

        PageStore store = pageStoreMgr.getStore(grpId, 0);

        int pages = store.pages();

        DataRegion region = db.dataRegion(DataStorageConfiguration.DFLT_DATA_REG_DEFAULT_NAME);

        PageMemoryImpl pageMem = (PageMemoryImpl)region.pageMemory();

        List<FullPageId> pickedPages = pickPages(pageMem, grpId, pages);

        int half = PAGES_TOUCHED_UNDER_CP_LOCK / 2;

        // Pages that are currently pinned and read-locked; index i corresponds to pickedPages.get(i).
        List<Long> readLockedPages = new ArrayList<>(half);

        try {
            // Read lock many pages at once intentionally.
            for (int i = 0; i < half; i++) {
                FullPageId fpid = pickedPages.get(i);

                long page = pageMem.acquirePage(fpid.groupId(), fpid.pageId());

                boolean locked = false;

                try {
                    long abs = pageMem.readLock(fpid.groupId(), fpid.pageId(), page);

                    assertFalse(fpid.toString(), abs == 0);

                    locked = true;

                    readLockedPages.add(page);
                }
                finally {
                    // Lock was not taken: only the pin has to be dropped.
                    if (!locked)
                        pageMem.releasePage(fpid.groupId(), fpid.pageId(), page);
                }
            }

            // Emulate writes to trigger throttling.
            for (int i = half; i < PAGES_TOUCHED_UNDER_CP_LOCK && !stop.get(); i++) {
                FullPageId fpid = pickedPages.get(i);

                long page = pageMem.acquirePage(fpid.groupId(), fpid.pageId());

                try {
                    long abs = pageMem.writeLock(fpid.groupId(), fpid.pageId(), page);

                    assertFalse(fpid.toString(), abs == 0);

                    pageMem.writeUnlock(fpid.groupId(), fpid.pageId(), page, null, true);
                }
                finally {
                    pageMem.releasePage(fpid.groupId(), fpid.pageId(), page);
                }
            }
        }
        finally {
            // Always give back every read lock and pin taken above, also when an error interrupted the iteration.
            for (int i = 0; i < readLockedPages.size(); i++) {
                FullPageId fpid = pickedPages.get(i);

                long page = readLockedPages.get(i);

                pageMem.readUnlock(fpid.groupId(), fpid.pageId(), page);
                pageMem.releasePage(fpid.groupId(), fpid.pageId(), page);
            }
        }
    }

    /**
     * Randomly picks {@link #PAGES_TOUCHED_UNDER_CP_LOCK} distinct pages of partition 0 whose stored page ID
     * matches an ID built with {@code FLAG_DATA}, and returns them ordered by group ID and effective page ID.
     *
     * @param pageMem Page memory.
     * @param grpId Cache group ID.
     * @param pages Number of pages reported by the partition page store.
     * @return Sorted list of picked pages.
     * @throws Exception If failed.
     */
    private List<FullPageId> pickPages(PageMemoryImpl pageMem, int grpId, int pages) throws Exception {
        Set<FullPageId> pickedPagesSet = new HashSet<>();

        while (pickedPagesSet.size() < PAGES_TOUCHED_UNDER_CP_LOCK) {
            int pageIdx = ThreadLocalRandom.current().nextInt(
                PAGES_TOUCHED_UNDER_CP_LOCK, pages - PAGES_TOUCHED_UNDER_CP_LOCK);

            long pageId = PageIdUtils.pageId(0, PageIdAllocator.FLAG_DATA, pageIdx);

            long page = pageMem.acquirePage(grpId, pageId);

            try {
                // We do not know correct flag(FLAG_DATA or FLAG_AUX). Skip page if no luck.
                if (pageId != PageIO.getPageId(page + PageMemoryImpl.PAGE_OVERHEAD))
                    continue;
            }
            finally {
                pageMem.releasePage(grpId, pageId, page);
            }

            pickedPagesSet.add(new FullPageId(pageId, grpId));
        }

        List<FullPageId> pickedPages = new ArrayList<>(pickedPagesSet);

        assertEquals(PAGES_TOUCHED_UNDER_CP_LOCK, pickedPages.size());

        // Sort to avoid deadlocks on pages rw-locks.
        pickedPages.sort(
            Comparator.<FullPageId>comparingInt(FullPageId::groupId).thenComparingLong(FullPageId::effectivePageId));

        return pickedPages;
    }
}