blob: c75ab4834bf16d166efad92d13613775a8d82064 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.db.wal;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.DiscoverySpiTestListener;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntryType;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMarkersStorage;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.TrackingPageIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.PAX;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.lifecycle.LifecycleEventType;
import org.apache.ignite.loadtests.colocation.GridTestLifecycleBean;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.GridTestUtils.SF;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING;
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_CHECKPOINT_FREQ;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DATA_FILENAME;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver.genNewStyleSubfolderName;
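/**
* WAL recovery tests for persistent Ignite nodes: node restarts and cancellations, kills of forked nodes,
* checkpoints and metastorage restore.
*/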
public class IgniteWalRecoveryTest extends GridCommonAbstractTest {
/** Number of partitions of the main partitioned cache affinity. */
private static final int PARTS = 32;
/** User attribute set on every node except index 0; {@link RemoteNodeFilter} keeps the main cache on such nodes. */
private static final String HAS_CACHE = "HAS_CACHE";
/** Length of the {@code long[]} values checked by the large-entry recovery tests. */
private static final int LARGE_ARR_SIZE = 1025;
/** Number of entries checked by the large-entry recovery tests. */
private static final int LARGE_ENTRY_COUNT = 500;
/** Number of entries checked by the regular recovery tests. */
private static final int ENTRY_COUNT = 2_000;
/** If {@code true}, nodes started next run in a separate JVM (returned by {@code isMultiJvm()}). */
private boolean fork;
/** Name of the main partitioned cache. */
private static final String CACHE_NAME = "partitioned";
/** Name of the main cache after its directory has been renamed. */
private static final String RENAMED_CACHE_NAME = "partitioned0";
/** Cache whose data file is deleted to emulate a missing cache descriptor. */
private static final String CACHE_TO_DESTROY_NAME = "destroyCache";
/** Name of the LOCAL-mode cache. */
private static final String LOC_CACHE_NAME = "local";
/** Name of the first transactional cache configured in {@code getConfiguration}. */
private static final String CACHE_1 = "cache1";
/** Name of the second transactional cache configured in {@code getConfiguration}. */
private static final String CACHE_2 = "cache2";
/** If {@code true}, the main cache is configured under {@link #RENAMED_CACHE_NAME} instead of {@link #CACHE_NAME}. */
private boolean renamed = false;
/** WAL segment size in bytes; {@code 0} keeps the data storage default. */
private int walSegmentSize = 16 * 1024 * 1024;
/** Number of WAL segments. */
private int walSegments = 10;
/** Log only. If {@code true}, WAL mode is set to LOG_ONLY; otherwise the default mode is kept. */
private boolean logOnly = false;
/** Failure detection timeout in milliseconds; applied only when positive. */
private long customFailureDetectionTimeout = -1;
/** Checkpoint frequency in milliseconds. */
private long checkpointFrequency = DFLT_CHECKPOINT_FREQ;
/** WAL page snapshots records compression method. */
protected DiskPageCompression walPageCompression;
/** {@inheritDoc} */
@Override protected boolean isMultiJvm() {
    // Flipped by individual tests right before starting a node that must live in its own process.
    return this.fork;
}
/**
 * Builds the node configuration.
 *
 * The configuration contains:
 * <ul>
 *     <li>an indexed ATOMIC partitioned cache named {@link #CACHE_NAME}, or {@link #RENAMED_CACHE_NAME} when
 *     {@link #renamed} is set;</li>
 *     <li>a LOCAL cache;</li>
 *     <li>two transactional caches, {@link #CACHE_1} and {@link #CACHE_2};</li>
 *     <li>a persistent default data region.</li>
 * </ul>
 * The WAL settings are taken from the test fields.
 *
 * @param gridName Ignite instance name.
 * @return Node configuration.
 * @throws Exception If failed.
 */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
    IgniteConfiguration cfg = super.getConfiguration(gridName);
    cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
    CacheConfiguration<Integer, IndexedObject> ccfg = renamed ?
        new CacheConfiguration<>(RENAMED_CACHE_NAME) : new CacheConfiguration<>(CACHE_NAME);
    ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
    ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
    ccfg.setAffinity(new RendezvousAffinityFunction(false, PARTS));
    // Only nodes carrying the HAS_CACHE attribute host this cache.
    ccfg.setNodeFilter(new RemoteNodeFilter());
    ccfg.setIndexedTypes(Integer.class, IndexedObject.class);
    CacheConfiguration<Integer, IndexedObject> locCcfg = new CacheConfiguration<>(LOC_CACHE_NAME);
    locCcfg.setCacheMode(CacheMode.LOCAL);
    locCcfg.setIndexedTypes(Integer.class, IndexedObject.class);
    // Use the declared name constants instead of duplicating the names as string literals.
    CacheConfiguration<Object, Object> cfg1 = new CacheConfiguration<>(CACHE_1)
        .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
        .setAffinity(new RendezvousAffinityFunction(false, 32))
        .setCacheMode(CacheMode.PARTITIONED)
        .setRebalanceMode(CacheRebalanceMode.SYNC)
        .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
        .setBackups(0);
    CacheConfiguration<Object, Object> cfg2 = new CacheConfiguration<>(cfg1).setName(CACHE_2).setRebalanceOrder(10);
    cfg.setCacheConfiguration(ccfg, locCcfg, cfg1, cfg2);
    DataStorageConfiguration dbCfg = new DataStorageConfiguration();
    dbCfg.setPageSize(4 * 1024);
    DataRegionConfiguration memPlcCfg = new DataRegionConfiguration();
    memPlcCfg.setName("dfltDataRegion");
    memPlcCfg.setInitialSize(256L * 1024 * 1024);
    memPlcCfg.setMaxSize(256L * 1024 * 1024);
    memPlcCfg.setPersistenceEnabled(true);
    dbCfg.setDefaultDataRegionConfiguration(memPlcCfg);
    dbCfg.setWalRecordIteratorBufferSize(1024 * 1024);
    dbCfg.setWalHistorySize(2);
    if (logOnly)
        dbCfg.setWalMode(WALMode.LOG_ONLY);
    // A zero segment size means "keep the default".
    if (walSegmentSize != 0)
        dbCfg.setWalSegmentSize(walSegmentSize);
    dbCfg.setWalSegments(walSegments);
    dbCfg.setWalPageCompression(walPageCompression);
    dbCfg.setCheckpointFrequency(checkpointFrequency);
    cfg.setDataStorageConfiguration(dbCfg);
    BinaryConfiguration binCfg = new BinaryConfiguration();
    binCfg.setCompactFooter(false);
    cfg.setBinaryConfiguration(binCfg);
    // Every node except the one with index 0 is eligible to host the main cache.
    if (!getTestIgniteInstanceName(0).equals(gridName))
        cfg.setUserAttributes(F.asMap(HAS_CACHE, true));
    if (customFailureDetectionTimeout > 0)
        cfg.setFailureDetectionTimeout(customFailureDetectionTimeout);
    return cfg;
}
/**
 * Runs the base setup, then makes sure no grid from a previous test is alive and the persistence directory is
 * empty.
 *
 * @throws Exception If failed.
 */
@Override protected void beforeTest() throws Exception {
    super.beforeTest();

    // Start every test from a clean slate.
    stopAllGrids();
    cleanPersistenceDir();
}
/**
 * Runs the base teardown, then stops all grids and removes persisted data left by the test.
 *
 * @throws Exception If failed.
 */
@Override protected void afterTest() throws Exception {
    super.afterTest();

    // Do not leak nodes or on-disk state into the next test.
    stopAllGrids();
    cleanPersistenceDir();
}
/**
 * Puts 10 000 random entries and checks them, then restarts the node and checks them again.
 *
 * @throws Exception if failed.
 */
@Test
public void testWalBig() throws Exception {
    IgniteEx node = startGrid(1);
    node.cluster().active(true);
    IgniteCache<Object, Object> cache = node.cache(CACHE_NAME);
    Random rnd = new Random();
    Map<Integer, IndexedObject> expected = new HashMap<>();
    for (int iter = 0; iter < 10_000; iter++) {
        if (iter % 1000 == 0)
            X.println(" >> " + iter);
        int key = rnd.nextInt(300_000);
        IndexedObject val = new IndexedObject(rnd.nextInt(10_000));
        cache.put(key, val);
        expected.put(key, val);
    }
    // Check before restart.
    for (Map.Entry<Integer, IndexedObject> e : expected.entrySet())
        assertEquals(e.getValue(), cache.get(e.getKey()));
    stopGrid(1);
    node = startGrid(1);
    node.cluster().active(true);
    cache = node.cache(CACHE_NAME);
    // Check after restart.
    for (Map.Entry<Integer, IndexedObject> e : expected.entrySet())
        assertEquals(e.getValue(), cache.get(e.getKey()));
}
/**
 * Stores arrays of sizes 2^0 .. 2^20 produced by {@code createTestData} and stops the node with {@code cancel=true}.
 * After restart, checks that every stored array is intact.
 *
 * @throws Exception if failed.
 */
@Test
public void testWalBigObjectNodeCancel() throws Exception {
    final int maxSizePower = 21;
    IgniteEx node = startGrid(1);
    node.cluster().active(true);
    IgniteCache<Object, Object> cache = node.cache(CACHE_NAME);
    for (int pow = 0; pow < maxSizePower; ++pow)
        cache.put("key_" + pow, createTestData(1 << pow));
    stopGrid(1, true);
    node = startGrid(1);
    node.cluster().active(true);
    cache = node.cache(CACHE_NAME);
    // Check.
    for (int pow = 0; pow < maxSizePower; ++pow) {
        String key = "key_" + pow;
        int[] expected = createTestData(1 << pow);
        int[] actual = (int[])cache.get(key);
        assertTrue("Invalid data. [key=" + key + ']', Arrays.equals(expected, actual));
    }
}
/**
 * Checks that puts and {@code clear()} work while the thread context class loader is an external one, and that
 * the cache keeps working after the original loader is restored.
 *
 * @throws Exception If fail.
 */
@Test
public void testSwitchClassLoader() throws Exception {
    final IgniteEx igniteEx = startGrid(1);
    // CustomDiscoveryMessage will trigger service tasks
    startGrid(2);
    igniteEx.cluster().active(true);
    IgniteCache<Integer, EnumVal> cache = igniteEx.cache(CACHE_NAME);
    final ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
    final ClassLoader newCl = getExternalClassLoader();
    Thread.currentThread().setContextClassLoader(newCl);
    try {
        for (int i = 0; i < 10; i++)
            cache.put(i, i % 2 == 0 ? EnumVal.VAL1 : EnumVal.VAL2);
        // JUnit assertion instead of 'assert', which is skipped when the JVM runs without -ea.
        for (int i = 0; i < 10; i++)
            assertTrue("Missing key " + i, cache.containsKey(i));
        // Invokes ClearTask with new class loader
        cache.clear();
    }
    finally {
        // Restore the loader even on failure so the external loader does not leak to this thread.
        Thread.currentThread().setContextClassLoader(oldCl);
    }
    for (int i = 0; i < 10; i++)
        cache.put(i, i % 2 == 0 ? EnumVal.VAL1 : EnumVal.VAL2);
    for (int i = 0; i < 10; i++)
        assertTrue("Missing key " + i, cache.containsKey(i));
}
/**
 * Overwrites keys in three passes (every 2nd, every 3rd and every 7th key), then checks the last written values
 * before and after a node restart.
 *
 * @throws Exception if failed.
 */
@Test
public void testWalSimple() throws Exception {
    IgniteEx node = startGrid(1);
    node.cluster().active(true);
    IgniteCache<Object, Object> cache = node.cache(CACHE_NAME);
    info(" --> step1");
    for (int key = 0; key < 10_000; key += 2)
        cache.put(key, new IndexedObject(key));
    info(" --> step2");
    for (int key = 0; key < 10_000; key += 3)
        cache.put(key, new IndexedObject(key * 2));
    info(" --> step3");
    for (int key = 0; key < 10_000; key += 7)
        cache.put(key, new IndexedObject(key * 3));
    info(" --> check1");
    // Check.
    for (int key = 0; key < 10_000; key++)
        assertEquals(expectedWalSimpleValue(key), cache.get(key));
    stopGrid(1);
    node = startGrid(1);
    node.cluster().active(true);
    cache = node.cache(CACHE_NAME);
    info(" --> check2");
    // Check.
    for (int key = 0; key < 10_000; key++)
        assertEquals(expectedWalSimpleValue(key), cache.get(key));
    info(" --> ok");
}

/**
 * Returns the value {@code testWalSimple} expects for a key. Later passes overwrite earlier ones, so the
 * divisibility checks run from the last pass to the first.
 *
 * @param key Key.
 * @return Expected value, or {@code null} if no pass wrote the key.
 */
private IndexedObject expectedWalSimpleValue(int key) {
    if (key % 7 == 0)
        return new IndexedObject(key * 3);
    if (key % 3 == 0)
        return new IndexedObject(key * 2);
    if (key % 2 == 0)
        return new IndexedObject(key);
    return null;
}
/**
 * Puts byte arrays of growing length (0 .. 9 999, each filled with its key's low byte). Restarts the node and
 * checks every array.
 *
 * @throws Exception If fail.
 */
@Test
public void testWalLargeValue() throws Exception {
    IgniteEx node = startGrid(1);
    node.cluster().active(true);
    IgniteCache<Object, Object> cache = node.cache(CACHE_NAME);
    for (int key = 0; key < 10_000; key++) {
        final byte[] payload = new byte[key];
        Arrays.fill(payload, (byte)key);
        cache.put(key, payload);
        if (key % 1000 == 0)
            X.println(" ---> put: " + key);
    }
    stopGrid(1);
    node = startGrid(1);
    node.cluster().active(true);
    cache = node.cache(CACHE_NAME);
    info(" --> check2");
    for (int key = 0; key < 10_000; key++) {
        final byte[] expected = new byte[key];
        Arrays.fill(expected, (byte)key);
        Assert.assertArrayEquals(expected, (byte[])cache.get(key));
        if (key % 1000 == 0)
            X.println(" ---> get: " + key);
    }
}
/**
* Checks that binary recovery completes successfully when a node was stopped in the middle of a checkpoint. The
* cache_data.bin file of one cache is also deleted to emulate a missing {@link DynamicCacheDescriptor} file; binary
* recovery should still complete successfully in this case.
*
* @throws Exception if failed.
*/
@Test
public void testBinaryRecoverBeforePMEWhenMiddleCheckpoint() throws Exception {
startGrids(3);
IgniteEx ig2 = grid(2);
ig2.cluster().active(true);
IgniteCache<Object, Object> cache = ig2.cache(CACHE_NAME);
for (int i = 1; i <= 4_000; i++)
cache.put(i, new BigObject(i));
BigObject objToCheck;
ig2.getOrCreateCache(CACHE_TO_DESTROY_NAME).put(1, objToCheck = new BigObject(1));
GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)ig2
.context().cache().context().database();
IgniteInternalFuture<?> cpFinishFut = dbMgr.forceCheckpoint("force checkpoint").futureFor(FINISHED);
// Delete checkpoint END file to emulate node stopped at the middle of checkpoint.
// NOTE(review): the listener runs asynchronously and cpFinishFut is never awaited before stopAllGrids() below;
// confirm the marker is always deleted before the nodes stop.
cpFinishFut.listen(new IgniteInClosureX<IgniteInternalFuture>() {
@Override public void applyx(IgniteInternalFuture fut0) throws IgniteCheckedException {
try {
CheckpointEntry cpEntry = dbMgr.checkpointHistory().lastCheckpoint();
String cpEndFileName = CheckpointMarkersStorage.checkpointFileName(cpEntry, CheckpointEntryType.END);
Files.delete(Paths.get(dbMgr.checkpointDirectory().getAbsolutePath(), cpEndFileName));
log.info("Checkpoint marker removed [cpEndFileName=" + cpEndFileName + ']');
}
catch (IOException e) {
throw new IgniteCheckedException(e);
}
}
});
// Resolve the cache directory now, while the node is alive; its data file is deleted after the cluster stops
// to emulate a cache destroyed in the middle of a checkpoint.
IgniteInternalCache<Object, Object> destoryCache = ig2.cachex(CACHE_TO_DESTROY_NAME);
FilePageStoreManager pageStoreMgr = (FilePageStoreManager)destoryCache.context().shared().pageStore();
File destroyCacheWorkDir = pageStoreMgr.cacheWorkDir(destoryCache.configuration());
// Stop the whole cluster
stopAllGrids();
// Delete cache_data.bin file for this cache. Binary recovery should complete successfully after it.
final File[] files = destroyCacheWorkDir.listFiles(new FilenameFilter() {
@Override public boolean accept(final File dir, final String name) {
return name.endsWith(CACHE_DATA_FILENAME);
}
});
// NOTE(review): listFiles() returns null if the directory is missing, which would surface here as an NPE rather
// than an assertion failure.
assertTrue(files.length > 0);
for (final File file : files)
assertTrue("Can't remove " + file.getAbsolutePath(), file.delete());
// Restart nodes 0 and 1 first; node 2 is started separately below with an extra discovery listener.
startGrids(2);
// Prepare the Ignite instance configuration with additional discovery checks.
final String ig2Name = getTestIgniteInstanceName(2);
final IgniteConfiguration onJoinCfg = optimize(getConfiguration(ig2Name));
// Check that restore is called before PME and before the node joins the cluster.
((IgniteDiscoverySpi)onJoinCfg.getDiscoverySpi())
.setInternalListener(new DiscoverySpiTestListener() {
@Override public void beforeJoin(ClusterNode locNode, IgniteLogger log) {
String nodeName = locNode.attribute(ATTR_IGNITE_INSTANCE_NAME);
GridCacheSharedContext sharedCtx = ((IgniteEx)ignite(getTestIgniteInstanceIndex(nodeName)))
.context()
.cache()
.context();
if (nodeName.equals(ig2Name)) {
// Checkpoint history initialized on node start.
assertFalse(((GridCacheDatabaseSharedManager)sharedCtx.database())
.checkpointHistory().checkpoints().isEmpty());
}
super.beforeJoin(locNode, log);
}
});
Ignite restoredIg2 = startGrid(ig2Name, onJoinCfg);
awaitPartitionMapExchange();
// The value written before the restart must survive; note the expected/actual arguments are swapped here,
// which only affects the failure message.
assertEquals(restoredIg2.cache(CACHE_TO_DESTROY_NAME).get(1), objToCheck);
}
/**
 * Multithreaded WAL rollover scenario with the WAL mode left at its configured default (LOG_ONLY not forced).
 *
 * @throws Exception if failed.
 */
@Test
public void testWalRolloverMultithreadedDefault() throws Exception {
    this.logOnly = false;

    checkWalRolloverMultithreaded();
}
/**
 * Multithreaded WAL rollover scenario with the WAL mode forced to LOG_ONLY.
 *
 * @throws Exception if failed.
 */
@Test
public void testWalRolloverMultithreadedLogOnly() throws Exception {
    this.logOnly = true;

    checkWalRolloverMultithreaded();
}
/**
 * Creates 50 caches and writes to random ones from 16 threads. Until the deadline, a checkpoint is requested
 * every 500 ms; a final checkpoint is then forced. The failure detection timeout is raised to 40 s for the
 * duration of the test.
 *
 * @throws Exception if failed.
 */
@Test
public void testHugeCheckpointRecord() throws Exception {
    final long savedFdTimeout = customFailureDetectionTimeout;
    try {
        customFailureDetectionTimeout = 40_000;
        final IgniteEx node = startGrid(1);
        node.cluster().active(true);
        for (int idx = 0; idx < 50; idx++) {
            // We can get 'too many open files' with default number of partitions.
            IgniteCache<Object, Object> created = node.getOrCreateCache(
                new CacheConfiguration<Object, Object>("cache-" + idx)
                    .setAffinity(new RendezvousAffinityFunction(false, 32)));
            created.put(idx, idx);
        }
        final long deadline = System.currentTimeMillis() + SF.applyLB(30_000, 5_000);
        Callable<Void> loader = new Callable<Void>() {
            @Override public Void call() {
                Random rnd = ThreadLocalRandom.current();
                while (U.currentTimeMillis() < deadline)
                    node.cache("cache-" + rnd.nextInt(50)).put(rnd.nextInt(50_000), rnd.nextInt());
                return null;
            }
        };
        IgniteInternalFuture<Long> loadFut = GridTestUtils.runMultiThreadedAsync(loader, 16, "put-thread");
        while (System.currentTimeMillis() < deadline) {
            node.context().cache().context().database().wakeupForCheckpoint("test").get();
            U.sleep(500);
        }
        loadFut.get();
        node.context().cache().context().database().wakeupForCheckpoint("final-test-checkpoint").get();
    }
    finally {
        customFailureDetectionTimeout = savedFdTimeout;
    }
}
/**
 * Sets the WAL segment size to 2 MB and starts node 1. Then puts random values from 16 threads until the
 * deadline; this is intended to force frequent WAL segment rollover under concurrent load.
 *
 * @throws Exception if failed.
 */
private void checkWalRolloverMultithreaded() throws Exception {
    walSegmentSize = 2 * 1024 * 1024;
    final long deadline = System.currentTimeMillis() + SF.apply(50 * 1000);
    IgniteEx node = startGrid(1);
    node.cluster().active(true);
    final IgniteCache<Object, Object> cache = node.cache(CACHE_NAME);
    Callable<Void> writer = new Callable<Void>() {
        @Override public Void call() {
            Random rnd = ThreadLocalRandom.current();
            while (System.currentTimeMillis() < deadline)
                cache.put(rnd.nextInt(50_000), rnd.nextInt());
            return null;
        }
    };
    GridTestUtils.runMultiThreaded(writer, 16, "put-thread");
}
/**
 * Writes 100 entries, stops the node and renames the cache directory to the name of
 * {@link #RENAMED_CACHE_NAME}. Restarts the node with the renamed cache configuration and checks that the data is
 * visible under the new name.
 *
 * @throws Exception If fail.
 */
@Test
public void testWalRenameDirSimple() throws Exception {
    IgniteEx ignite = startGrid(1);
    ignite.cluster().active(true);
    IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);
    for (int i = 0; i < 100; i++)
        cache.put(i, new IndexedObject(i));
    final Object consistentId = ignite.cluster().localNode().consistentId();
    stopGrid(1);
    final File cacheDir = cacheDir(CACHE_NAME, consistentId.toString());
    final File renamedDir = new File(cacheDir.getParent(), "cache-" + RENAMED_CACHE_NAME);
    // The flag also makes getConfiguration() use RENAMED_CACHE_NAME for the restarted node.
    renamed = cacheDir.renameTo(renamedDir);
    // JUnit assertion: a plain 'assert' is skipped when the JVM runs without -ea, hiding a failed rename.
    assertTrue("Failed to rename " + cacheDir + " to " + renamedDir, renamed);
    ignite = startGrid(1);
    ignite.cluster().active(true);
    cache = ignite.cache(RENAMED_CACHE_NAME);
    for (int i = 0; i < 100; i++)
        assertEquals(new IndexedObject(i), cache.get(i));
}
/**
 * Resolves the on-disk directory of a cache for a node. Path: store root / node folder / {@code cache-<name>},
 * where the node folder comes from {@code genNewStyleSubfolderName} with index 0. Each level is asserted to exist.
 *
 * @param cacheName Cache name.
 * @param consId Consistent ID; must be parseable by {@link UUID#fromString(String)}.
 * @return Cache dir.
 * @throws IgniteCheckedException If fail.
 */
private File cacheDir(final String cacheName, final String consId) throws IgniteCheckedException {
    final String nodeFolderName = genNewStyleSubfolderName(0, UUID.fromString(consId));
    final File storeRoot = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false);
    assert storeRoot.exists();
    final File nodeDir = new File(storeRoot.getAbsolutePath(), nodeFolderName);
    assert nodeDir.exists();
    final File res = new File(nodeDir.getAbsolutePath(), "cache-" + cacheName);
    assert res.exists();
    return res;
}
/**
 * Loads data on a forked node and kills its process. Restarts the node in-process and checks, after resetting
 * lost partitions, that all entries, the SQL index and the LOCAL cache were restored.
 *
 * @throws Exception if failed.
 */
@Test
public void testRecoveryNoCheckpoint() throws Exception {
    IgniteEx ctrlGrid = startGrid(0);
    fork = true;
    IgniteEx cacheGrid = startGrid(1);
    ctrlGrid.cluster().active(true);
    ctrlGrid.compute(ctrlGrid.cluster().forRemotes()).run(new LoadRunnable(false));
    info("Killing remote process...");
    ((IgniteProcessProxy)cacheGrid).kill();
    final IgniteEx g0 = ctrlGrid;
    // waitForCondition() reports a timeout through its boolean result; assert it so the test fails here instead
    // of restarting node 1 while the killed node may still be in the topology.
    assertTrue("Killed node did not leave the topology", GridTestUtils.waitForCondition(new PA() {
        /** {@inheritDoc} */
        @Override public boolean apply() {
            return g0.cluster().nodes().size() == 1;
        }
    }, getTestTimeout()));
    fork = false;
    // Now start the grid and verify that updates were restored from WAL.
    cacheGrid = startGrid(1);
    IgniteCache<Object, Object> cache = cacheGrid.cache(CACHE_NAME);
    // Expecting lost partitions.
    assertFalse(cache.lostPartitions().isEmpty());
    cacheGrid.resetLostPartitions(Collections.singleton(CACHE_NAME));
    for (int i = 0; i < ENTRY_COUNT; i++)
        assertEquals(new IndexedObject(i), cache.get(i));
    List<List<?>> res = cache.query(new SqlFieldsQuery("select count(iVal) from IndexedObject")).getAll();
    assertEquals(1, res.size());
    assertEquals((long)ENTRY_COUNT, res.get(0).get(0));
    IgniteCache<Object, Object> locCache = cacheGrid.cache(LOC_CACHE_NAME);
    for (int i = 0; i < ENTRY_COUNT; i++)
        assertEquals(new IndexedObject(i), locCache.get(i));
}
/**
 * Loads large {@code long[]} entries on a forked node and kills its process. Restarts the node in-process and
 * checks, after resetting lost partitions, that the partitioned and LOCAL caches were restored.
 *
 * @throws Exception if failed.
 */
@Test
public void testRecoveryLargeNoCheckpoint() throws Exception {
    IgniteEx ctrlGrid = startGrid(0);
    fork = true;
    IgniteEx cacheGrid = startGrid(1);
    ctrlGrid.cluster().active(true);
    ctrlGrid.compute(ctrlGrid.cluster().forRemotes()).run(new LargeLoadRunnable(false));
    info("Killing remote process...");
    ((IgniteProcessProxy)cacheGrid).kill();
    final IgniteEx g0 = ctrlGrid;
    // waitForCondition() reports a timeout through its boolean result; assert it so the test fails here instead
    // of restarting node 1 while the killed node may still be in the topology.
    assertTrue("Killed node did not leave the topology", GridTestUtils.waitForCondition(new PA() {
        /** {@inheritDoc} */
        @Override public boolean apply() {
            return g0.cluster().nodes().size() == 1;
        }
    }, getTestTimeout()));
    fork = false;
    // Now start the grid and verify that updates were restored from WAL.
    cacheGrid = startGrid(1);
    IgniteCache<Object, Object> cache = cacheGrid.cache(CACHE_NAME);
    // Expecting lost partitions.
    assertFalse(cache.lostPartitions().isEmpty());
    cacheGrid.resetLostPartitions(Collections.singleton(CACHE_NAME));
    IgniteCache<Object, Object> locCache = cacheGrid.cache(LOC_CACHE_NAME);
    for (int i = 0; i < LARGE_ENTRY_COUNT; i++) {
        final long[] data = new long[LARGE_ARR_SIZE];
        Arrays.fill(data, i);
        Assert.assertArrayEquals(data, (long[])cache.get(i));
        Assert.assertArrayEquals(data, (long[])locCache.get(i));
    }
}
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
    // Twenty minutes, expressed in seconds and converted to milliseconds.
    return TimeUnit.SECONDS.toMillis(20 * 60);
}
/**
 * Uses a random checkpoint frequency of 2-6 s. Populates the cache on a forked node and starts an asynchronous
 * load there, then kills the process after 5 s and restarts the node. Expects lost partitions and a zero result
 * from {@code VerifyCallable}.
 *
 * @throws Exception if failed.
 */
@Test
public void testRandomCrash() throws Exception {
    Random rnd = new Random();
    checkpointFrequency = 2_000 + rnd.nextInt(4_000);
    IgniteEx ctrlNode = startGrid(0);
    fork = true;
    IgniteEx forkedNode = startGrid(1);
    ctrlNode.cluster().active(true);
    IgniteCompute remote = ctrlNode.compute(ctrlNode.cluster().forRemotes());
    remote.run(new LoadRunnable(false));
    info(">>> Finished cache population.");
    remote.run(new AsyncLoadRunnable());
    Thread.sleep(5_000);
    info(">>> Killing remote process...");
    ((IgniteProcessProxy)forkedNode).kill();
    startGrid(1);
    final GridDhtPartitionTopology top = ctrlNode.cachex(CACHE_NAME).context().topology();
    waitForReadyTopology(top, new AffinityTopologyVersion(3, 0));
    assertFalse(top.lostPartitions().isEmpty());
    int mismatches = remote.call(new VerifyCallable());
    assertEquals(0, mismatches);
}
/**
 * Same scenario as {@code testRandomCrash}, but with the large-entry load and verification tasks.
 *
 * @throws Exception if failed.
 */
@Test
public void testLargeRandomCrash() throws Exception {
    Random rnd = new Random();
    checkpointFrequency = 2_000 + rnd.nextInt(4_000);
    IgniteEx ctrlNode = startGrid(0);
    fork = true;
    IgniteEx forkedNode = startGrid(1);
    ctrlNode.cluster().active(true);
    IgniteCompute remote = ctrlNode.compute(ctrlNode.cluster().forRemotes());
    remote.run(new LargeLoadRunnable(false));
    info(">>> Finished cache population.");
    remote.run(new AsyncLargeLoadRunnable());
    Thread.sleep(5_000);
    info(">>> Killing remote process...");
    ((IgniteProcessProxy)forkedNode).kill();
    startGrid(1);
    final GridDhtPartitionTopology top = ctrlNode.cachex(CACHE_NAME).context().topology();
    waitForReadyTopology(top, new AffinityTopologyVersion(3, 0));
    assertFalse(top.lostPartitions().isEmpty());
    int mismatches = remote.call(new VerifyLargeCallable());
    assertEquals(0, mismatches);
}
/**
 * Node filter that accepts only nodes carrying the {@link #HAS_CACHE} user attribute.
 */
private static class RemoteNodeFilter implements IgnitePredicate<ClusterNode> {
    /** {@inheritDoc} */
    @Override public boolean apply(ClusterNode clusterNode) {
        Object marker = clusterNode.attribute(HAS_CACHE);
        return marker != null;
    }
}
/**
 * Checks that a cache re-created after {@code destroyCache} does not expose entries from before the destroy.
 *
 * @throws Exception If failed.
 */
@Test
public void testDestroyCache() throws Exception {
    final String name = "test";
    IgniteEx node = startGrid(1);
    node.cluster().active(true);
    node.getOrCreateCache(name).put(1, new IndexedObject(1));
    node.destroyCache(name);
    // No entry available after cache destroy.
    assertNull(node.getOrCreateCache(name).get(1));
}
/**
 * Writes 100 entries on one node, starts a second node and checks both see the data. Then closes both nodes,
 * restarts them and checks again.
 *
 * @throws Exception If fail.
 */
@Test
public void testEvictPartition() throws Exception {
    final int keys = 100;
    Ignite first = startGrid("node1");
    first.cluster().active(true);
    IgniteCache<Object, Object> firstCache = first.cache(CACHE_NAME);
    for (int key = 0; key < keys; key++)
        firstCache.put(key, new IndexedObject(key));
    Ignite second = startGrid("node2");
    IgniteCache<Object, Object> secondCache = second.cache(CACHE_NAME);
    for (int key = 0; key < keys; key++) {
        IndexedObject expected = new IndexedObject(key);
        assertEquals(expected, firstCache.get(key));
        assertEquals(expected, secondCache.get(key));
    }
    first.close();
    second.close();
    first = startGrid("node1");
    second = startGrid("node2");
    first.cluster().active(true);
    firstCache = first.cache(CACHE_NAME);
    secondCache = second.cache(CACHE_NAME);
    for (int key = 0; key < keys; key++) {
        IndexedObject expected = new IndexedObject(key);
        assertEquals(expected, firstCache.get(key));
        assertEquals(expected, secondCache.get(key));
    }
}
/**
 * Writes 5000 raw values into the metastorage of two nodes, each under the checkpoint read lock. Values differ
 * per node, and each node's own values are then read back and checked.
 *
 * @throws Exception If fail.
 */
@Test
public void testMetastorage() throws Exception {
    final int cnt = 5000;
    IgniteEx ignite0 = (IgniteEx)startGrid("node1");
    IgniteEx ignite1 = (IgniteEx)startGrid("node2");
    ignite1.cluster().active(true);
    GridCacheSharedContext<Object, Object> sharedCtx0 = ignite0.context().cache().context();
    GridCacheSharedContext<Object, Object> sharedCtx1 = ignite1.context().cache().context();
    MetaStorage storage0 = sharedCtx0.database().metaStorage();
    MetaStorage storage1 = sharedCtx1.database().metaStorage();
    assert storage0 != null;
    for (int idx = 0; idx < cnt; idx++) {
        String key = String.valueOf(idx);
        // Node 0: a 3-byte value whose first byte depends on the key.
        byte[] shortVal = {(byte)(idx % 256), 2, 3};
        sharedCtx0.database().checkpointReadLock();
        try {
            storage0.writeRaw(key, shortVal);
        }
        finally {
            sharedCtx0.database().checkpointReadUnlock();
        }
        // Node 1: a value of length idx + 3 whose first three bytes are 1, 2, 3.
        byte[] growingVal = new byte[idx + 3];
        growingVal[0] = 1;
        growingVal[1] = 2;
        growingVal[2] = 3;
        sharedCtx1.database().checkpointReadLock();
        try {
            storage1.writeRaw(key, growingVal);
        }
        finally {
            sharedCtx1.database().checkpointReadUnlock();
        }
    }
    for (int idx = 0; idx < cnt; idx++) {
        String key = String.valueOf(idx);
        byte[] fromNode0 = storage0.readRaw(key);
        assertEquals(3, fromNode0.length);
        assertEquals((byte)(idx % 256), fromNode0[0]);
        assertEquals(2, fromNode0[1]);
        assertEquals(3, fromNode0[2]);
        byte[] fromNode1 = storage1.readRaw(key);
        assertEquals(idx + 3, fromNode1.length);
        assertEquals(1, fromNode1[0]);
        assertEquals(2, fromNode1[1]);
        assertEquals(3, fromNode1[2]);
    }
}
/**
 * Writes 5000 metastorage values of 32 768 bytes each, with byte {@code k} equal to {@code k % 100}. Reads every
 * value back and checks length and content.
 *
 * @throws Exception If fail.
 */
@Test
public void testMetastorageLargeArray() throws Exception {
    int cnt = 5000;
    int arraySize = 32_768;
    IgniteEx ignite = (IgniteEx)startGrid("node1");
    ignite.cluster().active(true);
    GridCacheSharedContext<Object, Object> sharedCtx = ignite.context().cache().context();
    MetaStorage storage = sharedCtx.database().metaStorage();
    // Every key gets identical content, so build the payload once instead of once per key.
    byte[] b1 = new byte[arraySize];
    for (int k = 0; k < arraySize; k++)
        b1[k] = (byte)(k % 100);
    for (int i = 0; i < cnt; i++) {
        sharedCtx.database().checkpointReadLock();
        try {
            storage.writeRaw(String.valueOf(i), b1);
        }
        finally {
            sharedCtx.database().checkpointReadUnlock();
        }
    }
    for (int i = 0; i < cnt; i++) {
        byte[] d2 = storage.readRaw(String.valueOf(i));
        // Fail with a clear message instead of an NPE when a value is missing.
        Assert.assertNotNull("Missing value for key " + i, d2);
        assertEquals(arraySize, d2.length);
        // Compare against recomputed expectations rather than the shared payload array.
        for (int k = 0; k < arraySize; k++)
            assertEquals((byte)(k % 100), d2[k]);
    }
}
/**
 * Writes 400 metastorage values and removes the first 10. Checks that removed keys read back as absent and the
 * remaining keys keep their values.
 *
 * @throws Exception If fail.
 */
@Test
public void testMetastorageRemove() throws Exception {
    int cnt = 400;
    int removedCnt = 10;
    IgniteEx ignite0 = (IgniteEx)startGrid("node1");
    ignite0.cluster().active(true);
    GridCacheSharedContext<Object, Object> sharedCtx0 = ignite0.context().cache().context();
    MetaStorage storage = sharedCtx0.database().metaStorage();
    assert storage != null;
    for (int i = 0; i < cnt; i++) {
        sharedCtx0.database().checkpointReadLock();
        try {
            storage.writeRaw(String.valueOf(i), new byte[] {1, 2, 3});
        }
        finally {
            sharedCtx0.database().checkpointReadUnlock();
        }
    }
    for (int i = 0; i < removedCnt; i++) {
        sharedCtx0.database().checkpointReadLock();
        try {
            storage.removeData(String.valueOf(i));
        }
        finally {
            sharedCtx0.database().checkpointReadUnlock();
        }
    }
    // The removal itself must be observable, not just the survival of the other keys.
    for (int i = 0; i < removedCnt; i++)
        Assert.assertNull("Key was not removed: " + i, storage.readRaw(String.valueOf(i)));
    for (int i = removedCnt; i < cnt; i++) {
        byte[] d1 = storage.readRaw(String.valueOf(i));
        assertEquals(3, d1.length);
        assertEquals(1, d1[0]);
        assertEquals(2, d1[1]);
        assertEquals(3, d1[2]);
    }
}
/**
 * Writes 2000 metastorage values ({@code {1, 2, 3}}) and overwrites each with {@code {2, 2, 3, 4}}. Checks that
 * every key reads back the full new value.
 *
 * @throws Exception If fail.
 */
@Test
public void testMetastorageUpdate() throws Exception {
    try {
        int cnt = 2000;
        IgniteEx ignite0 = (IgniteEx)startGrid("node1");
        ignite0.cluster().active(true);
        GridCacheSharedContext<Object, Object> sharedCtx0 = ignite0.context().cache().context();
        MetaStorage storage = sharedCtx0.database().metaStorage();
        assert storage != null;
        for (int i = 0; i < cnt; i++) {
            sharedCtx0.database().checkpointReadLock();
            try {
                storage.writeRaw(String.valueOf(i), new byte[] {1, 2, 3});
            }
            finally {
                sharedCtx0.database().checkpointReadUnlock();
            }
        }
        for (int i = 0; i < cnt; i++) {
            sharedCtx0.database().checkpointReadLock();
            try {
                storage.writeRaw(String.valueOf(i), new byte[] {2, 2, 3, 4});
            }
            finally {
                sharedCtx0.database().checkpointReadUnlock();
            }
        }
        for (int i = 0; i < cnt; i++) {
            byte[] d1 = storage.readRaw(String.valueOf(i));
            assertEquals(4, d1.length);
            assertEquals(2, d1[0]);
            assertEquals(2, d1[1]);
            assertEquals(3, d1[2]);
            // The appended byte is the part the update introduced; check it too.
            assertEquals(4, d1[3]);
        }
    }
    finally {
        stopAllGrids();
    }
}
/**
 * Writes entries to the metastorage, restarts the node and checks that all of them are restored
 * with their original contents.
 *
 * @throws Exception If fail.
 */
@Test
public void testMetastorageWalRestore() throws Exception {
    int cnt = 2000;

    // Value written under every key; also the expected value before and after restart.
    byte[] expVal = {1, 2, 3};

    IgniteEx ignite0 = startGrid(0);

    ignite0.cluster().active(true);

    GridCacheSharedContext<Object, Object> sharedCtx0 = ignite0.context().cache().context();

    MetaStorage storage = sharedCtx0.database().metaStorage();

    // JUnit assertions are used instead of Java 'assert', which is skipped when assertions are disabled.
    assertNotNull(storage);

    for (int i = 0; i < cnt; i++) {
        sharedCtx0.database().checkpointReadLock();

        try {
            // Each write gets its own copy so the expected value cannot be affected by the storage.
            storage.writeRaw(String.valueOf(i), expVal.clone());
        }
        finally {
            sharedCtx0.database().checkpointReadUnlock();
        }
    }

    for (int i = 0; i < cnt; i++) {
        byte[] val = storage.readRaw(String.valueOf(i));

        assertTrue("Unexpected value [key=" + i + ", val=" + Arrays.toString(val) + ']',
            Arrays.equals(expVal, val));
    }

    stopGrid(0);

    ignite0 = startGrid(0);

    ignite0.cluster().active(true);

    sharedCtx0 = ignite0.context().cache().context();

    storage = sharedCtx0.database().metaStorage();

    assertNotNull(storage);

    // After restart every value must be restored completely, not merely be present.
    for (int i = 0; i < cnt; i++) {
        byte[] val = storage.readRaw(String.valueOf(i));

        assertTrue("Unexpected restored value [key=" + i + ", val=" + Arrays.toString(val) + ']',
            Arrays.equals(expVal, val));
    }
}
/**
 * Checks that a WAL replay iterator held open in one thread does not deadlock with heavy concurrent
 * updates. The test sets {@code walSegments = 2} and {@code walSegmentSize = 512 KB} before starting the
 * node, presumably so that segment rollover and archiving happen while the iterator is open.
 *
 * @throws Exception if failed.
 */
@Test
public void testAbsentDeadlock_Iterator_RollOver_Archivation() throws Exception {
    walSegments = 2;

    walSegmentSize = 512 * 1024;

    IgniteEx ignite0 = (IgniteEx)startGrid("node0");

    ignite0.cluster().active(true);

    IgniteCache<Object, Object> cache0 = ignite0.cache(CACHE_NAME);

    for (int i = 0; i < 100; i++)
        cache0.put(i, new IndexedObject(i));

    GridCacheSharedContext<Object, Object> sharedCtx = ignite0.context().cache().context();

    GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)sharedCtx.database();

    db.waitForCheckpoint("test");
    db.enableCheckpoints(false).get();

    // Log something to know where to start.
    WALPointer ptr = sharedCtx.wal().log(new MemoryRecoveryRecord(U.currentTimeMillis()));

    info("Replay marker: " + ptr);

    for (int i = 100; i < 200; i++)
        cache0.put(i, new IndexedObject(i));

    CountDownLatch insertFinished = new CountDownLatch(1);

    // Opens a WAL iterator from the marker and keeps it open until the inserts below are finished.
    IgniteInternalFuture<Object> iterFut = GridTestUtils.runAsync(
        () -> {
            try (WALIterator it = sharedCtx.wal().replay(ptr)) {
                if (it.hasNext()) {
                    it.next();

                    insertFinished.await();
                }
            }

            return null;
        }
    );

    IgniteInternalFuture<Object> future = GridTestUtils.runAsync(
        () -> {
            for (int i = 0; i < 10000; i++)
                cache0.put(i, new IndexedObject(i));

            return null;
        }
    );

    try {
        future.get();
    }
    finally {
        // Always release the iterator thread, even if the inserts failed, so it never stays blocked.
        insertFinished.countDown();
    }

    // Surface failures from iterating over (or closing) the WAL iterator instead of silently dropping them.
    iterFut.get();

    ignite0.close();
}
/**
 * Checks that WAL delta records, replayed on top of page snapshots, reproduce the live page contents.
 * <p>
 * The test proceeds as follows:
 * <ol>
 * <li>Puts entries, waits for a checkpoint and disables further checkpoints.</li>
 * <li>Logs a marker record, then performs more puts and removes.</li>
 * <li>Replays the WAL from the marker. Each {@code PageSnapshot} becomes the base image of its page; it is
 * decompressed first if it is shorter than the real page size. Each {@code PageDeltaRecord} is applied on
 * top of the image collected so far, or on a zero-filled page if no snapshot was seen.</li>
 * <li>Compares every reconstructed page except tracking pages byte by byte with the page acquired from
 * page memory.</li>
 * </ol>
 *
 * @throws Exception if failed.
 */
@Test
public void testApplyDeltaRecords() throws Exception {
IgniteEx ignite0 = (IgniteEx)startGrid("node0");
ignite0.cluster().active(true);
IgniteCache<Object, Object> cache0 = ignite0.cache(CACHE_NAME);
for (int i = 0; i < 1000; i++)
cache0.put(i, new IndexedObject(i));
GridCacheSharedContext<Object, Object> sharedCtx = ignite0.context().cache().context();
GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)sharedCtx.database();
// Wait for a checkpoint, then disable checkpoints for the rest of the test.
db.waitForCheckpoint("test");
db.enableCheckpoints(false).get();
// Log a marker record; WAL replay below starts from this pointer.
WALPointer ptr = sharedCtx.wal().log(new MemoryRecoveryRecord(U.currentTimeMillis()));
info("Replay marker: " + ptr);
for (int i = 1000; i < 5000; i++)
cache0.put(i, new IndexedObject(i));
info("Done puts...");
for (int i = 2_000; i < 3_000; i++)
cache0.remove(i);
info("Done removes...");
for (int i = 5000; i < 6000; i++)
cache0.put(i, new IndexedObject(i));
info("Done puts...");
// Page images reconstructed from the WAL, keyed by full page ID.
Map<FullPageId, byte[]> rolledPages = new HashMap<>();
int pageSize = sharedCtx.database().pageSize();
// One-page direct scratch buffer in native byte order, used for decompression and delta application.
ByteBuffer buf = ByteBuffer.allocateDirect(pageSize);
buf.order(ByteOrder.nativeOrder());
// Now check that deltas can be correctly applied.
try (WALIterator it = sharedCtx.wal().replay(ptr)) {
while (it.hasNext()) {
IgniteBiTuple<WALPointer, WALRecord> tup = it.next();
WALRecord rec = tup.get2();
if (rec instanceof PageSnapshot) {
PageSnapshot page = (PageSnapshot)rec;
CacheGroupContext gctx = sharedCtx.cache().cacheGroup(page.groupId());
// Fall back to the configured page size when the cache group is not known locally.
int realPageSize = gctx == null ? pageSize
: gctx.dataRegion().pageMemory().realPageSize(page.groupId());
byte[] pageData = page.pageData();
// Snapshot data shorter than the real page size is treated as compressed: decompress into a full page.
if (pageData.length < realPageSize) {
buf.clear();
buf.put(pageData);
buf.flip();
sharedCtx.kernalContext().compress().decompressPage(buf, realPageSize);
pageData = new byte[realPageSize];
buf.position(0);
buf.get(pageData);
}
rolledPages.put(page.fullPageId(), pageData);
}
else if (rec instanceof PageDeltaRecord) {
PageDeltaRecord delta = (PageDeltaRecord)rec;
FullPageId fullId = new FullPageId(delta.pageId(), delta.groupId());
byte[] pageData = rolledPages.get(fullId);
// No snapshot seen for this page yet: start from a zero-filled page.
if (pageData == null) {
pageData = new byte[pageSize];
rolledPages.put(fullId, pageData);
}
// NOTE(review): this assertion can never fail because a zero-filled page is substituted just above.
assertNotNull("Missing page snapshot [page=" + fullId + ", delta=" + delta + ']', pageData);
// Copy the page image into buf, apply the delta in place through the buffer address,
// then copy the result back into the same byte array.
buf.clear();
buf.put(pageData);
buf.flip();
delta.applyDelta(sharedCtx.database().dataRegion(null).pageMemory(),
GridUnsafe.bufferAddress(buf));
buf.get(pageData);
}
}
}
info("Done apply...");
PageMemoryEx pageMem = (PageMemoryEx)db.dataRegion(null).pageMemory();
// NOTE(review): unlike buf, bufWal is not switched to native byte order - confirm compactPage does not depend on it.
ByteBuffer bufWal = ByteBuffer.allocateDirect(pageSize);
// Compare every reconstructed page with the live page, under checkpoint read lock and page write lock.
for (Map.Entry<FullPageId, byte[]> entry : rolledPages.entrySet()) {
FullPageId fullId = entry.getKey();
ignite0.context().cache().context().database().checkpointReadLock();
try {
long page = pageMem.acquirePage(
fullId.groupId(), fullId.pageId(), IoStatisticsHolderNoOp.INSTANCE, true);
try {
long bufPtr = pageMem.writeLock(fullId.groupId(), fullId.pageId(), page, true);
try {
byte[] data = entry.getValue();
// 'continue' still runs the finally blocks below, so the page is unlocked and released.
if (fullId.pageId() == TrackingPageIO.VERSIONS.latest().trackingPageFor(fullId.pageId(), db.pageSize()))
continue; // Skip tracking pages.
// Compacting/restoring a page can leave some garbage in its unused space, so when WAL page
// compression is enabled, compare the compacted forms of compactable pages instead.
if (walPageCompression != null && PageIO.getPageIO(bufPtr) instanceof CompactablePageIO) {
CompactablePageIO pageIO = PageIO.getPageIO(bufPtr);
buf.clear();
bufWal.clear();
int realPageSize = data.length;
pageIO.compactPage(GridUnsafe.wrapPointer(bufPtr, realPageSize), buf, realPageSize);
pageIO.compactPage(ByteBuffer.wrap(data), bufWal, realPageSize);
// From here on, compare the compacted live page (in buf) with the compacted WAL page (in data).
bufPtr = GridUnsafe.bufferAddress(buf);
data = new byte[bufWal.limit()];
bufWal.rewind();
bufWal.get(data);
}
for (int i = 0; i < data.length; i++)
assertEquals("page=" + fullId + ", pos=" + i, PageUtils.getByte(bufPtr, i), data[i]);
}
finally {
pageMem.writeUnlock(fullId.groupId(), fullId.pageId(), page, null, false, true);
}
}
finally {
pageMem.releasePage(fullId.groupId(), fullId.pageId(), page);
}
}
finally {
ignite0.context().cache().context().database().checkpointReadUnlock();
}
}
ignite0.close();
}
/**
 * Test recovery from WAL on 3 nodes in case of transactional cache.
 * <p>
 * Runs optimistic transactions over random keys with random values, committing or rolling back each one
 * at random. Then restarts all nodes and checks that the last committed value of every key is read back.
 *
 * @throws Exception If fail.
 */
@Test
public void testRecoveryOnTransactionalAndPartitionedCache() throws Exception {
    IgniteEx ignite = startGrids(3);

    ignite.cluster().state(ClusterState.ACTIVE);

    final String cacheName = "transactional";

    CacheConfiguration<Object, Object> cacheConfiguration = new CacheConfiguration<>(cacheName)
        .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
        .setAffinity(new RendezvousAffinityFunction(false, 32))
        .setCacheMode(CacheMode.PARTITIONED)
        .setRebalanceMode(CacheRebalanceMode.SYNC)
        .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
        .setBackups(2);

    ignite.createCache(cacheConfiguration);

    IgniteCache<Object, Object> cache = ignite.cache(cacheName);

    // Expected cache state: the latest value of each key written by a committed transaction.
    Map<Object, Object> map = new HashMap<>();

    final int transactions = 100;
    final int operationsPerTransaction = 40;

    // Log the seed so that a failing run can be reproduced.
    long seed = ThreadLocalRandom.current().nextLong();

    log.info("Random seed: " + seed);

    Random random = new Random(seed);

    for (int t = 1; t <= transactions; t++) {
        // try-with-resources closes (and so rolls back) the transaction if any operation throws.
        try (Transaction tx = ignite.transactions().txStart(
            TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED)) {
            Map<Object, Object> changesInTransaction = new HashMap<>();

            for (int op = 0; op < operationsPerTransaction; op++) {
                int key = random.nextInt(1000) + 1;

                Object value = random.nextBoolean() ? randomString(random) + key : new BigObject(key);

                changesInTransaction.put(key, value);

                cache.put(key, value);
            }

            if (random.nextBoolean()) {
                tx.commit();

                map.putAll(changesInTransaction);
            }
            else
                tx.rollback();
        }

        if (t % 50 == 0)
            log.info("Finished transaction " + t);
    }

    stopAllGrids();

    ignite = startGrids(3);

    ignite.cluster().state(ClusterState.ACTIVE);

    cache = ignite.cache(cacheName);

    for (Map.Entry<Object, Object> e : map.entrySet()) {
        Object key = e.getKey();

        Assert.assertEquals("Unexpected value for key " + key, e.getValue(), cache.get(key));
    }
}
/**
 * Test that all DataRecord WAL records are within transaction boundaries - PREPARED and COMMITTED markers.
 *
 * @throws Exception If any fail.
 */
@Test
public void testTxRecordsConsistency() throws Exception {
    IgniteEx ignite = startGrids(3);

    ignite.cluster().state(ClusterState.ACTIVE);

    final String cacheName = "transactional";

    CacheConfiguration<Object, Object> cacheConfiguration = new CacheConfiguration<>(cacheName)
        .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
        .setAffinity(new RendezvousAffinityFunction(false, 32))
        .setCacheMode(CacheMode.PARTITIONED)
        .setRebalanceMode(CacheRebalanceMode.SYNC)
        .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
        .setBackups(0);

    ignite.createCache(cacheConfiguration);

    IgniteCache<Object, Object> cache = ignite.cache(cacheName);

    GridCacheSharedContext<Object, Object> sharedCtx = ignite.context().cache().context();

    GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)sharedCtx.database();

    db.waitForCheckpoint("test");
    db.enableCheckpoints(false).get();

    // Log something to know where to start.
    WALPointer startPtr = sharedCtx.wal().log(new MemoryRecoveryRecord(U.currentTimeMillis()));

    final int transactions = 100;
    final int operationsPerTransaction = 40;

    // Log the seed so that a failing run can be reproduced.
    long seed = ThreadLocalRandom.current().nextLong();

    log.info("Random seed: " + seed);

    Random random = new Random(seed);

    for (int t = 1; t <= transactions; t++) {
        // try-with-resources closes (and so rolls back) the transaction if any operation throws.
        try (Transaction tx = ignite.transactions().txStart(
            TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED)) {
            for (int op = 0; op < operationsPerTransaction; op++) {
                int key = random.nextInt(1000) + 1;

                Object value = random.nextBoolean() ? randomString(random) + key : new BigObject(key);

                cache.put(key, value);
            }

            if (random.nextBoolean())
                tx.commit();
            else
                tx.rollback();
        }

        if (t % 50 == 0)
            log.info("Finished transaction " + t);
    }

    // Transactions whose PREPARED record has been seen and that have not been committed or rolled back yet.
    Set<GridCacheVersion> activeTransactions = new HashSet<>();

    // Check that all DataRecords are within PREPARED and COMMITTED tx records.
    // JUnit 'fail' is used instead of Java 'assert' so the checks also run when assertions are disabled.
    try (WALIterator it = sharedCtx.wal().replay(startPtr)) {
        while (it.hasNext()) {
            IgniteBiTuple<WALPointer, WALRecord> tup = it.next();

            WALRecord rec = tup.get2();

            if (rec instanceof TxRecord) {
                TxRecord txRecord = (TxRecord)rec;

                GridCacheVersion txId = txRecord.nearXidVersion();

                switch (txRecord.state()) {
                    case PREPARED:
                        // Set.add() returns false if the transaction is already active.
                        if (!activeTransactions.add(txId))
                            fail("Transaction is already present " + txRecord);

                        break;

                    case COMMITTED:
                        // Set.remove() returns false if no PREPARED record was seen for the transaction.
                        if (!activeTransactions.remove(txId))
                            fail("No PREPARE marker for transaction " + txRecord);

                        break;

                    case ROLLED_BACK:
                        activeTransactions.remove(txId);

                        break;

                    default:
                        throw new IllegalStateException("Unknown Tx state of record " + txRecord);
                }
            }
            else if (rec instanceof DataRecord) {
                DataRecord dataRecord = (DataRecord)rec;

                for (DataEntry entry : dataRecord.writeEntries()) {
                    GridCacheVersion txId = entry.nearXidVersion();

                    if (!activeTransactions.contains(txId))
                        fail("No transaction for entry " + entry);
                }
            }
        }
    }
}
/**
 * Tests a scenario when a coordinator has failed after recovery during node join.
 * <p>
 * The scenario:
 * <ol>
 * <li>Node 2 joins and is checkpointed while supply messages for {@code CACHE_2} from the coordinator are
 * blocked, then node 2 is stopped.</li>
 * <li>Node 2 is started again with its partition map exchange held at {@code onInitBeforeTopologyLock}.</li>
 * <li>The coordinator is stopped at that point and the exchange is released.</li>
 * <li>Node 2 must finish starting and the exchange must complete.</li>
 * </ol>
 *
 * @throws Exception If failed.
 */
@Test
@WithSystemProperty(key = IGNITE_DISABLE_WAL_DURING_REBALANCING, value = "false")
public void testRecoveryAfterRestart_Join() throws Exception {
    IgniteEx crd = startGrid(1);

    crd.cluster().active(true);

    for (int i = 0; i < PARTS; i++) {
        crd.cache(CACHE_1).put(i, i);
        crd.cache(CACHE_2).put(i, i);
    }

    // Block supply messages for CACHE_2 so that node 2 cannot finish rebalancing it.
    IgniteBiPredicate<ClusterNode, Message> supplyBlocker = (node, msg) ->
        msg instanceof GridDhtPartitionSupplyMessage &&
            ((GridDhtPartitionSupplyMessage)msg).groupId() == CU.cacheId(CACHE_2);

    TestRecordingCommunicationSpi.spi(crd).blockMessages(supplyBlocker);

    IgniteEx g2 = startGrid(2);

    resetBaselineTopology();

    TestRecordingCommunicationSpi.spi(crd).waitForBlocked();

    forceCheckpoint(g2);

    g2.close();

    TestRecordingCommunicationSpi.spi(crd).stopBlock();

    waitForTopology(1);

    CountDownLatch l1 = new CountDownLatch(1);
    CountDownLatch l2 = new CountDownLatch(1);

    // Node 2 is started asynchronously because its exchange blocks until l2 is released.
    IgniteInternalFuture<Void> startFut = GridTestUtils.runAsync(new Callable<Void>() {
        @Override public Void call() throws Exception {
            startGrid(getPMEBlockingConfiguration(2, l1, l2));

            return null;
        }
    });

    assertTrue(U.await(l1, 10, TimeUnit.SECONDS));

    // Fail the coordinator while node 2 is in the middle of the exchange.
    stopGrid(getTestIgniteInstanceName(1), true, false);

    l2.countDown();

    // Propagate failures of the asynchronous node start instead of silently ignoring them.
    startFut.get();

    awaitPartitionMapExchange();
}
/**
 * Tests a scenario when a coordinator has failed after recovery during activation.
 * <p>
 * The scenario:
 * <ol>
 * <li>Node 2 is checkpointed while supply messages for {@code CACHE_2} from the coordinator are blocked.</li>
 * <li>The whole cluster is stopped and restarted.</li>
 * <li>Activation is triggered asynchronously from node 1, and node 1 is stopped once node 2 reaches the
 * blocked exchange step.</li>
 * <li>The exchange is released and must complete.</li>
 * </ol>
 *
 * @throws Exception If failed.
 */
@Test
@WithSystemProperty(key = IGNITE_DISABLE_WAL_DURING_REBALANCING, value = "false")
public void testRecoveryAfterRestart_Activate() throws Exception {
    IgniteEx crd = startGrid(1);

    crd.cluster().active(true);

    for (int part = 0; part < PARTS; part++) {
        crd.cache(CACHE_1).put(part, part);
        crd.cache(CACHE_2).put(part, part);
    }

    // Hold back supply messages for CACHE_2 so that node 2 cannot finish rebalancing it.
    IgniteBiPredicate<ClusterNode, Message> supplyBlocker = (node, msg) ->
        msg instanceof GridDhtPartitionSupplyMessage &&
            ((GridDhtPartitionSupplyMessage)msg).groupId() == CU.cacheId(CACHE_2);

    TestRecordingCommunicationSpi.spi(crd).blockMessages(supplyBlocker);

    IgniteEx g2 = startGrid(2);

    resetBaselineTopology();

    TestRecordingCommunicationSpi.spi(crd).waitForBlocked();

    forceCheckpoint(g2);

    stopAllGrids();

    waitForTopology(0);

    // Restart and activate.
    CountDownLatch l1 = new CountDownLatch(1);
    CountDownLatch l2 = new CountDownLatch(1);

    startGrid(1);

    startGrid(getPMEBlockingConfiguration(2, l1, l2));

    // Activation runs in the background: its exchange is held on node 2 until l2 is released.
    Runnable activation = () -> grid(1).cluster().active(true);

    GridTestUtils.runAsync(activation);

    assertTrue(U.await(l1, 10, TimeUnit.SECONDS));

    stopGrid(getTestIgniteInstanceName(1), true, false);

    l2.countDown();

    awaitPartitionMapExchange();
}
/**
 * Generates a random lowercase string for test purposes.
 *
 * @param random Source of randomness.
 * @return String of 1 to 50 characters, each in range {@code 'a'..'z'}.
 */
private static String randomString(Random random) {
    int len = random.nextInt(50) + 1;

    StringBuilder sb = new StringBuilder(len);

    // The cast is required: 'a' + int is an int, and append(int) would add its decimal digits, not a letter.
    for (int i = 0; i < len; i++)
        sb.append((char)('a' + random.nextInt(26)));

    return sb.toString();
}
/**
 * Builds a node configuration whose partition map exchange can be paused at the
 * {@code onInitBeforeTopologyLock} stage.
 * <p>
 * When an exchange reaches that stage, {@code l1} is counted down. The exchange then waits up to
 * 10 seconds for {@code l2} to be released.
 *
 * @param idx Node index.
 * @param l1 Blocked event latch.
 * @param l2 Released event latch.
 *
 * @return A configuration.
 */
private IgniteConfiguration getPMEBlockingConfiguration(
    int idx,
    CountDownLatch l1,
    CountDownLatch l2
) throws Exception {
    IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(idx));

    return cfg.setLifecycleBeans(new GridTestLifecycleBean() {
        @Override public void onLifecycleEvent(LifecycleEventType type) {
            // Only hook in right before the node starts.
            if (type != LifecycleEventType.BEFORE_NODE_START)
                return;

            // Signals l1 and then parks the exchange thread until l2 is released.
            PartitionsExchangeAware pmeBlocker = new PartitionsExchangeAware() {
                @Override public void onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
                    l1.countDown();

                    try {
                        assertTrue(U.await(l2, 10, TimeUnit.SECONDS));
                    }
                    catch (IgniteInterruptedCheckedException e) {
                        fail(X.getFullStackTrace(e));
                    }
                }
            };

            // The exchange-aware component is registered once the distributed metastorage is readable.
            g.context().internalSubscriptionProcessor().registerDistributedMetastorageListener(
                new DistributedMetastorageLifecycleListener() {
                    @Override public void onReadyForRead(ReadableDistributedMetaStorage metastorage) {
                        g.context().cache().context().exchange().registerExchangeAwareComponent(pmeBlocker);
                    }
                });
        }
    });
}
/**
 * BigObject for test purposes that doesn't fit in page size.
 */
private static class BigObject {
    /** Index this object was created with; also drives the payload pattern. */
    private final int index;

    /** 4096-byte payload: every byte whose position is divisible by {@code index} equals {@code (byte)index}. */
    private final byte[] payload = new byte[4096];

    /**
     * @param index Object index. Must not be {@code 0}: it is used as a divisor when filling the payload.
     */
    BigObject(int index) {
        this.index = index;

        // Create pseudo-random array.
        for (int i = 0; i < payload.length; i++)
            if (i % index == 0)
                payload[i] = (byte)index;
    }

    /** {@inheritDoc} */
    @Override public boolean equals(Object o) {
        if (this == o)
            return true;

        if (o == null || getClass() != o.getClass())
            return false;

        BigObject bigObject = (BigObject)o;

        return index == bigObject.index &&
            Arrays.equals(payload, bigObject.payload);
    }

    /** {@inheritDoc} */
    @Override public int hashCode() {
        // The payload must be hashed by content (Arrays.hashCode), as equals() compares it by content.
        // Passing the array itself to Objects.hash would use its identity hash and break the contract.
        return Objects.hash(index, Arrays.hashCode(payload));
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return "BigObject [index=" + index + ']';
    }
}
/**
 * Builds an array holding the sequence {@code 0, 1, ..., size - 1}.
 *
 * @param size Size of data.
 * @return Test data.
 */
private int[] createTestData(int size) {
    int[] res = new int[size];

    Arrays.setAll(res, idx -> idx);

    return res;
}
/**
 * Closure that fills the partitioned and the local test caches with {@code ENTRY_COUNT} entries each.
 * It can optionally disable checkpoints on the executing node first.
 */
private static class LoadRunnable implements IgniteRunnable {
    /** Ignite instance the closure is executed on. */
    @IgniteInstanceResource
    private Ignite ignite;

    /** Whether checkpoints are turned off before loading. */
    private boolean disableCheckpoints;

    /**
     * @param disableCheckpoints Disable checkpoints flag.
     */
    private LoadRunnable(boolean disableCheckpoints) {
        this.disableCheckpoints = disableCheckpoints;
    }

    /** {@inheritDoc} */
    @Override public void run() {
        ignite.log().info("Started load.");

        if (disableCheckpoints) {
            IgniteEx igniteEx = (IgniteEx)ignite;

            GridCacheDatabaseSharedManager db =
                (GridCacheDatabaseSharedManager)igniteEx.context().cache().context().database();

            try {
                db.enableCheckpoints(false).get();
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException(e);
            }
        }

        boolean cacheStarted;

        try {
            cacheStarted = GridTestUtils.waitForCondition(() -> ignite.cache(CACHE_NAME) != null, 10_000);
        }
        catch (IgniteInterruptedCheckedException e) {
            throw new RuntimeException(e);
        }

        assertTrue(cacheStarted);

        IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);
        IgniteCache<Object, Object> locCache = ignite.cache(LOC_CACHE_NAME);

        for (int key = 0; key < ENTRY_COUNT; key++) {
            cache.put(key, new IndexedObject(key));
            locCache.put(key, new IndexedObject(key));
        }

        ignite.log().info("Finished load.");
    }
}
/**
 * Closure that starts four local background tasks. Each task keeps writing random entries into the
 * partitioned and the local test caches until its thread is interrupted.
 */
private static class AsyncLoadRunnable implements IgniteRunnable {
    /** Ignite instance the closure is executed on. */
    @IgniteInstanceResource
    private Ignite ignite;

    /** {@inheritDoc} */
    @Override public void run() {
        boolean cacheStarted;

        try {
            cacheStarted = GridTestUtils.waitForCondition(() -> ignite.cache(CACHE_NAME) != null, 10_000);
        }
        catch (IgniteInterruptedCheckedException e) {
            throw new RuntimeException(e);
        }

        assertTrue(cacheStarted);

        ignite.log().info(">>>>>>> Started load.");

        for (int worker = 0; worker < 4; worker++) {
            ignite.scheduler().callLocal(() -> {
                IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);
                IgniteCache<Object, Object> locCache = ignite.cache(LOC_CACHE_NAME);

                ThreadLocalRandom rnd = ThreadLocalRandom.current();

                int updates = 0;

                while (!Thread.currentThread().isInterrupted()) {
                    cache.put(rnd.nextInt(10_000), new IndexedObject(rnd.nextInt()));
                    locCache.put(rnd.nextInt(10_000), new IndexedObject(rnd.nextInt()));

                    updates++;

                    if (updates > 0 && updates % 1_000 == 0)
                        ignite.log().info(">>>> Updated: " + updates);
                }

                return null;
            });
        }
    }
}
/**
 * Verification closure for the {@code ENTRY_COUNT} load. It returns:
 * <ul>
 * <li>{@code 1} if not all partitions of the partitioned cache are reported lost;</li>
 * <li>{@code 2} if any key is missing from either cache after lost partitions are reset;</li>
 * <li>{@code 0} otherwise.</li>
 * </ul>
 */
private static class VerifyCallable implements IgniteCallable<Integer> {
    /** Ignite instance the closure is executed on. */
    @IgniteInstanceResource
    private Ignite ignite;

    /** {@inheritDoc} */
    @Override public Integer call() throws Exception {
        boolean cacheStarted;

        try {
            cacheStarted = GridTestUtils.waitForCondition(() -> ignite.cache(CACHE_NAME) != null, 10_000);
        }
        catch (IgniteInterruptedCheckedException e) {
            throw new RuntimeException(e);
        }

        assertTrue(cacheStarted);

        IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);

        // Partitions are expected to be lost on killed node.
        Collection<Integer> lostParts = cache.lostPartitions();

        int totalParts = cache.getConfiguration(CacheConfiguration.class).getAffinity().partitions();

        if (totalParts != lostParts.size())
            return 1;

        ignite.resetLostPartitions(Collections.singleton(CACHE_NAME));

        IgniteCache<Object, Object> locCache = ignite.cache(LOC_CACHE_NAME);

        for (int key = 0; key < ENTRY_COUNT; key++) {
            if (cache.get(key) == null) {
                ignite.log().warning("Failed to find a value for PARTITIONED cache key: " + key);

                return 2;
            }

            if (locCache.get(key) == null) {
                ignite.log().warning("Failed to find a value for LOCAL cache key: " + key);

                return 2;
            }
        }

        return 0;
    }
}
/**
 * Closure that loads {@code LARGE_ENTRY_COUNT} {@code long[LARGE_ARR_SIZE]} values into the partitioned
 * and the local test caches. It can optionally disable checkpoints on the executing node first.
 */
private static class LargeLoadRunnable implements IgniteRunnable {
    /** Ignite instance the closure is executed on. */
    @IgniteInstanceResource
    private Ignite ignite;

    /** Whether checkpoints are turned off before loading. */
    private boolean disableCheckpoints;

    /**
     * @param disableCheckpoints Disable checkpoints flag.
     */
    private LargeLoadRunnable(boolean disableCheckpoints) {
        this.disableCheckpoints = disableCheckpoints;
    }

    /** {@inheritDoc} */
    @Override public void run() {
        try {
            boolean successfulWaiting = GridTestUtils.waitForCondition(new PAX() {
                @Override public boolean applyx() {
                    return ignite.cache(CACHE_NAME) != null;
                }
            }, 10_000);

            assertTrue(successfulWaiting);
        }
        catch (IgniteInterruptedCheckedException e) {
            throw new RuntimeException(e);
        }

        ignite.log().info("Started load.");

        if (disableCheckpoints) {
            GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context()
                .cache().context().database();

            try {
                // enableCheckpoints() returns a future. Wait for it, as LoadRunnable does, so the load
                // does not start before checkpoints are actually disabled.
                dbMgr.enableCheckpoints(false).get();
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException(e);
            }
        }

        IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);
        IgniteCache<Object, Object> locCache = ignite.cache(LOC_CACHE_NAME);

        for (int i = 0; i < LARGE_ENTRY_COUNT; i++) {
            final long[] data = new long[LARGE_ARR_SIZE];

            Arrays.fill(data, i);

            cache.put(i, data);
            locCache.put(i, data);
        }

        ignite.log().info("Finished load.");
    }
}
/**
 * Closure that starts one local background task. The task keeps overwriting random keys of the
 * partitioned test cache with {@code long[LARGE_ARR_SIZE]} values until its thread is interrupted.
 */
private static class AsyncLargeLoadRunnable implements IgniteRunnable {
    /** Ignite instance the closure is executed on. */
    @IgniteInstanceResource
    private Ignite ignite;

    /** {@inheritDoc} */
    @Override public void run() {
        boolean cacheStarted;

        try {
            cacheStarted = GridTestUtils.waitForCondition(() -> ignite.cache(CACHE_NAME) != null, 10_000);
        }
        catch (IgniteInterruptedCheckedException e) {
            throw new RuntimeException(e);
        }

        assertTrue(cacheStarted);

        ignite.log().info(">>>>>>> Started load.");

        // A single background writer.
        ignite.scheduler().callLocal(() -> {
            IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);

            ThreadLocalRandom rnd = ThreadLocalRandom.current();

            int updates = 0;

            while (!Thread.currentThread().isInterrupted()) {
                long[] data = new long[LARGE_ARR_SIZE];

                int key = rnd.nextInt(1000);

                Arrays.fill(data, key);

                cache.put(key, data);

                updates++;

                if (updates > 0 && updates % 1_000 == 0)
                    ignite.log().info(">>>> Updated: " + updates);
            }

            return null;
        });
    }
}
/**
 * Verification closure for the {@code LARGE_ENTRY_COUNT} load. It returns:
 * <ul>
 * <li>{@code 1} if not all partitions are reported lost;</li>
 * <li>{@code 2} if a key is missing after lost partitions are reset;</li>
 * <li>{@code 0} otherwise.</li>
 * </ul>
 * Each value found must equal a {@code long[LARGE_ARR_SIZE]} filled with its key.
 */
private static class VerifyLargeCallable implements IgniteCallable<Integer> {
    /** Ignite instance the closure is executed on. */
    @IgniteInstanceResource
    private Ignite ignite;

    /** {@inheritDoc} */
    @Override public Integer call() throws Exception {
        boolean cacheStarted;

        try {
            cacheStarted = GridTestUtils.waitForCondition(() -> ignite.cache(CACHE_NAME) != null, 10_000);
        }
        catch (IgniteInterruptedCheckedException e) {
            throw new RuntimeException(e);
        }

        assertTrue(cacheStarted);

        IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);

        // Partitions are expected to be lost on killed node.
        Collection<Integer> lostParts = cache.lostPartitions();

        int totalParts = cache.getConfiguration(CacheConfiguration.class).getAffinity().partitions();

        if (totalParts != lostParts.size())
            return 1;

        ignite.resetLostPartitions(Collections.singleton(CACHE_NAME));

        for (int key = 0; key < LARGE_ENTRY_COUNT; key++) {
            Object val = cache.get(key);

            if (val == null) {
                ignite.log().warning("Failed to find a value for key: " + key);

                return 2;
            }

            long[] expected = new long[LARGE_ARR_SIZE];

            Arrays.fill(expected, key);

            assertTrue(Arrays.equals(expected, (long[])val));
        }

        return 0;
    }
}
/**
 * Cache value with a single indexed SQL field; equality and hash code depend only on that field.
 */
private static class IndexedObject {
    /** Indexed integer value. */
    @QuerySqlField(index = true)
    private int iVal;

    /**
     * @param iVal Integer value.
     */
    private IndexedObject(int iVal) {
        this.iVal = iVal;
    }

    /** {@inheritDoc} */
    @Override public boolean equals(Object o) {
        return o == this || (o instanceof IndexedObject && ((IndexedObject)o).iVal == iVal);
    }

    /** {@inheritDoc} */
    @Override public int hashCode() {
        return iVal;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(IndexedObject.class, this);
    }
}
/**
 * Simple three-constant enum for test values.
 */
private enum EnumVal {
    /** First constant. */
    VAL1,

    /** Second constant. */
    VAL2,

    /** Third constant. */
    VAL3
}
}