blob: 6b3cf37e261e0505dc10c0376c9cb85ba55337dd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.db.wal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList;
import org.apache.ignite.internal.processors.cache.persistence.freelist.PagesList;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseListImpl;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.junit.Assert;
import org.junit.Test;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
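/**
 * Tests for recovery of transactional cache updates with persistence enabled: data and SQL index state after
 * a node restart, historical rebalance iterators, and page / free-list consistency across restarts.
 */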
public class WalRecoveryTxLogicalRecordsTest extends GridCommonAbstractTest {
/** Name of the primary cache that {@link #getConfiguration(String)} always registers. */
private static final String CACHE_NAME = "cache";
/** Name of the second cache that some tests register through {@link #extraCcfg}. */
private static final String CACHE2_NAME = "cache2";
/** Partition count passed to the rendezvous affinity function of the test caches. */
public static final int PARTS = 32;
/** WAL history size applied to the data storage configuration. */
public static final int WAL_HIST_SIZE = 30;
/** Page size applied to the data storage configuration; individual tests overwrite it before starting a node. */
private int pageSize = 4 * 1024;
/** Optional extra cache configuration; when non-null, a copy of it is registered next to the primary cache. */
private CacheConfiguration<?, ?> extraCcfg;
/** Optional checkpoint frequency; applied to the data storage configuration only when non-null. */
private Long checkpointFreq;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
    IgniteConfiguration igniteCfg = super.getConfiguration(gridName);

    // Primary transactional cache with SQL-indexed values.
    CacheConfiguration<Integer, IndexedValue> mainCcfg = new CacheConfiguration<>(CACHE_NAME);

    mainCcfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
    mainCcfg.setRebalanceMode(CacheRebalanceMode.SYNC);
    mainCcfg.setAffinity(new RendezvousAffinityFunction(false, PARTS));
    mainCcfg.setIndexedTypes(Integer.class, IndexedValue.class);

    // A test may request one more cache; it is registered as a copy of the template.
    if (extraCcfg == null)
        igniteCfg.setCacheConfiguration(mainCcfg);
    else
        igniteCfg.setCacheConfiguration(mainCcfg, new CacheConfiguration<>(extraCcfg));

    // Persistent default region, sized by the per-test page size.
    DataRegionConfiguration dfltRegion = new DataRegionConfiguration()
        .setMaxSize(100L * 1024 * 1024)
        .setPersistenceEnabled(true);

    DataStorageConfiguration storageCfg = new DataStorageConfiguration();

    storageCfg.setPageSize(pageSize);
    storageCfg.setWalHistorySize(WAL_HIST_SIZE);
    storageCfg.setDefaultDataRegionConfiguration(dfltRegion);

    if (checkpointFreq != null)
        storageCfg.setCheckpointFrequency(checkpointFreq);

    igniteCfg.setDataStorageConfiguration(storageCfg);

    igniteCfg.setMarshaller(null);

    BinaryConfiguration binaryCfg = new BinaryConfiguration();

    binaryCfg.setCompactFooter(false);

    igniteCfg.setBinaryConfiguration(binaryCfg);

    return igniteCfg;
}
/**
 * {@inheritDoc}
 * <p>
 * Cleans the persistence directory before the test body runs.
 */
@Override protected void beforeTest() throws Exception {
    cleanPersistenceDir();
}
/**
 * {@inheritDoc}
 * <p>
 * Stops any node that is still running and then cleans the persistence directory.
 */
@Override protected void afterTest() throws Exception {
    // Some tests start or activate a node before entering their try/finally block, so a failure there can
    // leave a node running; stop everything before the persistence directory is removed.
    stopAllGrids();

    cleanPersistenceDir();
}
/**
 * Writes data in committed transactions while checkpoints are disabled, restarts the node and checks that
 * every value is visible again both through the cache API and through an SQL query on the indexed field.
 *
 * @throws Exception If failed.
 */
@Test
public void testWalTxSimple() throws Exception {
    try {
        // Started inside the try block so that a failed start or activation still reaches stopAllGrids().
        Ignite ignite = startGrid();

        ignite.cluster().active(true);

        GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context()
            .cache().context().database();

        // Checkpoints stay switched off for the rest of the first node's lifetime.
        dbMgr.enableCheckpoints(false).get();

        IgniteCache<Integer, IndexedValue> cache = ignite.cache(CACHE_NAME);

        int txCnt = 100;
        int keysPerTx = 10;

        // One committed transaction per batch of keysPerTx consecutive keys.
        for (int i = 0; i < txCnt; i++) {
            try (Transaction tx = ignite.transactions().txStart()) {
                for (int j = 0; j < keysPerTx; j++) {
                    int k = i * keysPerTx + j;

                    cache.put(k, new IndexedValue(k));
                }

                tx.commit();
            }
        }

        for (int i = 0; i < txCnt; i++) {
            for (int j = 0; j < keysPerTx; j++) {
                int k = i * keysPerTx + j;

                assertEquals(k, cache.get(k).value());
            }
        }

        stopGrid();

        ignite = startGrid();

        ignite.cluster().active(true);

        cache = ignite.cache(CACHE_NAME);

        // Cache API view after restart.
        for (int i = 0; i < txCnt; i++) {
            for (int j = 0; j < keysPerTx; j++) {
                int k = i * keysPerTx + j;

                assertEquals(k, cache.get(k).value());
            }
        }

        // SQL view after restart: exactly one row per key, carrying the derived string value.
        for (int i = 0; i < txCnt; i++) {
            for (int j = 0; j < keysPerTx; j++) {
                int k = i * keysPerTx + j;

                QueryCursor<List<?>> cur = cache.query(
                    new SqlFieldsQuery("select sVal from IndexedValue where iVal=?").setArgs(k));

                List<List<?>> vals = cur.getAll();

                // Expected value goes first so that a failure message reads correctly.
                assertEquals(1, vals.size());
                assertEquals("string-" + k, vals.get(0).get(0));
            }
        }
    }
    finally {
        stopAllGrids();
    }
}
/**
 * Populates the cache, waits for a checkpoint, disables checkpoints and then removes the first half of the
 * keys transactionally. After a restart the removed keys must be absent from both the cache and the SQL
 * query results, while the remaining keys must still be present.
 *
 * @throws Exception if failed.
 */
@Test
public void testWalRecoveryRemoves() throws Exception {
    Ignite node = startGrid();

    node.cluster().active(true);

    try {
        GridCacheDatabaseSharedManager db =
            (GridCacheDatabaseSharedManager)((IgniteEx)node).context().cache().context().database();

        IgniteCache<Integer, IndexedValue> cache = node.cache(CACHE_NAME);

        final int txCnt = 100;
        final int keysPerTx = 10;
        final int totalKeys = txCnt * keysPerTx;
        final int removedTxCnt = txCnt / 2;

        // Populate: each transaction covers keysPerTx consecutive keys.
        for (int txIdx = 0; txIdx < txCnt; txIdx++) {
            int firstKey = txIdx * keysPerTx;

            try (Transaction tx = node.transactions().txStart()) {
                for (int key = firstKey; key < firstKey + keysPerTx; key++)
                    cache.put(key, new IndexedValue(key));

                tx.commit();
            }
        }

        for (int key = 0; key < totalKeys; key++)
            assertEquals(key, cache.get(key).value());

        // The populated state is checkpointed; the removes below happen with checkpoints disabled.
        db.waitForCheckpoint("test");
        db.enableCheckpoints(false).get();

        for (int txIdx = 0; txIdx < removedTxCnt; txIdx++) {
            int firstKey = txIdx * keysPerTx;

            try (Transaction tx = node.transactions().txStart()) {
                for (int key = firstKey; key < firstKey + keysPerTx; key++)
                    cache.remove(key);

                tx.commit();
            }
        }

        stopGrid();

        node = startGrid();

        node.cluster().active(true);

        cache = node.cache(CACHE_NAME);

        // Keys below this bound belong to the removed transactions.
        final int firstSurvivingKey = removedTxCnt * keysPerTx;

        for (int key = 0; key < totalKeys; key++) {
            QueryCursor<List<?>> cursor = cache.query(
                new SqlFieldsQuery("select sVal from IndexedValue where iVal=?").setArgs(key));

            List<List<?>> found = cursor.getAll();

            if (key >= firstSurvivingKey) {
                assertEquals(key, cache.get(key).value());
                assertEquals(1, found.size());
                assertEquals("string-" + key, found.get(0).get(0));
            }
            else {
                assertNull(cache.get(key));
                assertTrue(F.isEmpty(found));
            }
        }
    }
    finally {
        stopAllGrids();
    }
}
/**
 * Puts one entry per checkpoint into partitions 0 and 1 of the primary cache (plus an entry into a second
 * cache) and checks that a historical rebalance iterator requested from every starting counter returns
 * exactly the expected tail of entries, both before and after a node restart.
 * <p>
 * {@code IGNITE_PDS_WAL_REBALANCE_THRESHOLD} is set to {@code "0"} for the duration of the test and cleared
 * afterwards.
 *
 * @throws Exception if failed.
 */
@Test
public void testHistoricalRebalanceIterator() throws Exception {
    System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0");

    try {
        extraCcfg = new CacheConfiguration<>(CACHE_NAME + "2");
        extraCcfg.setAffinity(new RendezvousAffinityFunction(false, PARTS));

        // Started inside the try block so that a failed start still clears the system property.
        Ignite ignite = startGrid();

        ignite.cluster().active(true);

        GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context()
            .cache().context().database();

        dbMgr.waitForCheckpoint("test");

        // This number depends on wal history size.
        int entries = 25;

        IgniteCache<Integer, Integer> cache = ignite.cache(CACHE_NAME);
        IgniteCache<Integer, Integer> cache2 = ignite.cache(CACHE_NAME + "2");

        for (int i = 0; i < entries; i++) {
            // Put to partition 0.
            cache.put(i * PARTS, i * PARTS);

            // Put to partition 1.
            cache.put(i * PARTS + 1, i * PARTS + 1);

            // Put to another cache.
            cache2.put(i, i);

            dbMgr.waitForCheckpoint("test");
        }

        for (int i = 0; i < entries; i++) {
            assertEquals((Integer)(i * PARTS), cache.get(i * PARTS));
            assertEquals((Integer)(i * PARTS + 1), cache.get(i * PARTS + 1));
            assertEquals((Integer)(i), cache2.get(i));
        }

        CacheGroupContext grp = ((IgniteEx)ignite).context().cache().cacheGroup(CU.cacheId(CACHE_NAME));
        AffinityTopologyVersion topVer = grp.affinity().lastVersion();

        for (int i = 0; i < entries; i++) {
            checkHistoricalIterator(grp, topVer, 0, i, entries);
            checkHistoricalIterator(grp, topVer, 1, i, entries);
        }

        stopAllGrids();

        // Check that iterator is valid after restart.
        ignite = startGrid();

        ignite.cluster().active(true);

        grp = ((IgniteEx)ignite).context().cache().cacheGroup(CU.cacheId(CACHE_NAME));
        topVer = grp.affinity().lastVersion();

        for (int i = 0; i < entries; i++) {
            checkHistoricalIterator(grp, topVer, 0, i, entries);
            checkHistoricalIterator(grp, topVer, 1, i, entries);
        }
    }
    finally {
        stopAllGrids();

        System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
    }
}

/**
 * Requests a historical rebalance iterator for a single partition and asserts that it yields exactly the
 * entries with indexes {@code from .. entries - 1}, where the entry with index {@code j} has both key and
 * value equal to {@code j * PARTS + part}. Timings for obtaining and draining the iterator are logged.
 *
 * @param grp Cache group that owns the partition.
 * @param topVer Topology version passed to the iterator.
 * @param part Partition id.
 * @param from Starting counter of the historical range (also the index of the first expected entry).
 * @param entries Total number of entries written to the partition (upper bound of the historical range).
 * @throws Exception If failed.
 */
private void checkHistoricalIterator(
    CacheGroupContext grp,
    AffinityTopologyVersion topVer,
    int part,
    int from,
    int entries
) throws Exception {
    long start = System.currentTimeMillis();

    IgniteDhtDemandedPartitionsMap map = new IgniteDhtDemandedPartitionsMap();

    map.addHistorical(part, from, entries, PARTS);

    WALPointer ptr = reserveWalPointerForIterator(grp.shared());

    try (IgniteRebalanceIterator it = grp.offheap().rebalanceIterator(map, topVer)) {
        info("Time to get iterator: " + (System.currentTimeMillis() - start));

        // Null check must precede any use of the iterator.
        assertNotNull(it);
        assertTrue("Not historical for iteration: " + from, it.historical(part));

        start = System.currentTimeMillis();

        for (int j = from; j < entries; j++) {
            assertTrue("i=" + from + ", j=" + j, it.hasNextX());

            CacheDataRow row = it.next();

            int exp = j * PARTS + part;

            assertEquals(exp, (int)row.key().value(grp.cacheObjectContext(), false));
            assertEquals(exp, (int)row.value().value(grp.cacheObjectContext(), false));
        }

        info("Time to iterate: " + (System.currentTimeMillis() - start));

        assertFalse(it.hasNext());
    }
    finally {
        releaseWalPointerForIterator(grp.shared(), ptr);
    }
}
/**
 * Loads entries through a data streamer while checkpoints are disabled, restarts the node and checks that
 * every streamed entry can still be read.
 *
 * @throws Exception If failed.
 */
@Test
public void testWalAfterPreloading() throws Exception {
    try {
        // Started inside the try block so that a failed start or activation still reaches stopAllGrids().
        Ignite ignite = startGrid();

        ignite.cluster().active(true);

        GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)((IgniteEx)ignite).context()
            .cache().context().database();

        dbMgr.enableCheckpoints(false).get();

        int entries = 100;

        // Closing the streamer (end of the try block) completes loading before the reads below.
        try (IgniteDataStreamer<Integer, Integer> streamer = ignite.dataStreamer(CACHE_NAME)) {
            for (int i = 0; i < entries; i++)
                streamer.addData(i, i);
        }

        IgniteCache<Integer, Integer> cache = ignite.cache(CACHE_NAME);

        // Integer.valueOf replaces the deprecated Integer(int) constructor.
        for (int i = 0; i < entries; i++)
            assertEquals(Integer.valueOf(i), cache.get(i));

        stopGrid();

        ignite = startGrid();

        ignite.cluster().active(true);

        cache = ignite.cache(CACHE_NAME);

        for (int i = 0; i < entries; i++)
            assertEquals(Integer.valueOf(i), cache.get(i));
    }
    finally {
        stopAllGrids();
    }
}
/**
 * Takes the first checkpoint pointer from the checkpoint history, stores it in the database manager's
 * {@code reservedForPreloading} field (accessed reflectively) and reserves it in the WAL manager.
 *
 * @param cctx Cache shared context.
 * @return The pointer that was reserved; pass it to {@link #releaseWalPointerForIterator} afterwards.
 */
private WALPointer reserveWalPointerForIterator(GridCacheSharedContext cctx) {
    GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.database();

    WALPointer firstCpPtr = dbMgr.checkpointHistory().firstCheckpointPointer();

    AtomicReference<WALPointer> reservedRef = getFieldValue(cctx.database(), "reservedForPreloading");

    reservedRef.set(firstCpPtr);

    cctx.wal().reserve(firstCpPtr);

    return firstCpPtr;
}
/**
 * Undoes {@link #reserveWalPointerForIterator}: clears the database manager's {@code reservedForPreloading}
 * field (accessed reflectively) and releases the pointer in the WAL manager.
 *
 * @param cctx Cache shared context.
 * @param ptr WAL pointer to release.
 */
private void releaseWalPointerForIterator(GridCacheSharedContext cctx, WALPointer ptr) {
    AtomicReference<WALPointer> reservedRef = getFieldValue(cctx.database(), "reservedForPreloading");

    reservedRef.set(null);

    cctx.wal().release(ptr);
}
/**
 * Performs puts and removes (deterministic on the primary cache, random on the second one) with
 * checkpoints disabled and a 1 KB page size, restarts the node and checks that a put into the primary
 * cache succeeds afterwards.
 *
 * @throws Exception If failed.
 */
@Test
public void testRecoveryRandomPutRemove() throws Exception {
    try {
        pageSize = 1024;

        extraCcfg = new CacheConfiguration(CACHE2_NAME);
        extraCcfg.setAffinity(new RendezvousAffinityFunction(false, PARTS));

        Ignite node = startGrid(0);

        node.cluster().active(true);

        GridCacheDatabaseSharedManager db =
            (GridCacheDatabaseSharedManager)((IgniteEx)node).context().cache().context().database();

        db.enableCheckpoints(false).get();

        IgniteCache<Integer, IndexedValue> indexedCache = node.cache(CACHE_NAME);
        IgniteCache<Object, Object> blobCache = node.cache(CACHE2_NAME);

        final int KEYS1 = 100;

        for (int key = 0; key < KEYS1; key++)
            indexedCache.put(key, new IndexedValue(key));

        // Remove every even key.
        for (int key = 0; key < KEYS1; key += 2)
            indexedCache.remove(key);

        ThreadLocalRandom rnd = ThreadLocalRandom.current();

        for (int key = 0; key < KEYS1; key++) {
            blobCache.put(key, new byte[rnd.nextInt(512)]);

            boolean overwrite = rnd.nextBoolean();

            if (overwrite)
                blobCache.put(key, new byte[rnd.nextInt(512)]);

            boolean remove = rnd.nextBoolean();

            if (remove)
                blobCache.remove(key);
        }

        node.close();

        node = startGrid(0);

        node.cluster().active(true);

        node.cache(CACHE_NAME).put(1, new IndexedValue(0));
    }
    finally {
        stopAllGrids();
    }
}
/**
 * Runs {@link #recoveryNoPageLost(boolean)} with the checkpoint flag set to {@code false}.
 *
 * @throws Exception If failed.
 */
@Test
public void testRecoveryNoPageLost1() throws Exception {
    final boolean checkpoint = false;

    recoveryNoPageLost(checkpoint);
}
/**
 * Runs {@link #recoveryNoPageLost(boolean)} with the checkpoint flag set to {@code true}.
 *
 * @throws Exception If failed.
 */
@Test
public void testRecoveryNoPageLost2() throws Exception {
    final boolean checkpoint = true;

    recoveryNoPageLost(checkpoint);
}
/**
 * Restarts a node several times, running concurrent random puts and removes on the second cache in between,
 * and checks on every start that the per-store page counts equal the ones recorded right before the
 * previous stop.
 *
 * @throws Exception If failed.
 */
@Test
public void testRecoveryNoPageLost3() throws Exception {
    try {
        pageSize = 1024;
        checkpointFreq = 100L;

        extraCcfg = new CacheConfiguration(CACHE2_NAME);
        extraCcfg.setAffinity(new RendezvousAffinityFunction(false, 32));

        List<Integer> pagesBeforeStop = null;

        for (int iter = 0; iter < 5; iter++) {
            log.info("Start node: " + iter);

            Ignite node = startGrid(0);

            node.cluster().active(true);

            // Nothing to compare against on the very first start.
            if (pagesBeforeStop != null) {
                List<Integer> pagesAfterStart = allocatedPages(node, CACHE2_NAME);

                assertEquals("Iter = " + iter, pagesBeforeStop, pagesAfterStart);
            }

            final IgniteCache<Integer, Object> cache = node.cache(CACHE2_NAME);

            final int opsPerThread = ThreadLocalRandom.current().nextInt(10) + 10;

            Callable<Void> updater = new Callable<Void>() {
                @Override public Void call() throws Exception {
                    ThreadLocalRandom rnd = ThreadLocalRandom.current();

                    for (int op = 0; op < opsPerThread; op++) {
                        Integer key = rnd.nextInt(1000);

                        cache.put(key, new byte[rnd.nextInt(512)]);

                        if (rnd.nextBoolean())
                            cache.remove(key);
                    }

                    return null;
                }
            };

            GridTestUtils.runMultiThreaded(updater, 10, "update");

            pagesBeforeStop = allocatedPages(node, CACHE2_NAME);

            Ignition.stop(node.name(), false); //will make checkpoint
        }
    }
    finally {
        stopAllGrids();
    }
}
/**
 * Restarts a node five times, adding 128 new entries to the second cache on every run, and checks on each
 * start that the per-store page counts equal the ones recorded right before the previous stop.
 *
 * @param checkpoint Checkpoint enable flag; when {@code false}, checkpoints are disabled on every start.
 * @throws Exception If failed.
 */
private void recoveryNoPageLost(boolean checkpoint) throws Exception {
    try {
        pageSize = 1024;

        extraCcfg = new CacheConfiguration(CACHE2_NAME);
        extraCcfg.setAffinity(new RendezvousAffinityFunction(false, 32));

        List<Integer> pagesBeforeStop = null;

        // Keys keep growing across restarts, so every run inserts previously unseen keys.
        AtomicInteger keyGen = new AtomicInteger();

        for (int iter = 0; iter < 5; iter++) {
            log.info("Start node: " + iter);

            Ignite node = startGrid(0);

            node.cluster().active(true);

            GridCacheDatabaseSharedManager db =
                (GridCacheDatabaseSharedManager)((IgniteEx)node).context().cache().context().database();

            if (!checkpoint)
                db.enableCheckpoints(false).get();

            if (pagesBeforeStop != null) {
                List<Integer> pagesAfterStart = allocatedPages(node, CACHE2_NAME);

                assertEquals(pagesBeforeStop, pagesAfterStart);
            }

            IgniteCache<Integer, Object> cache = node.cache(CACHE2_NAME);

            // Payload size grows with the iteration number.
            final int payloadSize = 256 + iter * 100;

            for (int n = 0; n < 128; n++)
                cache.put(keyGen.incrementAndGet(), new byte[payloadSize]);

            pagesBeforeStop = allocatedPages(node, CACHE2_NAME);

            stopGrid(0, true);
        }
    }
    finally {
        stopAllGrids();
    }
}
/**
 * Collects the page count of every partition store of the given cache, followed by the page count of its
 * index-partition store. Before a partition store is read, the partition's free-list metadata is saved under
 * the checkpoint read lock and the store is synced.
 *
 * @param ignite Node.
 * @param cacheName Cache name.
 * @return Allocated pages per-store: one element per partition, then one for the index partition.
 * @throws Exception If failed.
 */
private List<Integer> allocatedPages(Ignite ignite, String cacheName) throws Exception {
    GridCacheProcessor cacheProc = ((IgniteEx)ignite).context().cache();

    FilePageStoreManager pageStoreMgr = (FilePageStoreManager)cacheProc.context().pageStore();

    final int partCnt = ignite.affinity(cacheName).partitions();

    List<Integer> pagesPerStore = new ArrayList<>(partCnt);

    for (int partId = 0; partId < partCnt; partId++) {
        PageStore partStore = pageStoreMgr.getStore(CU.cacheId(cacheName), partId);

        cacheProc.context().database().checkpointReadLock();

        try {
            GridDhtLocalPartition locPart = cacheProc.cache(cacheName).context().topology().localPartition(partId);

            IgniteCacheOffheapManager.CacheDataStore dataStore = locPart.dataStore();

            if (dataStore.rowStore() != null) {
                AbstractFreeList freeList = (AbstractFreeList)dataStore.rowStore().freeList();

                // Flush free-list onheap cache to page memory.
                freeList.saveMetadata(IoStatisticsHolderNoOp.INSTANCE);
            }
        }
        finally {
            cacheProc.context().database().checkpointReadUnlock();
        }

        partStore.sync();

        pagesPerStore.add(partStore.pages());
    }

    // The index partition has its own store and is reported last.
    PageStore idxStore = pageStoreMgr.getStore(CU.cacheId(cacheName), PageIdAllocator.INDEX_PARTITION);

    idxStore.sync();

    pagesPerStore.add(idxStore.pages());

    return pagesPerStore;
}
/**
 * Fills two caches with puts and removes, snapshots their free-list and reuse-list state, restarts the node,
 * reads every key and checks that the free-list and reuse-list state equals the snapshot.
 *
 * @throws Exception If failed.
 */
@Test
public void testFreeListRecovery() throws Exception {
    try {
        pageSize = 1024;

        extraCcfg = new CacheConfiguration(CACHE2_NAME);

        Ignite node = startGrid(0);

        node.cluster().active(true);

        IgniteCache<Integer, IndexedValue> indexedCache = node.cache(CACHE_NAME);
        IgniteCache<Object, Object> blobCache = node.cache(CACHE2_NAME);

        final int KEYS1 = 2048;

        for (int key = 0; key < KEYS1; key++)
            indexedCache.put(key, new IndexedValue(key));

        // Remove every even key.
        for (int key = 0; key < KEYS1; key += 2)
            indexedCache.remove(key);

        ThreadLocalRandom rnd = ThreadLocalRandom.current();

        for (int key = 0; key < KEYS1; key++) {
            blobCache.put(key, new byte[rnd.nextInt(512)]);

            boolean overwrite = rnd.nextBoolean();

            if (overwrite)
                blobCache.put(key, new byte[rnd.nextInt(512)]);

            boolean remove = rnd.nextBoolean();

            if (remove)
                blobCache.remove(key);
        }

        // State before the restart.
        Map<Integer, T2<Map<Integer, long[]>, int[]>> freeLists1Before = getFreeListData(node, CACHE_NAME);
        Map<Integer, T2<Map<Integer, long[]>, int[]>> freeLists2Before = getFreeListData(node, CACHE2_NAME);

        T2<long[], Integer> reuseList1Before = getReuseListData(node, CACHE_NAME);
        T2<long[], Integer> reuseList2Before = getReuseListData(node, CACHE2_NAME);

        node.close();

        node = startGrid(0);

        node.cluster().active(true);

        indexedCache = node.cache(CACHE_NAME);
        blobCache = node.cache(CACHE2_NAME);

        // Touch every key on both caches before the state is captured again.
        for (int key = 0; key < KEYS1; key++) {
            indexedCache.get(key);
            blobCache.get(key);
        }

        // State after the restart.
        Map<Integer, T2<Map<Integer, long[]>, int[]>> freeLists1After = getFreeListData(node, CACHE_NAME);
        Map<Integer, T2<Map<Integer, long[]>, int[]>> freeLists2After = getFreeListData(node, CACHE2_NAME);

        T2<long[], Integer> reuseList1After = getReuseListData(node, CACHE_NAME);
        T2<long[], Integer> reuseList2After = getReuseListData(node, CACHE2_NAME);

        checkEquals(freeLists1Before, freeLists1After);
        checkEquals(freeLists2Before, freeLists2After);

        checkEquals(reuseList1Before, reuseList1After);
        checkEquals(reuseList2Before, reuseList2After);
    }
    finally {
        stopAllGrids();
    }
}
/**
 * Simple test for rollback record overlap count.
 * <p>
 * The expectations below are consistent with a record created with {@code (start, range)} covering the
 * half-open counter interval {@code [start, start + range)} and {@code overlap(from, to)} returning the size
 * of its intersection with {@code [from, to)}.
 */
@Test
public void testRollbackRecordOverlap() {
    // Last two constructor arguments: start = 1, range = 1.
    RollbackRecord single = new RollbackRecord(0, 0, 1, 1);

    // Last two constructor arguments: start = 1, range = 4.
    RollbackRecord wide = new RollbackRecord(0, 0, 1, 4);

    // Single-counter record.
    assertEquals(0, single.overlap(0, 1));
    assertEquals(1, single.overlap(1, 2));
    assertEquals(1, single.overlap(0, 2));
    assertEquals(0, single.overlap(2, 3));
    assertEquals(1, single.overlap(1, 2));

    // Four-counter record: queries at or beyond the right edge.
    assertEquals(0, wide.overlap(5, 6));
    assertEquals(1, wide.overlap(4, 6));

    // Four-counter record: query entirely to the left.
    assertEquals(0, wide.overlap(0, 1));

    // Four-counter record: queries starting inside the record.
    assertEquals(1, wide.overlap(2, 3));
    assertEquals(2, wide.overlap(2, 4));
    assertEquals(3, wide.overlap(2, 7));

    // Four-counter record: queries starting left of the record with a growing right bound.
    assertEquals(1, wide.overlap(0, 2));
    assertEquals(2, wide.overlap(0, 3));
    assertEquals(3, wide.overlap(0, 4));
    assertEquals(4, wide.overlap(0, 5));

    // Exact match of the whole record.
    assertEquals(4, wide.overlap(1, 5));
}
/**
 * Tests that the historical iterator works correctly when a partition has gaps in its update counters
 * caused by rolled back transactions: committed puts are interleaved with prepared-then-rolled-back
 * transactions, and several counter ranges are then read back and compared with the committed keys.
 *
 * @throws Exception If failed.
 */
@Test
public void testWalIteratorOverPartitionWithMissingEntries() throws Exception {
    System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, "0");

    try {
        Ignite node = startGrid();

        node.cluster().active(true);

        awaitPartitionMapExchange();

        final int totalKeys = 30;
        final int part = 1;

        List<Integer> keys = partitionKeys(node.cache(CACHE_NAME), part, totalKeys, 0);

        IgniteCache<Object, Object> cache = node.cache(CACHE_NAME);

        cache.put(keys.get(0), keys.get(0));
        cache.put(keys.get(1), keys.get(1));

        int rolledBackCnt = 0;

        rolledBackCnt += prepareTx(node, keys.subList(2, 6));

        for (Integer committed : keys.subList(6, 10))
            cache.put(committed, committed);

        rolledBackCnt += prepareTx(node, keys.subList(10, 14));

        for (Integer committed : keys.subList(14, 20))
            cache.put(committed, committed);

        rolledBackCnt += prepareTx(node, keys.subList(20, 25));

        for (Integer committed : keys.subList(25, 30))
            cache.put(committed, committed);

        assertEquals(totalKeys - rolledBackCnt, cache.size());

        // Expecting counters: 1-2, missed 3-6, 7-10, missed 11-14, 15-20, missed 21-25, 26-30

        // Range covering the first two committed updates and part of the first gap.
        List<CacheDataRow> found = rows(node, part, 0, 4);

        assertEquals(2, found.size());
        assertEquals(keys.get(0), found.get(0).key().value(null, false));
        assertEquals(keys.get(1), found.get(1).key().value(null, false));

        // Range lying completely inside a gap.
        found = rows(node, part, 3, 4);

        assertEquals(0, found.size());

        // Range spanning two committed runs with gaps around them.
        found = rows(node, part, 4, 23);

        assertEquals(10, found.size());

        int rowIdx = 0;

        for (Integer exp : keys.subList(6, 10))
            assertEquals(exp, found.get(rowIdx++).key().value(null, false));

        for (Integer exp : keys.subList(14, 20))
            assertEquals(exp, found.get(rowIdx++).key().value(null, false));

        // Range starting inside a committed run and ending right after the last gap.
        found = rows(node, part, 16, 26);

        assertEquals(5, found.size());

        rowIdx = 0;

        for (Integer exp : keys.subList(16, 20))
            assertEquals(exp, found.get(rowIdx++).key().value(null, false));

        assertEquals(keys.get(25), found.get(rowIdx).key().value(null, false));
    }
    finally {
        stopAllGrids();

        System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
    }
}
/**
 * Starts a transaction, puts every given key (mapped to itself) into the primary cache, prepares the
 * transaction and then rolls it back.
 *
 * @param ignite Ignite.
 * @param keys Keys.
 * @return Number of keys passed in.
 * @throws IgniteCheckedException If prepare or rollback failed.
 */
private int prepareTx(Ignite ignite, List<Integer> keys) throws IgniteCheckedException {
    final int keyCnt = keys.size();

    try (Transaction tx = ignite.transactions().txStart()) {
        for (int idx = 0; idx < keyCnt; idx++) {
            Integer key = keys.get(idx);

            ignite.cache(CACHE_NAME).put(key, key);
        }

        // Drive the internal transaction directly: prepare first, then roll back.
        GridNearTxLocal nearTx = ((TransactionProxyImpl)tx).tx();

        nearTx.prepare(true);
        nearTx.rollback();
    }

    return keyCnt;
}
/**
 * Reads all rows that a historical rebalance iterator returns for one partition of the primary cache's
 * group and the given counter range. A WAL pointer is reserved for the duration of the iteration.
 *
 * @param ignite Ignite.
 * @param part Partition.
 * @param from From counter.
 * @param to To counter.
 * @return Rows in the order the iterator produced them.
 * @throws IgniteCheckedException If iteration failed.
 */
private List<CacheDataRow> rows(Ignite ignite, int part, long from, long to) throws IgniteCheckedException {
    CacheGroupContext grpCtx = ((IgniteEx)ignite).context().cache().cacheGroup(CU.cacheId(CACHE_NAME));

    IgniteDhtDemandedPartitionsMap demanded = new IgniteDhtDemandedPartitionsMap();

    demanded.addHistorical(part, from, to, PARTS);

    IgniteCacheOffheapManager offheap = grpCtx.offheap();
    AffinityTopologyVersion lastTopVer = grpCtx.affinity().lastVersion();

    List<CacheDataRow> collected = new ArrayList<>();

    WALPointer reservedPtr = reserveWalPointerForIterator(grpCtx.shared());

    try (IgniteRebalanceIterator iter = offheap.rebalanceIterator(demanded, lastTopVer)) {
        assertNotNull(iter);

        for (; iter.hasNextX(); )
            collected.add(iter.next());
    }
    finally {
        releaseWalPointerForIterator(grpCtx.shared(), reservedPtr);
    }

    return collected;
}
/**
 * Reads, via reflection, the reuse list of the given cache: the tail page ids of its bucket stripes and the
 * size of its single bucket.
 *
 * @param ignite Node.
 * @param cacheName Cache name.
 * @return Cache reuse list data: stripe tail ids ({@code null} when the bucket is {@code null}) and bucket size.
 */
private T2<long[], Integer> getReuseListData(Ignite ignite, String cacheName) {
    GridCacheContext cacheCtx = ((IgniteEx)ignite).context().cache().cache(cacheName).context();

    ReuseListImpl reuseList = getFieldValue(cacheCtx.offheap(), "reuseList");

    PagesList.Stripe[] stripes = getFieldValue(reuseList, "bucket");

    long[] tailIds;

    if (stripes == null)
        tailIds = null;
    else {
        tailIds = new long[stripes.length];

        int idx = 0;

        for (PagesList.Stripe stripe : stripes)
            tailIds[idx++] = stripe.tailId;
    }

    AtomicLongArray sizes = getFieldValue(reuseList, PagesList.class, "bucketsSize");

    // The test expects the reuse list to have exactly one bucket.
    assertEquals(1, sizes.length());

    return new T2<>(tailIds, (int)sizes.get(0));
}
/**
 * Asserts that two reuse-list snapshots hold the same tail ids and the same bucket size.
 *
 * @param rl1 Data 1 (before stop).
 * @param rl2 Data 2 (after restore).
 */
private void checkEquals(T2<long[], Integer> rl1, T2<long[], Integer> rl2) {
    long[] tailsBefore = rl1.get1();
    long[] tailsAfter = rl2.get1();

    Assert.assertArrayEquals(tailsBefore, tailsAfter);

    Integer sizeBefore = rl1.get2();
    Integer sizeAfter = rl2.get2();

    assertEquals(sizeBefore, sizeAfter);
}
/**
 * Asserts that two free-list snapshots are equal: same number of partitions and, for every partition of the
 * first snapshot, the same per-bucket tail ids and the same per-bucket counts.
 *
 * @param partsLists1 Data 1 (before stop).
 * @param partsLists2 Data 2 (after restore).
 */
private void checkEquals(Map<Integer, T2<Map<Integer, long[]>, int[]>> partsLists1,
    Map<Integer, T2<Map<Integer, long[]>, int[]>> partsLists2) {
    assertEquals(partsLists1.size(), partsLists2.size());

    for (Map.Entry<Integer, T2<Map<Integer, long[]>, int[]>> partEntry : partsLists1.entrySet()) {
        Integer part = partEntry.getKey();

        T2<Map<Integer, long[]>, int[]> before = partEntry.getValue();
        T2<Map<Integer, long[]>, int[]> after = partsLists2.get(part);

        Map<Integer, long[]> tailsBefore = before.get1();
        Map<Integer, long[]> tailsAfter = after.get1();

        assertEquals(tailsBefore.size(), tailsAfter.size());

        for (Map.Entry<Integer, long[]> bucketEntry : tailsBefore.entrySet()) {
            long[] expTails = bucketEntry.getValue();
            long[] actTails = tailsAfter.get(bucketEntry.getKey());

            Assert.assertArrayEquals(expTails, actTails);
        }

        Assert.assertArrayEquals("Wrong counts [part=" + part + ']', before.get2(), after.get2());
    }
}
/**
 * Collects, via reflection, the free-list state of every local partition of the given cache. For each
 * partition the free-list metadata is first saved, then the tail page ids of every bucket and the size of
 * every bucket are read. Partitions whose row store or free list is not available are skipped.
 * <p>
 * The method asserts that at least one bucket with tails and at least one non-empty bucket were found.
 *
 * @param ignite Node.
 * @param cacheName Cache name.
 * @return Cache free lists data (partition number to map of buckets to tails and buckets size).
 * @throws IgniteCheckedException If saving free-list metadata failed.
 */
private Map<Integer, T2<Map<Integer, long[]>, int[]>> getFreeListData(Ignite ignite, String cacheName) throws IgniteCheckedException {
    GridCacheProcessor cacheProc = ((IgniteEx)ignite).context().cache();

    GridCacheContext ctx = cacheProc.cache(cacheName).context();

    List<GridDhtLocalPartition> parts = ctx.topology().localPartitions();

    assertFalse(parts.isEmpty());
    assertEquals(ctx.affinity().partitions(), parts.size());

    Map<Integer, T2<Map<Integer, long[]>, int[]>> res = new HashMap<>();

    boolean foundNonEmpty = false;
    boolean foundTails = false;

    cacheProc.context().database().checkpointReadLock();

    try {
        for (GridDhtLocalPartition part : parts) {
            IgniteCacheOffheapManager.CacheDataStore dataStore = part.dataStore();

            // Lazy store: the row store itself may be absent, so it must be checked before it is
            // dereferenced (allocatedPages() guards the same call chain the same way).
            if (dataStore.rowStore() == null)
                continue;

            AbstractFreeList freeList = (AbstractFreeList)dataStore.rowStore().freeList();

            if (freeList == null)
                // Lazy store.
                continue;

            // Flush free-list onheap cache to page memory.
            freeList.saveMetadata(IoStatisticsHolderNoOp.INSTANCE);

            AtomicReferenceArray<PagesList.Stripe[]> buckets = getFieldValue(freeList,
                AbstractFreeList.class, "buckets");

            AtomicLongArray bucketsSize = getFieldValue(freeList, PagesList.class, "bucketsSize");

            assertNotNull(buckets);
            assertNotNull(bucketsSize);
            assertTrue(buckets.length() > 0);
            assertEquals(bucketsSize.length(), buckets.length());

            Map<Integer, long[]> tailsPerBucket = new HashMap<>();

            for (int i = 0; i < buckets.length(); i++) {
                PagesList.Stripe[] tails = buckets.get(i);

                long[] ids = null;

                if (tails != null) {
                    // A present bucket is expected to have at least one stripe.
                    assertTrue(tails.length > 0);

                    ids = new long[tails.length];

                    for (int j = 0; j < tails.length; j++)
                        ids[j] = tails[j].tailId;

                    foundTails = true;
                }

                // Buckets without stripes are recorded with a null value.
                tailsPerBucket.put(i, ids);
            }

            int[] cntsPerBucket = new int[bucketsSize.length()];

            for (int i = 0; i < bucketsSize.length(); i++) {
                cntsPerBucket[i] = (int)bucketsSize.get(i);

                if (cntsPerBucket[i] > 0)
                    foundNonEmpty = true;
            }

            res.put(part.id(), new T2<>(tailsPerBucket, cntsPerBucket));
        }
    }
    finally {
        cacheProc.context().database().checkpointReadUnlock();
    }

    assertTrue(foundNonEmpty);
    assertTrue(foundTails);

    return res;
}
/**
 * Cache value with an indexed integer field and a string field derived from it, both exposed to SQL.
 */
private static class IndexedValue {
    /** Indexed integer value. */
    @QuerySqlField(index = true)
    private int iVal;

    /** String value, always {@code "string-" + iVal}. */
    @QuerySqlField
    private String sVal;

    /**
     * @param iVal Indexed value; the string field is derived from it.
     */
    private IndexedValue(int iVal) {
        this.iVal = iVal;
        this.sVal = "string-" + iVal;
    }

    /**
     * @return Indexed integer value.
     */
    private int value() {
        return this.iVal;
    }
}
}