blob: a76282126b4d0502fcff401d1d76ff0fb2daa89d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCountDownLatch;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.AbstractFailureHandler;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
/**
* Test handle of task canceling with PDS enabled.
*/
public class IgnitePdsTaskCancelingTest extends GridCommonAbstractTest {
/** Slow file IO enabled. */
private static final AtomicBoolean slowFileIoEnabled = new AtomicBoolean(false);
/** Node failure occurs. */
private static final AtomicBoolean failure = new AtomicBoolean(false);
/** Number of executing tasks. */
private static final int NUM_TASKS = 16;
/** Page size. */
private static final int PAGE_SIZE = 2048;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
cfg.setFailureHandler(new AbstractFailureHandler() {
@Override protected boolean handle(Ignite ignite, FailureContext failureCtx) {
failure.set(true);
return true;
}
});
cfg.setCacheConfiguration(new CacheConfiguration().setName(DEFAULT_CACHE_NAME).setAffinity(
new RendezvousAffinityFunction(false, NUM_TASKS / 2)
));
cfg.setDataStorageConfiguration(getDataStorageConfiguration());
// Set the thread pool size according to the NUM_TASKS.
cfg.setPublicThreadPoolSize(16);
return cfg;
}
/**
* Default data storage configuration.
*/
private DataStorageConfiguration getDataStorageConfiguration() {
DataStorageConfiguration dbCfg = new DataStorageConfiguration();
dbCfg.setPageSize(PAGE_SIZE);
dbCfg.setFileIOFactory(new SlowIOFactory());
dbCfg.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setMaxSize(100 * 1024 * 1024)
.setPersistenceEnabled(true));
return dbCfg;
}
/**
* Checks that tasks canceling does not lead to node failure.
*/
@Test
public void testFailNodesOnCanceledTask() throws Exception {
cleanPersistenceDir();
failure.set(false);
try {
Ignite ig0 = startGrids(4);
ig0.cluster().active(true);
Collection<IgniteFuture> cancelFutures = new ArrayList<>(NUM_TASKS);
IgniteCountDownLatch latch = ig0.countDownLatch("latch", NUM_TASKS, false, true);
for (int i = 0; i < NUM_TASKS; i++) {
final Integer key = i;
cancelFutures.add(ig0.compute().affinityRunAsync(DEFAULT_CACHE_NAME, key,
new IgniteRunnable() {
@IgniteInstanceResource
Ignite ig;
@Override public void run() {
latch.countDown();
latch.await();
ig.cache(DEFAULT_CACHE_NAME).put(key, new byte[1024]);
}
}));
}
slowFileIoEnabled.set(true);
latch.await();
for (IgniteFuture future: cancelFutures)
future.cancel();
slowFileIoEnabled.set(false);
for (int i = 0; i < NUM_TASKS; i++) {
final Integer key = i;
ig0.compute().affinityRun(DEFAULT_CACHE_NAME, key,
new IgniteRunnable() {
@IgniteInstanceResource
Ignite ig;
@Override public void run() {
ig.cache(DEFAULT_CACHE_NAME).put(key, new byte[1024]);
}
});
}
assertFalse(GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
return failure.get();
}
}, 5_000L));
}
finally {
stopAllGrids();
cleanPersistenceDir();
}
}
/**
* Test FilePageStore with multiple interrupted threads.
*/
@Test
public void testFilePageStoreInterruptThreads() throws Exception {
failure.set(false);
FileIOFactory factory = new RandomAccessFileIOFactory();
File file = new File(U.defaultWorkDirectory(), "file.bin");
file.deleteOnExit();
DataStorageConfiguration dbCfg = getDataStorageConfiguration();
FilePageStore pageStore = new FilePageStore(PageMemory.FLAG_DATA, file::toPath, factory, dbCfg.getPageSize(), val -> {});
int pageSize = dbCfg.getPageSize();
PageIO pageIO = PageIO.getPageIO(PageIO.T_DATA, 1);
long ptr = GridUnsafe.allocateMemory(NUM_TASKS * pageSize);
try {
List<Thread> threadList = new ArrayList<>(NUM_TASKS);
AtomicBoolean stopThreads = new AtomicBoolean(false);
for (int i = 0; i < NUM_TASKS; i++) {
long pageId = PageIdUtils.pageId(0, PageMemory.FLAG_DATA, (int)pageStore.allocatePage());
long pageAdr = ptr + i * pageSize;
pageIO.initNewPage(pageAdr, pageId, pageSize, null);
ByteBuffer buf = GridUnsafe.wrapPointer(pageAdr, pageSize);
pageStore.write(pageId, buf, 0, true);
threadList.add(new Thread(new Runnable() {
@Override public void run() {
Random random = new Random();
while (!stopThreads.get()) {
buf.position(0);
try {
if (random.nextBoolean()) {
log.info(">>> Read page " + U.hexLong(pageId));
pageStore.read(pageId, buf, false);
}
else {
log.info(">>> Write page " + U.hexLong(pageId));
pageStore.write(pageId, buf, 0, true);
}
Thread.interrupted();
}
catch (Exception e) {
log.error("Error while reading/writing page", e);
failure.set(true);
}
}
}
}));
}
for (Thread thread : threadList)
thread.start();
for (int i = 0; i < 10; i++) {
for (Thread thread : threadList) {
doSleep(10L);
log.info("Interrupting " + thread.getName());
thread.interrupt();
}
}
stopThreads.set(true);
for (Thread thread : threadList)
thread.join();
assertFalse(failure.get());
}
finally {
GridUnsafe.freeMemory(ptr);
}
}
/**
* Decorated FileIOFactory with slow IO operations.
*/
private static class SlowIOFactory implements FileIOFactory {
/** */
private static final long serialVersionUID = 0L;
/** */
private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory();
/** {@inheritDoc} */
@Override public FileIO create(File file, OpenOption... openOption) throws IOException {
final FileIO delegate = delegateFactory.create(file, openOption);
final boolean slow = file.getName().contains(".bin");
return new FileIODecorator(delegate) {
@Override public int write(ByteBuffer srcBuf) throws IOException {
parkForAWhile();
return super.write(srcBuf);
}
@Override public int write(ByteBuffer srcBuf, long position) throws IOException {
parkForAWhile();
return super.write(srcBuf, position);
}
@Override public int write(byte[] buf, int off, int len) throws IOException {
parkForAWhile();
return super.write(buf, off, len);
}
@Override public int read(ByteBuffer destBuf) throws IOException {
parkForAWhile();
return super.read(destBuf);
}
@Override public int read(ByteBuffer destBuf, long position) throws IOException {
parkForAWhile();
return super.read(destBuf, position);
}
@Override public int read(byte[] buf, int off, int len) throws IOException {
parkForAWhile();
return super.read(buf, off, len);
}
private void parkForAWhile() {
if (slowFileIoEnabled.get() && slow)
LockSupport.parkNanos(1_000_000_000L);
}
};
}
}
}