| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.ignite.internal.processors.cache.persistence; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.file.OpenOption; |
| import java.util.List; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteDataStreamer; |
| import org.apache.ignite.cache.CacheWriteSynchronizationMode; |
| import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.DataRegionConfiguration; |
| import org.apache.ignite.configuration.DataStorageConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.configuration.WALMode; |
| import org.apache.ignite.failure.FailureHandler; |
| import org.apache.ignite.failure.StopNodeFailureHandler; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; |
| import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; |
| import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; |
| import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; |
| import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; |
| import org.apache.ignite.internal.util.typedef.G; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.junit.Assert; |
| import org.junit.Test; |
| |
| import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; |
| |
| /** |
| * Test class to check that partition files after eviction are destroyed correctly on next checkpoint or crash |
| * recovery. |
| */ |
| public class IgnitePdsPartitionFilesDestroyTest extends GridCommonAbstractTest { |
| /** Partitions count. */ |
| private static final int PARTS_CNT = 32; |
| |
| /** Set if I/O exception should be thrown on partition file truncation. */ |
| private boolean failFileIo; |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { |
| IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); |
| |
| cfg.setConsistentId(igniteInstanceName); |
| |
| DataStorageConfiguration dsCfg = new DataStorageConfiguration() |
| .setWalMode(WALMode.LOG_ONLY) |
| .setCheckpointFrequency(10 * 60 * 1000) |
| .setDefaultDataRegionConfiguration( |
| new DataRegionConfiguration() |
| .setMaxSize(512 * 1024 * 1024) |
| .setPersistenceEnabled(true) |
| ); |
| |
| if (failFileIo) |
| dsCfg.setFileIOFactory(new FailingFileIOFactory(new RandomAccessFileIOFactory())); |
| |
| cfg.setDataStorageConfiguration(dsCfg); |
| |
| CacheConfiguration ccfg = defaultCacheConfiguration() |
| .setBackups(1) |
| .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) |
| .setAffinity(new RendezvousAffinityFunction(false, PARTS_CNT)); |
| |
| cfg.setCacheConfiguration(ccfg); |
| |
| return cfg; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| stopAllGrids(); |
| |
| cleanPersistenceDir(); |
| |
| failFileIo = false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| stopAllGrids(); |
| |
| cleanPersistenceDir(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected FailureHandler getFailureHandler(String igniteInstanceName) { |
| return new StopNodeFailureHandler(); |
| } |
| |
| /** |
| * @param ignite Ignite. |
| * @param keysCnt Keys count. |
| */ |
| private void loadData(IgniteEx ignite, int keysCnt, int multiplier) { |
| log.info("Load data: keys=" + keysCnt); |
| |
| try (IgniteDataStreamer streamer = ignite.dataStreamer(DEFAULT_CACHE_NAME)) { |
| streamer.allowOverwrite(true); |
| |
| for (int k = 0; k < keysCnt; k++) |
| streamer.addData(k, k * multiplier); |
| } |
| } |
| |
| /** |
| * @param ignite Ignite. |
| * @param keysCnt Keys count. |
| */ |
| private void checkData(IgniteEx ignite, int keysCnt, int multiplier) { |
| log.info("Check data: " + ignite.name() + ", keys=" + keysCnt); |
| |
| IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME); |
| |
| for (int k = 0; k < keysCnt; k++) |
| Assert.assertEquals("node = " + ignite.name() + ", key = " + k, (Integer)(k * multiplier), cache.get(k)); |
| } |
| |
| /** |
| * Test that partition files have been deleted correctly on next checkpoint. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPartitionFileDestroyAfterCheckpoint() throws Exception { |
| IgniteEx crd = (IgniteEx)startGrids(2); |
| |
| crd.cluster().baselineAutoAdjustEnabled(false); |
| crd.cluster().active(true); |
| |
| int keysCnt = 50_000; |
| |
| loadData(crd, keysCnt, 1); |
| |
| startGridsMultiThreaded(2, 2); |
| |
| // Trigger partitions eviction. |
| resetBaselineTopology(); |
| |
| awaitPartitionMapExchange(true, true, null); |
| |
| checkPartitionFiles(crd, true); |
| |
| // This checkpoint should delete partition files. |
| forceCheckpoint(); |
| |
| checkPartitionFiles(crd, false); |
| |
| for (Ignite ignite : G.allGrids()) |
| checkData((IgniteEx)ignite, keysCnt, 1); |
| } |
| |
| /** |
| * Test that partition files are reused correctly. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPartitionFileDestroyAndRecreate() throws Exception { |
| IgniteEx crd = startGrid(0); |
| |
| crd.cluster().baselineAutoAdjustEnabled(false); |
| |
| IgniteEx node = startGrid(1); |
| |
| crd.cluster().active(true); |
| |
| int keysCnt = 50_000; |
| |
| loadData(crd, keysCnt, 1); |
| |
| startGridsMultiThreaded(2, 2); |
| |
| // Trigger partitions eviction. |
| resetBaselineTopology(); |
| |
| awaitPartitionMapExchange(true, true, null); |
| |
| checkPartitionFiles(node, true); |
| |
| // Trigger partitions re-create. |
| stopGrid(2); |
| |
| resetBaselineTopology(); |
| |
| awaitPartitionMapExchange(true, true, null); |
| |
| checkPartitionFiles(node, true); |
| |
| // Rewrite data. |
| loadData(crd, keysCnt, 2); |
| |
| // Force checkpoint on all nodes. |
| forceCheckpoint(); |
| |
| // Check that all unecessary partition files have been deleted. |
| checkPartitionFiles(node, false); |
| |
| for (Ignite ignite : G.allGrids()) |
| checkData((IgniteEx)ignite, keysCnt, 2); |
| } |
| |
| /** |
| * Test that partitions files have been deleted correctly during crash recovery. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPartitionFileDestroyCrashRecovery1() throws Exception { |
| IgniteEx crd = startGrid(0); |
| |
| crd.cluster().baselineAutoAdjustEnabled(false); |
| |
| failFileIo = true; |
| |
| IgniteEx problemNode = startGrid(1); |
| |
| failFileIo = false; |
| |
| crd.cluster().active(true); |
| |
| int keysCnt = 50_000; |
| |
| loadData(crd, keysCnt, 1); |
| |
| startGridsMultiThreaded(2, 2); |
| |
| // Trigger partitions eviction. |
| resetBaselineTopology(); |
| |
| awaitPartitionMapExchange(true, true, null); |
| |
| checkPartitionFiles(problemNode, true); |
| |
| try { |
| forceCheckpoint(problemNode); |
| |
| Assert.assertTrue("Checkpoint must be failed", false); |
| } |
| catch (Exception expected) { |
| expected.printStackTrace(); |
| } |
| |
| // Problem node should be stopped after failed checkpoint. |
| waitForTopology(3); |
| |
| problemNode = startGrid(1); |
| |
| awaitPartitionMapExchange(); |
| |
| // Need to explicitly wait for checkpoint completion or no guarantee what filled on recovery destroyQueue is |
| // actually cleared. |
| forceCheckpoint(problemNode); |
| |
| // After recovery all evicted partition files should be deleted from disk. |
| checkPartitionFiles(problemNode, false); |
| |
| for (Ignite ignite : G.allGrids()) |
| checkData((IgniteEx)ignite, keysCnt, 1); |
| } |
| |
| /** |
| * Test that partitions files are not deleted if they were re-created on next time and no checkpoint has done during |
| * this time. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testPartitionFileDestroyCrashRecovery2() throws Exception { |
| IgniteEx crd = startGrid(0); |
| |
| crd.cluster().baselineAutoAdjustEnabled(false); |
| |
| failFileIo = true; |
| |
| IgniteEx problemNode = startGrid(1); |
| |
| failFileIo = false; |
| |
| crd.cluster().active(true); |
| |
| int keysCnt = 50_000; |
| |
| loadData(crd, keysCnt, 1); |
| |
| // Trigger partitions eviction. |
| startGridsMultiThreaded(2, 2); |
| |
| resetBaselineTopology(); |
| |
| awaitPartitionMapExchange(true, true, null); |
| |
| checkPartitionFiles(problemNode, true); |
| |
| // Trigger partitions re-create. |
| stopGrid(2); |
| |
| resetBaselineTopology(); |
| |
| awaitPartitionMapExchange(true, true, null); |
| |
| checkPartitionFiles(problemNode, true); |
| |
| try { |
| forceCheckpoint(problemNode); |
| |
| Assert.assertTrue("Checkpoint must be failed", false); |
| } |
| catch (Exception expected) { |
| expected.printStackTrace(); |
| } |
| |
| // Problem node should be stopped after failed checkpoint. |
| waitForTopology(2); |
| |
| problemNode = startGrid(1); |
| |
| awaitPartitionMapExchange(); |
| |
| // Need to explicitly wait for checkpoint completion or no guarantee what filled on recovery destroyQueue is |
| // actually cleared. |
| forceCheckpoint(problemNode); |
| |
| // After recovery all evicted partition files should be deleted from disk. |
| checkPartitionFiles(problemNode, false); |
| |
| for (Ignite ignite : G.allGrids()) |
| checkData((IgniteEx)ignite, keysCnt, 1); |
| } |
| |
| /** |
| * Test destroy when partition files are empty and there are no pages for checkpoint. |
| * |
| * @throws Exception If failed. |
| */ |
| @Test |
| public void testDestroyWhenPartitionsAreEmpty() throws Exception { |
| IgniteEx crd = (IgniteEx)startGrids(2); |
| |
| crd.cluster().active(true); |
| |
| forceCheckpoint(); |
| |
| // Evict arbitrary partition. |
| List<GridDhtLocalPartition> parts = crd.cachex(DEFAULT_CACHE_NAME).context().topology().localPartitions(); |
| for (GridDhtLocalPartition part : parts) |
| if (part.state() != GridDhtPartitionState.EVICTED) { |
| part.rent().get(); |
| |
| break; |
| } |
| |
| // This checkpoint has no pages to write, but has one partition file to destroy. |
| forceCheckpoint(crd); |
| |
| checkPartitionFiles(crd, false); |
| } |
| |
| /** |
| * If {@code exists} is {@code true}, checks that all partition files exist if partition has state EVICTED. |
| * |
| * If {@code exists} is {@code false}, checks that all partition files don't exist if partition is absent or has |
| * state EVICTED. |
| * |
| * @param ignite Node. |
| * @param exists If {@code true} method will check that partition file exists, in other case will check that file |
| * doesn't exist. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void checkPartitionFiles(IgniteEx ignite, boolean exists) throws IgniteCheckedException { |
| int evicted = 0; |
| |
| GridDhtPartitionTopology top = ignite.cachex(DEFAULT_CACHE_NAME).context().topology(); |
| |
| for (int p = 0; p < PARTS_CNT; p++) { |
| GridDhtLocalPartition part = top.localPartition(p); |
| |
| File partFile = partitionFile(ignite, DEFAULT_CACHE_NAME, p); |
| |
| if (exists) { |
| if (part != null && part.state() == GridDhtPartitionState.EVICTED) |
| Assert.assertTrue("Partition file has deleted ahead of time: " + partFile, partFile.exists()); |
| |
| evicted++; |
| } |
| else { |
| if (part == null || part.state() == GridDhtPartitionState.EVICTED) |
| Assert.assertTrue("Partition file has not deleted: " + partFile, !partFile.exists()); |
| } |
| } |
| |
| if (exists) |
| Assert.assertTrue("There should be at least 1 eviction", evicted > 0); |
| } |
| |
| /** |
| * @param ignite Ignite. |
| * @param cacheName Cache name. |
| * @param partId Partition id. |
| */ |
| private static File partitionFile(Ignite ignite, String cacheName, int partId) throws IgniteCheckedException { |
| File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false); |
| |
| String nodeName = ignite.name().replaceAll("\\.", "_"); |
| |
| return new File(dbDir, String.format("%s/cache-%s/part-%d.bin", nodeName, cacheName, partId)); |
| } |
| |
| /** |
| * |
| */ |
| static class FailingFileIO extends FileIODecorator { |
| /** |
| * @param delegate File I/O delegate |
| */ |
| public FailingFileIO(FileIO delegate) { |
| super(delegate); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void clear() throws IOException { |
| throw new IOException("Test"); |
| } |
| } |
| |
| /** |
| * |
| */ |
| static class FailingFileIOFactory implements FileIOFactory { |
| /** Delegate factory. */ |
| private final FileIOFactory delegateFactory; |
| |
| /** |
| * @param delegateFactory Delegate factory. |
| */ |
| FailingFileIOFactory(FileIOFactory delegateFactory) { |
| this.delegateFactory = delegateFactory; |
| } |
| |
| /** |
| * @param file File. |
| */ |
| private static boolean isPartitionFile(File file) { |
| return file.getName().contains("part") && file.getName().endsWith("bin"); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public FileIO create(File file, OpenOption... modes) throws IOException { |
| FileIO delegate = delegateFactory.create(file, modes); |
| |
| if (isPartitionFile(file)) |
| return new FailingFileIO(delegate); |
| |
| return delegate; |
| } |
| } |
| } |