| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.cache.database.db.file; |
| |
| import java.io.File; |
| import java.nio.ByteBuffer; |
| import java.nio.ByteOrder; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteCompute; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.cache.CacheAtomicityMode; |
| import org.apache.ignite.cache.CacheRebalanceMode; |
| import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; |
| import org.apache.ignite.cache.query.SqlFieldsQuery; |
| import org.apache.ignite.cache.query.annotations.QuerySqlField; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.configuration.BinaryConfiguration; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.configuration.MemoryConfiguration; |
| import org.apache.ignite.configuration.MemoryPolicyConfiguration; |
| import org.apache.ignite.configuration.PersistentStoreConfiguration; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.pagemem.FullPageId; |
| import org.apache.ignite.internal.pagemem.PageUtils; |
| import org.apache.ignite.internal.pagemem.wal.WALIterator; |
| import org.apache.ignite.internal.pagemem.wal.WALPointer; |
| import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; |
| import org.apache.ignite.internal.pagemem.wal.record.WALRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; |
| import org.apache.ignite.internal.processors.cache.database.GridCacheDatabaseSharedManager; |
| import org.apache.ignite.internal.processors.cache.database.pagemem.PageMemoryEx; |
| import org.apache.ignite.internal.processors.cache.database.tree.io.TrackingPageIO; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.PA; |
| import org.apache.ignite.internal.util.typedef.PAX; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteCallable; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.lang.IgniteRunnable; |
| import org.apache.ignite.resources.IgniteInstanceResource; |
| import org.apache.ignite.testframework.GridTestUtils; |
| import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; |
| import org.apache.ignite.testframework.junits.multijvm.IgniteProcessProxy; |
| import org.junit.Assert; |
| import sun.nio.ch.DirectBuffer; |
| |
| import static org.apache.ignite.internal.processors.cache.database.wal.FileWriteAheadLogManager.IGNITE_PDS_WAL_MODE; |
| |
| /** |
| * |
| */ |
| public class IgniteWalRecoverySelfTest extends GridCommonAbstractTest { |
| /** */ |
| private static final String HAS_CACHE = "HAS_CACHE"; |
| |
| /** */ |
| private static final int LARGE_ARR_SIZE = 1025; |
| |
| /** */ |
| private boolean fork; |
| |
| /** */ |
| private String cacheName; |
| |
| /** */ |
| private int walSegmentSize; |
| |
| /** {@inheritDoc} */ |
| @Override protected boolean isMultiJvm() { |
| return fork; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { |
| IgniteConfiguration cfg = super.getConfiguration(gridName); |
| |
| CacheConfiguration<Integer, IndexedObject> ccfg = new CacheConfiguration<>(cacheName); |
| |
| ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC); |
| ccfg.setRebalanceMode(CacheRebalanceMode.SYNC); |
| ccfg.setAffinity(new RendezvousAffinityFunction(false, 32)); |
| ccfg.setNodeFilter(new RemoteNodeFilter()); |
| ccfg.setIndexedTypes(Integer.class, IndexedObject.class); |
| |
| cfg.setCacheConfiguration(ccfg); |
| |
| MemoryConfiguration dbCfg = new MemoryConfiguration(); |
| |
| dbCfg.setPageSize(4 * 1024); |
| |
| MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration(); |
| |
| memPlcCfg.setName("dfltMemPlc"); |
| memPlcCfg.setInitialSize(1024 * 1024 * 1024); |
| memPlcCfg.setMaxSize(1024 * 1024 * 1024); |
| |
| dbCfg.setMemoryPolicies(memPlcCfg); |
| dbCfg.setDefaultMemoryPolicyName("dfltMemPlc"); |
| |
| cfg.setMemoryConfiguration(dbCfg); |
| |
| PersistentStoreConfiguration pCfg = new PersistentStoreConfiguration(); |
| |
| if (walSegmentSize != 0) |
| pCfg.setWalSegmentSize(walSegmentSize); |
| |
| cfg.setPersistentStoreConfiguration(pCfg); |
| |
| cfg.setMarshaller(null); |
| |
| BinaryConfiguration binCfg = new BinaryConfiguration(); |
| |
| binCfg.setCompactFooter(false); |
| |
| cfg.setBinaryConfiguration(binCfg); |
| |
| if (!getTestIgniteInstanceName(0).equals(gridName)) |
| cfg.setUserAttributes(F.asMap(HAS_CACHE, true)); |
| |
| return cfg; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void beforeTest() throws Exception { |
| stopAllGrids(); |
| |
| deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); |
| |
| cacheName = "partitioned"; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void afterTest() throws Exception { |
| stopAllGrids(); |
| |
| deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); |
| } |
| |
| /** |
| * @throws Exception if failed. |
| */ |
| public void testWalBig() throws Exception { |
| try { |
| IgniteEx ignite = startGrid(1); |
| |
| IgniteCache<Object, Object> cache = ignite.cache("partitioned"); |
| |
| Random rnd = new Random(); |
| |
| Map<Integer, IndexedObject> map = new HashMap<>(); |
| |
| for (int i = 0; i < 10_000; i++) { |
| if (i % 1000 == 0) |
| X.println(" >> " + i); |
| |
| int k = rnd.nextInt(300_000); |
| IndexedObject v = new IndexedObject(rnd.nextInt(10_000)); |
| |
| cache.put(k, v); |
| map.put(k, v); |
| } |
| |
| // Check. |
| for (Integer k : map.keySet()) |
| assertEquals(map.get(k), cache.get(k)); |
| |
| stopGrid(1); |
| |
| ignite = startGrid(1); |
| |
| cache = ignite.cache("partitioned"); |
| |
| // Check. |
| for (Integer k : map.keySet()) |
| assertEquals(map.get(k), cache.get(k)); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If fail. |
| */ |
| public void testSwitchClassLoader() throws Exception { |
| try { |
| final IgniteEx igniteEx = startGrid(1); |
| |
| // CustomDiscoveryMessage will trigger service tasks |
| startGrid(2); |
| |
| IgniteCache<Integer, EnumVal> cache = igniteEx.cache("partitioned"); |
| |
| // Creates LoadCacheJobV2 |
| // cache.loadCache(null); |
| |
| final ClassLoader oldCl = Thread.currentThread().getContextClassLoader(); |
| final ClassLoader newCl = getExternalClassLoader(); |
| |
| Thread.currentThread().setContextClassLoader(newCl); |
| |
| for (int i = 0; i < 10; i++) |
| cache.put(i, i % 2 == 0 ? EnumVal.VAL1 : EnumVal.VAL2); |
| |
| for (int i = 0; i < 10; i++) |
| assert cache.containsKey(i); |
| |
| // Invokes ClearTask with new class loader |
| cache.clear(); |
| |
| Thread.currentThread().setContextClassLoader(oldCl); |
| |
| for (int i = 0; i < 10; i++) |
| cache.put(i, i % 2 == 0 ? EnumVal.VAL1 : EnumVal.VAL2); |
| |
| for (int i = 0; i < 10; i++) |
| assert cache.containsKey(i); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception if failed. |
| */ |
| public void testWalSimple() throws Exception { |
| try { |
| IgniteEx ignite = startGrid(1); |
| |
| IgniteCache<Object, Object> cache = ignite.cache("partitioned"); |
| |
| info(" --> step1"); |
| |
| for (int i = 0; i < 10_000; i += 2) { |
| // X.println(" -> put: " + i); |
| |
| cache.put(i, new IndexedObject(i)); |
| } |
| |
| info(" --> step2"); |
| |
| for (int i = 0; i < 10_000; i += 3) |
| cache.put(i, new IndexedObject(i * 2)); |
| |
| info(" --> step3"); |
| |
| for (int i = 0; i < 10_000; i += 7) |
| cache.put(i, new IndexedObject(i * 3)); |
| |
| info(" --> check1"); |
| |
| // Check. |
| for (int i = 0; i < 10_000; i++) { |
| IndexedObject o; |
| |
| if (i % 7 == 0) |
| o = new IndexedObject(i * 3); |
| else if (i % 3 == 0) |
| o = new IndexedObject(i * 2); |
| else if (i % 2 == 0) |
| o = new IndexedObject(i); |
| else |
| o = null; |
| |
| assertEquals(o, cache.get(i)); |
| } |
| |
| stopGrid(1); |
| |
| ignite = startGrid(1); |
| |
| cache = ignite.cache("partitioned"); |
| |
| info(" --> check2"); |
| |
| // Check. |
| for (int i = 0; i < 10_000; i++) { |
| IndexedObject o; |
| |
| if (i % 7 == 0) |
| o = new IndexedObject(i * 3); |
| else if (i % 3 == 0) |
| o = new IndexedObject(i * 2); |
| else if (i % 2 == 0) |
| o = new IndexedObject(i); |
| else |
| o = null; |
| |
| assertEquals(o, cache.get(i)); |
| } |
| |
| info(" --> ok"); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If fail. |
| */ |
| public void testWalLargeValue() throws Exception { |
| try { |
| IgniteEx ignite = startGrid(1); |
| |
| IgniteCache<Object, Object> cache = ignite.cache("partitioned"); |
| |
| for (int i = 0; i < 10_000; i++) { |
| final byte[] data = new byte[i]; |
| |
| Arrays.fill(data, (byte)i); |
| |
| cache.put(i, data); |
| |
| if (i % 1000 == 0) |
| X.println(" ---> put: " + i); |
| |
| // Assert.assertArrayEquals(data, (byte[])cache.get(i)); |
| } |
| |
| // info(" --> check1"); |
| // |
| // for (int i = 0; i < 25_000; i++) { |
| // final byte[] data = new byte[i]; |
| // |
| // Arrays.fill(data, (byte)i); |
| // |
| // final byte[] loaded = (byte[]) cache.get(i); |
| // |
| // Assert.assertArrayEquals(data, loaded); |
| // } |
| |
| stopGrid(1); |
| |
| ignite = startGrid(1); |
| |
| cache = ignite.cache("partitioned"); |
| |
| info(" --> check2"); |
| |
| for (int i = 0; i < 10_000; i++) { |
| final byte[] data = new byte[i]; |
| |
| Arrays.fill(data, (byte)i); |
| |
| final byte[] loaded = (byte[]) cache.get(i); |
| |
| Assert.assertArrayEquals(data, loaded); |
| |
| if (i % 1000 == 0) |
| X.println(" ---> get: " + i); |
| } |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception if failed. |
| */ |
| public void testWalRolloverMultithreadedDefault() throws Exception { |
| checkWalRolloverMultithreaded(false); |
| } |
| |
| /** |
| * @throws Exception if failed. |
| */ |
| public void testWalRolloverMultithreadedLogOnly() throws Exception { |
| checkWalRolloverMultithreaded(true); |
| } |
| |
| /** |
| * @throws Exception if failed. |
| */ |
| public void testHugeCheckpointRecord() throws Exception { |
| try { |
| System.setProperty(IGNITE_PDS_WAL_MODE, "LOG_ONLY"); |
| |
| final IgniteEx ignite = startGrid(1); |
| |
| for (int i = 0; i < 50; i++) { |
| CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>("cache-" + i); |
| |
| // We can get 'too many open files' with default number of partitions. |
| ccfg.setAffinity(new RendezvousAffinityFunction(false, 128)); |
| |
| IgniteCache<Object, Object> cache = ignite.getOrCreateCache(ccfg); |
| |
| cache.put(i, i); |
| } |
| |
| final long endTime = System.currentTimeMillis() + 30_000; |
| |
| IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| Random rnd = ThreadLocalRandom.current(); |
| |
| while (U.currentTimeMillis() < endTime) { |
| IgniteCache<Object, Object> cache = ignite.cache("cache-" + rnd.nextInt(50)); |
| |
| cache.put(rnd.nextInt(50_000), rnd.nextInt()); |
| } |
| |
| return null; |
| } |
| }, 16, "put-thread"); |
| |
| while (System.currentTimeMillis() < endTime) { |
| ignite.context().cache().context().database().wakeupForCheckpoint("test").get(); |
| |
| U.sleep(500); |
| } |
| |
| fut.get(); |
| } |
| finally { |
| System.clearProperty(IGNITE_PDS_WAL_MODE); |
| |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception if failed. |
| */ |
| private void checkWalRolloverMultithreaded(boolean logOnly) throws Exception { |
| if (logOnly) |
| System.setProperty(IGNITE_PDS_WAL_MODE, "LOG_ONLY"); |
| |
| walSegmentSize = 2 * 1024 * 1024; |
| |
| final long endTime = System.currentTimeMillis() + 3 * 60 * 1000; |
| |
| try { |
| IgniteEx ignite = startGrid(1); |
| |
| final IgniteCache<Object, Object> cache = ignite.cache("partitioned"); |
| |
| GridTestUtils.runMultiThreaded(new Callable<Void>() { |
| @Override public Void call() throws Exception { |
| Random rnd = ThreadLocalRandom.current(); |
| |
| while (U.currentTimeMillis() < endTime) |
| cache.put(rnd.nextInt(50_000), rnd.nextInt()); |
| |
| return null; |
| } |
| }, 16, "put-thread"); |
| } |
| finally { |
| System.clearProperty(IGNITE_PDS_WAL_MODE); |
| |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If fail. |
| */ |
| public void testWalRenameDirSimple() throws Exception { |
| try { |
| IgniteEx ignite = startGrid(1); |
| |
| IgniteCache<Object, Object> cache = ignite.cache("partitioned"); |
| |
| for (int i = 0; i < 100; i++) |
| cache.put(i, new IndexedObject(i)); |
| |
| stopGrid(1); |
| |
| final File cacheDir = cacheDir("partitioned", ignite.context().discovery().consistentId().toString()); |
| |
| final boolean renamed = cacheDir.renameTo(new File(cacheDir.getParent(), "cache-partitioned0")); |
| |
| assert renamed; |
| |
| cacheName = "partitioned0"; |
| |
| ignite = startGrid(1); |
| |
| cache = ignite.cache(cacheName); |
| |
| for (int i = 0; i < 100; i++) |
| assertEquals(new IndexedObject(i), cache.get(i)); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @param cacheName Cache name. |
| * @param consId Consistent ID. |
| * @return Cache dir. |
| * @throws IgniteCheckedException If fail. |
| */ |
| private File cacheDir(final String cacheName, String consId) throws IgniteCheckedException { |
| consId = consId.replaceAll("[\\.:]", "_"); |
| |
| final File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false); |
| |
| assert dbDir.exists(); |
| |
| final File consIdDir = new File(dbDir.getAbsolutePath(), consId); |
| |
| assert consIdDir.exists(); |
| |
| final File cacheDir = new File(consIdDir.getAbsolutePath(), "cache-" + cacheName); |
| |
| assert cacheDir.exists(); |
| |
| return cacheDir; |
| } |
| |
| /** |
| * @throws Exception if failed. |
| */ |
| public void testRecoveryNoCheckpoint() throws Exception { |
| try { |
| IgniteEx ctrlGrid = startGrid(0); |
| |
| fork = true; |
| |
| IgniteEx cacheGrid = startGrid(1); |
| |
| ctrlGrid.compute(ctrlGrid.cluster().forRemotes()).run(new LoadRunnable(false)); |
| |
| info("Killing remote process..."); |
| |
| ((IgniteProcessProxy)cacheGrid).kill(); |
| |
| final IgniteEx g0 = ctrlGrid; |
| |
| GridTestUtils.waitForCondition(new PA() { |
| /** {@inheritDoc} */ |
| @Override public boolean apply() { |
| return g0.cluster().nodes().size() == 1; |
| } |
| }, getTestTimeout()); |
| |
| fork = false; |
| |
| // Now start the grid and verify that updates were restored from WAL. |
| cacheGrid = startGrid(1); |
| |
| IgniteCache<Object, Object> cache = cacheGrid.cache("partitioned"); |
| |
| for (int i = 0; i < 10_000; i++) |
| assertEquals(new IndexedObject(i), cache.get(i)); |
| |
| List<List<?>> res = cache.query(new SqlFieldsQuery("select count(iVal) from IndexedObject")).getAll(); |
| |
| assertEquals(1, res.size()); |
| assertEquals(10_000L, res.get(0).get(0)); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception if failed. |
| */ |
| public void testRecoveryLargeNoCheckpoint() throws Exception { |
| try { |
| IgniteEx ctrlGrid = startGrid(0); |
| |
| fork = true; |
| |
| IgniteEx cacheGrid = startGrid(1); |
| |
| ctrlGrid.compute(ctrlGrid.cluster().forRemotes()).run(new LargeLoadRunnable(false)); |
| |
| info("Killing remote process..."); |
| |
| ((IgniteProcessProxy)cacheGrid).kill(); |
| |
| final IgniteEx g0 = ctrlGrid; |
| |
| GridTestUtils.waitForCondition(new PA() { |
| /** {@inheritDoc} */ |
| @Override public boolean apply() { |
| return g0.cluster().nodes().size() == 1; |
| } |
| }, getTestTimeout()); |
| |
| fork = false; |
| |
| // Now start the grid and verify that updates were restored from WAL. |
| cacheGrid = startGrid(1); |
| |
| IgniteCache<Object, Object> cache = cacheGrid.cache("partitioned"); |
| |
| for (int i = 0; i < 1000; i++) { |
| final long[] data = new long[LARGE_ARR_SIZE]; |
| |
| Arrays.fill(data, i); |
| |
| final long[] loaded = (long[]) cache.get(i); |
| |
| Assert.assertArrayEquals(data, loaded); |
| } |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected long getTestTimeout() { |
| return TimeUnit.MINUTES.toMillis(20); |
| } |
| |
| /** |
| * @throws Exception if failed. |
| */ |
| public void testRandomCrash() throws Exception { |
| try { |
| IgniteEx ctrlGrid = startGrid(0); |
| |
| fork = true; |
| |
| IgniteEx cacheGrid = startGrid(1); |
| |
| IgniteCompute rmt = ctrlGrid.compute(ctrlGrid.cluster().forRemotes()); |
| |
| rmt.run(new LoadRunnable(false)); |
| |
| info(">>> Finished cache population."); |
| |
| rmt.run(new AsyncLoadRunnable()); |
| |
| Thread.sleep(20_000); |
| |
| info(">>> Killing remote process..."); |
| |
| ((IgniteProcessProxy)cacheGrid).kill(); |
| |
| startGrid(1); |
| |
| Boolean res = rmt.call(new VerifyCallable()); |
| |
| assertTrue(res); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception if failed. |
| */ |
| public void testLargeRandomCrash() throws Exception { |
| try { |
| IgniteEx ctrlGrid = startGrid(0); |
| |
| fork = true; |
| |
| IgniteEx cacheGrid = startGrid(1); |
| |
| IgniteCompute rmt = ctrlGrid.compute(ctrlGrid.cluster().forRemotes()); |
| |
| rmt.run(new LargeLoadRunnable(false)); |
| |
| info(">>> Finished cache population."); |
| |
| rmt.run(new AsyncLargeLoadRunnable()); |
| |
| Thread.sleep(20_000); |
| |
| info(">>> Killing remote process..."); |
| |
| ((IgniteProcessProxy)cacheGrid).kill(); |
| |
| startGrid(1); |
| |
| Boolean res = rmt.call(new VerifyLargeCallable()); |
| |
| assertTrue(res); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class RemoteNodeFilter implements IgnitePredicate<ClusterNode> { |
| /** {@inheritDoc} */ |
| @Override public boolean apply(ClusterNode clusterNode) { |
| return clusterNode.attribute(HAS_CACHE) != null; |
| } |
| } |
| |
| /** |
| * @throws Exception If failed. |
| */ |
| public void testDestroyCache() throws Exception { |
| try { |
| IgniteEx ignite = startGrid(1); |
| |
| IgniteCache<Object, Object> cache = ignite.getOrCreateCache("test"); |
| |
| cache.put(1, new IndexedObject(1)); |
| |
| ignite.destroyCache("test"); |
| |
| cache = ignite.getOrCreateCache("test"); |
| |
| // No entry available after cache destroy. |
| assertNull(cache.get(1)); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception If fail. |
| */ |
| public void testEvictPartition() throws Exception { |
| try { |
| Ignite ignite1 = startGrid("node1"); |
| |
| IgniteCache<Object, Object> cache1 = ignite1.cache(cacheName); |
| |
| for (int i = 0; i < 100; i++) |
| cache1.put(i, new IndexedObject(i)); |
| |
| Ignite ignite2 = startGrid("node2"); |
| |
| IgniteCache<Object, Object> cache2 = ignite2.cache(cacheName); |
| |
| for (int i = 0; i < 100; i++) { |
| assertEquals(new IndexedObject(i), cache1.get(i)); |
| assertEquals(new IndexedObject(i), cache2.get(i)); |
| } |
| |
| ignite1.close(); |
| ignite2.close(); |
| |
| ignite1 = startGrid("node1"); |
| ignite2 = startGrid("node2"); |
| |
| cache1 = ignite1.cache(cacheName); |
| cache2 = ignite2.cache(cacheName); |
| |
| for (int i = 0; i < 100; i++) { |
| assertEquals(new IndexedObject(i), cache1.get(i)); |
| assertEquals(new IndexedObject(i), cache2.get(i)); |
| } |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * @throws Exception if failed. |
| */ |
| public void testApplyDeltaRecords() throws Exception { |
| try { |
| IgniteEx ignite0 = (IgniteEx)startGrid("node0"); |
| |
| IgniteCache<Object, Object> cache0 = ignite0.cache(cacheName); |
| |
| for (int i = 0; i < 1000; i++) |
| cache0.put(i, new IndexedObject(i)); |
| |
| GridCacheSharedContext<Object, Object> sharedCtx = ignite0.context().cache().context(); |
| |
| GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)sharedCtx.database(); |
| |
| db.waitForCheckpoint("test"); |
| db.enableCheckpoints(false).get(); |
| |
| // Log something to know where to start. |
| WALPointer ptr = sharedCtx.wal().log(new MemoryRecoveryRecord(U.currentTimeMillis())); |
| |
| info("Replay marker: " + ptr); |
| |
| for (int i = 1000; i < 5000; i++) |
| cache0.put(i, new IndexedObject(i)); |
| |
| info("Done puts..."); |
| |
| for (int i = 2_000; i < 3_000; i++) |
| cache0.remove(i); |
| |
| info("Done removes..."); |
| |
| for (int i = 5000; i < 6000; i++) |
| cache0.put(i, new IndexedObject(i)); |
| |
| info("Done puts..."); |
| |
| Map<FullPageId, byte[]> rolledPages = new HashMap<>(); |
| |
| int pageSize = sharedCtx.database().pageSize(); |
| |
| ByteBuffer buf1 = ByteBuffer.allocateDirect(pageSize); |
| |
| // Now check that deltas can be correctly applied. |
| try (WALIterator it = sharedCtx.wal().replay(ptr)) { |
| while (it.hasNext()) { |
| IgniteBiTuple<WALPointer, WALRecord> tup = it.next(); |
| |
| WALRecord rec = tup.get2(); |
| |
| if (rec instanceof PageSnapshot) { |
| PageSnapshot page = (PageSnapshot)rec; |
| |
| rolledPages.put(page.fullPageId(), page.pageData()); |
| } |
| else if (rec instanceof PageDeltaRecord) { |
| PageDeltaRecord delta = (PageDeltaRecord)rec; |
| |
| FullPageId fullId = new FullPageId(delta.pageId(), delta.cacheId()); |
| |
| byte[] pageData = rolledPages.get(fullId); |
| |
| if (pageData == null) { |
| pageData = new byte[pageSize]; |
| |
| rolledPages.put(fullId, pageData); |
| } |
| |
| assertNotNull("Missing page snapshot [page=" + fullId + ", delta=" + delta + ']', pageData); |
| |
| buf1.order(ByteOrder.nativeOrder()); |
| |
| buf1.position(0); |
| buf1.put(pageData); |
| buf1.position(0); |
| |
| delta.applyDelta(sharedCtx |
| .database() |
| .memoryPolicy(null) |
| .pageMemory(), |
| |
| ((DirectBuffer)buf1).address()); |
| |
| buf1.position(0); |
| |
| buf1.get(pageData); |
| } |
| } |
| } |
| |
| info("Done apply..."); |
| |
| PageMemoryEx pageMem = (PageMemoryEx)db.memoryPolicy(null).pageMemory(); |
| |
| for (Map.Entry<FullPageId, byte[]> entry : rolledPages.entrySet()) { |
| FullPageId fullId = entry.getKey(); |
| |
| ignite0.context().cache().context().database().checkpointReadLock(); |
| |
| try { |
| long page = pageMem.acquirePage(fullId.cacheId(), fullId.pageId(), true); |
| |
| try { |
| long buf = pageMem.writeLock(fullId.cacheId(), fullId.pageId(), page, true); |
| |
| try { |
| byte[] data = entry.getValue(); |
| |
| for (int i = 0; i < data.length; i++) { |
| if (fullId.pageId() == TrackingPageIO.VERSIONS.latest().trackingPageFor(fullId.pageId(), db.pageSize())) |
| continue; // Skip tracking pages. |
| |
| assertEquals("page=" + fullId + ", pos=" + i, PageUtils.getByte(buf, i), data[i]); |
| } |
| } |
| finally { |
| pageMem.writeUnlock(fullId.cacheId(), fullId.pageId(), page, null, false, true); |
| } |
| } |
| finally { |
| pageMem.releasePage(fullId.cacheId(), fullId.pageId(), page); |
| } |
| } |
| finally { |
| ignite0.context().cache().context().database().checkpointReadUnlock(); |
| } |
| } |
| |
| ignite0.close(); |
| } |
| finally { |
| stopAllGrids(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class LoadRunnable implements IgniteRunnable { |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** */ |
| private boolean disableCheckpoints; |
| |
| /** |
| * @param disableCheckpoints Disable checkpoints flag. |
| */ |
| private LoadRunnable(boolean disableCheckpoints) { |
| this.disableCheckpoints = disableCheckpoints; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| ignite.log().info("Started load."); |
| |
| if (disableCheckpoints) { |
| GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context() |
| .cache().context().database(); |
| |
| try { |
| dbMgr.enableCheckpoints(false).get(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| try { |
| boolean successfulWaiting = GridTestUtils.waitForCondition(new PAX() { |
| @Override public boolean applyx() { |
| return ignite.cache("partitioned") != null; |
| } |
| }, 10_000); |
| |
| assertTrue(successfulWaiting); |
| } |
| catch (IgniteInterruptedCheckedException e) { |
| throw new RuntimeException(e); |
| } |
| |
| IgniteCache<Object, Object> cache = ignite.cache("partitioned"); |
| |
| for (int i = 0; i < 10_000; i++) |
| cache.put(i, new IndexedObject(i)); |
| |
| ignite.log().info("Finished load."); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class AsyncLoadRunnable implements IgniteRunnable { |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| try { |
| boolean successfulWaiting = GridTestUtils.waitForCondition(new PAX() { |
| @Override public boolean applyx() { |
| return ignite.cache("partitioned") != null; |
| } |
| }, 10_000); |
| |
| assertTrue(successfulWaiting); |
| } |
| catch (IgniteInterruptedCheckedException e) { |
| throw new RuntimeException(e); |
| } |
| |
| ignite.log().info(">>>>>>> Started load."); |
| |
| for (int i = 0; i < 4; i++) { |
| ignite.scheduler().callLocal(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| IgniteCache<Object, Object> cache = ignite.cache("partitioned"); |
| |
| ThreadLocalRandom rnd = ThreadLocalRandom.current(); |
| |
| int cnt = 0; |
| |
| while (!Thread.currentThread().isInterrupted()) { |
| cache.put(rnd.nextInt(10_000), new IndexedObject(rnd.nextInt())); |
| |
| cnt++; |
| |
| if (cnt > 0 && cnt % 1_000 == 0) |
| ignite.log().info(">>>> Updated: " + cnt); |
| } |
| |
| return null; |
| } |
| }); |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class VerifyCallable implements IgniteCallable<Boolean> { |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** {@inheritDoc} */ |
| @Override public Boolean call() throws Exception { |
| try { |
| boolean successfulWaiting = GridTestUtils.waitForCondition(new PAX() { |
| @Override public boolean applyx() { |
| return ignite.cache("partitioned") != null; |
| } |
| }, 10_000); |
| |
| assertTrue(successfulWaiting); |
| } |
| catch (IgniteInterruptedCheckedException e) { |
| throw new RuntimeException(e); |
| } |
| |
| IgniteCache<Object, Object> cache = ignite.cache("partitioned"); |
| |
| for (int i = 0; i < 10_000; i++) { |
| Object val = cache.get(i); |
| |
| if (val == null) { |
| ignite.log().warning("Failed to find a value for key: " + i); |
| |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class LargeLoadRunnable implements IgniteRunnable { |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** */ |
| private boolean disableCheckpoints; |
| |
| /** |
| * @param disableCheckpoints Disable checkpoints flag. |
| */ |
| private LargeLoadRunnable(boolean disableCheckpoints) { |
| this.disableCheckpoints = disableCheckpoints; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| try { |
| boolean successfulWaiting = GridTestUtils.waitForCondition(new PAX() { |
| @Override public boolean applyx() { |
| return ignite.cache("partitioned") != null; |
| } |
| }, 10_000); |
| |
| assertTrue(successfulWaiting); |
| } |
| catch (IgniteInterruptedCheckedException e) { |
| throw new RuntimeException(e); |
| } |
| |
| ignite.log().info("Started load."); |
| |
| if (disableCheckpoints) { |
| GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context() |
| .cache().context().database(); |
| |
| dbMgr.enableCheckpoints(false); |
| } |
| |
| IgniteCache<Object, Object> cache = ignite.cache("partitioned"); |
| |
| for (int i = 0; i < 1000; i++) { |
| final long[] data = new long[LARGE_ARR_SIZE]; |
| |
| Arrays.fill(data, i); |
| |
| cache.put(i, data); |
| } |
| |
| ignite.log().info("Finished load."); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class AsyncLargeLoadRunnable implements IgniteRunnable { |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| try { |
| boolean successfulWaiting = GridTestUtils.waitForCondition(new PAX() { |
| @Override public boolean applyx() { |
| return ignite.cache("partitioned") != null; |
| } |
| }, 10_000); |
| |
| assertTrue(successfulWaiting); |
| } |
| catch (IgniteInterruptedCheckedException e) { |
| throw new RuntimeException(e); |
| } |
| |
| ignite.log().info(">>>>>>> Started load."); |
| |
| for (int i = 0; i < 1; i++) { |
| ignite.scheduler().callLocal(new Callable<Object>() { |
| @Override public Object call() throws Exception { |
| IgniteCache<Object, Object> cache = ignite.cache("partitioned"); |
| |
| ThreadLocalRandom rnd = ThreadLocalRandom.current(); |
| |
| int cnt = 0; |
| |
| while (!Thread.currentThread().isInterrupted()) { |
| final long[] data = new long[LARGE_ARR_SIZE]; |
| |
| final int key = rnd.nextInt(1000); |
| |
| Arrays.fill(data, key); |
| |
| // System.out.println("> " + key); |
| |
| cache.put(key, data); |
| |
| cnt++; |
| |
| if (cnt > 0 && cnt % 1_000 == 0) |
| ignite.log().info(">>>> Updated: " + cnt); |
| } |
| |
| return null; |
| } |
| }); |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class VerifyLargeCallable implements IgniteCallable<Boolean> { |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** {@inheritDoc} */ |
| @Override public Boolean call() throws Exception { |
| try { |
| boolean successfulWaiting = GridTestUtils.waitForCondition(new PAX() { |
| @Override public boolean applyx() { |
| return ignite.cache("partitioned") != null; |
| } |
| }, 10_000); |
| |
| assertTrue(successfulWaiting); |
| } |
| catch (IgniteInterruptedCheckedException e) { |
| throw new RuntimeException(e); |
| } |
| |
| IgniteCache<Object, Object> cache = ignite.cache("partitioned"); |
| |
| for (int i = 0; i < 1000; i++) { |
| final long[] data = new long[LARGE_ARR_SIZE]; |
| |
| Arrays.fill(data, i); |
| |
| final Object val = cache.get(i); |
| |
| if (val == null) { |
| ignite.log().warning("Failed to find a value for key: " + i); |
| |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| } |
| |
| |
| /** |
| * |
| */ |
| private static class IndexedObject { |
| /** */ |
| @QuerySqlField(index = true) |
| private int iVal; |
| |
| /** |
| * @param iVal Integer value. |
| */ |
| private IndexedObject(int iVal) { |
| this.iVal = iVal; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean equals(Object o) { |
| if (this == o) |
| return true; |
| |
| if (!(o instanceof IndexedObject)) |
| return false; |
| |
| IndexedObject that = (IndexedObject)o; |
| |
| return iVal == that.iVal; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int hashCode() { |
| return iVal; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(IndexedObject.class, this); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private enum EnumVal { |
| /** */ |
| VAL1, |
| |
| /** */ |
| VAL2, |
| |
| /** */ |
| VAL3 |
| } |
| } |