blob: 1ecdcdc5858621521008e0c111648d270fe7a149 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.OpenOption;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.logger.NullLogger;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.MessageOrderLogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
/**
* Test async LFS cleanup during non-BLT node join.
*/
public class CleanupRestoredCachesSlowTest extends GridCommonAbstractTest implements Serializable {
/** */
private static class FilePageStoreManagerChild extends FilePageStoreManager {
/** */
static class LongOperationAsyncExecutorChild extends LongOperationAsyncExecutor {
/** */
public LongOperationAsyncExecutorChild(String igniteInstanceName, IgniteLogger log) {
super(igniteInstanceName, log);
}
}
/**
* @param ctx Kernal context.
*/
public FilePageStoreManagerChild(GridKernalContext ctx) {
super(ctx);
}
}
/**
* FileIO factory, creating {@link SlowFileIO}
*/
private static class SlowFileIOFactory implements FileIOFactory {
/** */
private final AsyncFileIOFactory asyncFileIOFactory = new AsyncFileIOFactory();
/** {@inheritDoc} */
@Override public FileIO create(File file, OpenOption... modes) throws IOException {
FileIO delegate = asyncFileIOFactory.create(file, modes);
return new SlowFileIO(delegate);
}
}
/**
* Slow FileIO, adding delay for some methods.
*/
private static class SlowFileIO extends FileIODecorator implements Serializable {
/**
* @param delegate File I/O delegate
*/
SlowFileIO(FileIO delegate) {
super(delegate);
}
/**
* Slow close method.
*
* @throws IOException if super method failed.
*/
@Override public void close() throws IOException {
doSleep(100);
super.close();
}
}
/** */
private static final String CACHE_NAME = "myCache";
/** */
private final LogListener logLsnr = new MessageOrderLogListener(
"Cache stores cleanup started asynchronously",
"Cleanup cache stores .*? cleanFiles=true\\]"
);
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
ListeningTestLogger testLog =
new ListeningTestLogger(super.getConfiguration(igniteInstanceName).getGridLogger());
testLog.registerListener(logLsnr);
return super.getConfiguration(igniteInstanceName)
.setDataStorageConfiguration(
new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration()
.setPersistenceEnabled(true)
.setInitialSize(100 * 1024L * 1024L)
.setMaxSize(1024 * 1024L * 1024L)
.setMetricsEnabled(true)
)
.setFileIOFactory(new SlowFileIOFactory())
)
.setCacheConfiguration(
new CacheConfiguration()
.setName(CACHE_NAME)
.setBackups(1)
)
.setGridLogger(testLog);
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
cleanPersistenceDir();
super.afterTest();
}
/**
*
* @throws Exception if failed.
*/
@Test
public void testCleanupSlow() throws Exception {
Ignite ignite = startGrids(2);
ignite.cluster().baselineAutoAdjustEnabled(false);
ClusterNode cn0 = grid(0).cluster().localNode();
ClusterNode cn1 = grid(1).cluster().localNode();
ignite.cluster().active(true);
ignite.cluster().baselineAutoAdjustEnabled(false);
ignite.cluster().setBaselineTopology(asList(cn0, cn1));
IgniteCache<Object, Object> cache = ignite.getOrCreateCache(CACHE_NAME);
for (int i = 0; i < 50; i++)
cache.put(i, new byte[1024]);
stopGrid(1);
ignite.cluster().setBaselineTopology(singletonList(cn0));
startGrid(1);
assertTrue(logLsnr.check());
}
/**
*
* @throws Throwable if failed.
*/
@Test
public void testLongOperationAsyncExecutor() throws Throwable {
FilePageStoreManagerChild.LongOperationAsyncExecutorChild executor =
new FilePageStoreManagerChild.LongOperationAsyncExecutorChild("test", new NullLogger());
final AtomicInteger ai = new AtomicInteger(1);
AtomicReference<Throwable> throwable = new AtomicReference<>();
for (int i = 0; i < 1000; i++) {
executor.async(() -> {
ai.set(0);
doSleep(3);
try {
assertEquals(0, ai.get());
}
catch (AssertionError e) {
throwable.set(e);
}
ai.set(1);
});
executor.afterAsyncCompletion(() -> {
assertEquals(1, ai.get());
return null;
});
}
if (throwable.get() != null)
throw throwable.get();
}
}