blob: 55b6b616434ce14ab1420c19211a0789900a9e61 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence;
import java.io.File;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
/**
*
*/
public class IgnitePdsSporadicDataRecordsOnBackupTest extends GridCommonAbstractTest {
/** */
private static final int GRID_CNT = 2;
/** Transactional cache name. */
protected static final String TX_CACHE_NAME = "txCache";
/** Keys count. */
protected static final int KEYS_CNT = 100;
/** Stop tx load flag. */
protected static final AtomicBoolean txStop = new AtomicBoolean();
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
DataStorageConfiguration memCfg = new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(
new DataRegionConfiguration().setMaxSize(10L * 1024 * 1024).setPersistenceEnabled(true))
.setWalMode(WALMode.LOG_ONLY);
cfg.setDataStorageConfiguration(memCfg);
CacheConfiguration<Integer, Long> txCacheCfg = new CacheConfiguration<Integer, Long>(TX_CACHE_NAME)
.setBackups(1)
.setAtomicityMode(TRANSACTIONAL)
.setAffinity(new RendezvousAffinityFunction(false, 32))
.setWriteSynchronizationMode(FULL_SYNC)
.setQueryEntities(Collections.singletonList(new QueryEntity(Integer.class, Long.class)));
cfg.setCacheConfiguration(txCacheCfg);
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
cleanPersistenceDir();
super.beforeTest();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
cleanPersistenceDir();
}
/**
* @throws Exception If failed.
*/
@Test
public void testSporadicDataRecordsOnBackup() throws Exception {
IgniteEx ig0 = startGrid(0);
IgniteEx ig1 = startGrid(1);
grid(0).cluster().active(true);
String nodeFolderName0 = ig0.context().pdsFolderResolver().resolveFolders().folderName();
String nodeFolderName1 = ig1.context().pdsFolderResolver().resolveFolders().folderName();
IgniteCache<Integer, Long> cache = grid(0).cache(TX_CACHE_NAME);
for (int i = 0; i < KEYS_CNT; ++i)
cache.put(i, (long)i);
IgniteInternalFuture txLoadFut = startTxLoad(5, null);
doSleep(10_000);
txStop.set(true);
txLoadFut.get();
grid(0).cluster().active(false);
stopAllGrids();
assertEquals(0, findSporadicDataRecords(nodeFolderName0) + findSporadicDataRecords(nodeFolderName1));
}
/**
* Returns a number of {@link DataRecord} that do not have a xid version,
* in other words, number of records that were created by GridCacheUtils#createBackupPostProcessingClosure().
*
* @throws IgniteCheckedException If failed.
*/
private long findSporadicDataRecords(String nodeFolderName) throws IgniteCheckedException {
File dbDir = new File(U.defaultWorkDirectory(), "db");
File commonWalDir = new File(dbDir, "wal");
File walDir = new File(commonWalDir, nodeFolderName);
File walArchiveDir = new File(new File(commonWalDir, "archive"), nodeFolderName);
assertTrue(walDir.exists());
assertTrue(walArchiveDir.exists());
IteratorParametersBuilder params = new IteratorParametersBuilder();
params.bufferSize(1024 * 1024);
params.filesOrDirs(walDir, walArchiveDir);
params.filter((type, pointer) -> type == WALRecord.RecordType.DATA_RECORD_V2);
int cacheId = CU.cacheId(TX_CACHE_NAME);
long createOpCnt = 0;
try (WALIterator itr = new IgniteWalIteratorFactory().iterator(params)) {
while (itr.hasNext()) {
IgniteBiTuple<WALPointer, WALRecord> walEntry = itr.next();
assertTrue(walEntry.get2() instanceof DataRecord);
DataRecord rec = (DataRecord)walEntry.get2();
createOpCnt += rec.writeEntries()
.stream()
.filter(e ->
e.cacheId() == cacheId && GridCacheOperation.CREATE == e.op() && e.nearXidVersion() == null)
.count();
}
}
return createOpCnt;
}
/**
* @param threads Threads.
* @param ignite Load source instance.
*/
@SuppressWarnings({"SameParameterValue"})
protected IgniteInternalFuture startTxLoad(int threads, Ignite ignite) {
txStop.set(false);
return GridTestUtils.runMultiThreadedAsync(() -> {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
while (!txStop.get()) {
Ignite ig = ignite == null ? grid(rnd.nextInt(GRID_CNT)) : ignite;
if (ig == null)
continue;
IgniteCache<Integer, Long> cache = ig.cache(TX_CACHE_NAME);
try (Transaction tx = ig.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) {
int acc0 = rnd.nextInt(KEYS_CNT);
int acc1 = rnd.nextInt(KEYS_CNT);
while (acc1 == acc0)
acc1 = rnd.nextInt(KEYS_CNT);
// Avoid deadlocks.
if (acc0 > acc1) {
int tmp = acc0;
acc0 = acc1;
acc1 = tmp;
}
long val0 = cache.get(acc0);
long val1 = cache.get(acc1);
long delta = rnd.nextLong(Math.max(val0, val1));
if (val0 < val1) {
cache.put(acc0, val0 + delta);
cache.put(acc1, val1 - delta);
}
else {
cache.put(acc0, val0 - delta);
cache.put(acc1, val1 + delta);
}
tx.commit();
}
catch (Throwable e) {
assertFalse(e instanceof NullPointerException);
}
}
},
threads, "tx-load-thread"
);
}
}