blob: 64a521dc753c7eed2c093f52e4d5f1ba25127f59 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.db.wal.reader;
import java.io.Externalizable;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.events.WalSegmentArchivedEvent;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.MarshalledDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.TxRecord;
import org.apache.ignite.internal.pagemem.wal.record.UnwrapDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.UnwrappedDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.logger.NullLogger;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import static java.util.Arrays.fill;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.MVCC_DATA_RECORD;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver.genNewStyleSubfolderName;
/**
* Test suite for WAL segments reader and event generator.
*/
public class IgniteWalReaderTest extends GridCommonAbstractTest {
/** Number of WAL segments kept in the work directory (passed to {@code setWalSegments}). */
private static final int WAL_SEGMENTS = 10;
/** Cache name. */
private static final String CACHE_NAME = "cache0";
/** Additional cache for testing different combinations of types in WAL. */
private static final String CACHE_ADDL_NAME = "cache1";
/** Dump records to logger. Should be false for non local run. */
private static final boolean DUMP_RECORDS = true;
/**
 * Field for transferring setting from test to getConfig method.
 * Archive incomplete segment after inactivity milliseconds.
 */
private int archiveIncompleteSegmentAfterInactivityMs;
/** Force archive timeout in milliseconds; enables WAL force-archive when positive. */
private int forceArchiveSegmentMs;
/** Custom WAL mode; {@code null} means the default (BACKGROUND) is used. */
private WALMode customWalMode;
/** Set WAL and Archive path to same value. */
private boolean setWalAndArchiveToSameVal;
/** Whether to enable WAL archive compaction. */
private boolean enableWalCompaction;
/** Backup count for the default cache configuration. */
private int backupCnt;
/** DataEntry from primary flag; checked against each entry's flags during iteration. */
private boolean primary = true;
/** DataEntry during rebalance flag; checked against each entry's flags during iteration. */
private boolean rebalance;
/**
 * {@inheritDoc}
 * <p>
 * Builds a persistence-enabled configuration with a small WAL segment size so that
 * tests can roll over segments quickly. WAL behavior is tuned through the test fields
 * ({@code customWalMode}, {@code archiveIncompleteSegmentAfterInactivityMs}, etc.).
 */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
    IgniteConfiguration cfg = super.getConfiguration(gridName);

    // Transactional cache with a configurable backup count and indexed values.
    CacheConfiguration<Integer, IndexedObject> cacheCfg = new CacheConfiguration<>(CACHE_NAME);

    cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
    cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC);
    cacheCfg.setAffinity(new RendezvousAffinityFunction(false, 32));
    cacheCfg.setIndexedTypes(Integer.class, IndexedObject.class);
    cacheCfg.setBackups(backupCnt);

    cfg.setCacheConfiguration(cacheCfg);
    cfg.setIncludeEventTypes(EVT_WAL_SEGMENT_ARCHIVED, EVT_WAL_SEGMENT_COMPACTED);

    DataStorageConfiguration storageCfg = new DataStorageConfiguration()
        .setDefaultDataRegionConfiguration(
            new DataRegionConfiguration()
                .setMaxSize(1024L * 1024 * 1024)
                .setPersistenceEnabled(true))
        .setWalSegmentSize(1024 * 1024)
        .setWalSegments(WAL_SEGMENTS)
        .setWalMode(customWalMode == null ? WALMode.BACKGROUND : customWalMode)
        .setWalCompactionEnabled(enableWalCompaction);

    if (archiveIncompleteSegmentAfterInactivityMs > 0)
        storageCfg.setWalAutoArchiveAfterInactivity(archiveIncompleteSegmentAfterInactivityMs);

    if (forceArchiveSegmentMs > 0)
        storageCfg.setWalForceArchiveTimeout(forceArchiveSegmentMs);

    File dbDir = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false);
    File walDir = new File(dbDir, "wal");

    storageCfg.setWalPath(walDir.getAbsolutePath());

    // Either collapse WAL and its archive into one directory or use a dedicated "archive" subfolder.
    storageCfg.setWalArchivePath(setWalAndArchiveToSameVal
        ? walDir.getAbsolutePath()
        : new File(walDir, "archive").getAbsolutePath());

    cfg.setDataStorageConfiguration(storageCfg);

    return cfg;
}
/**
 * {@inheritDoc}
 * <p>
 * Wipes the persistence directory so that each test starts from an empty store.
 */
@Override protected void beforeTest() throws Exception {
    cleanPersistenceDir();
}
/**
 * {@inheritDoc}
 * <p>
 * Stops all started grids and removes their persistence files.
 */
@Override protected void afterTest() throws Exception {
    stopAllGrids();

    cleanPersistenceDir();
}
/**
 * Fills the WAL with dummy records, stops the node and checks that the standalone
 * WAL iterator observes every written key at least once.
 *
 * @throws Exception if failed.
 */
@Test
public void testFillWalAndReadRecords() throws Exception {
    setWalAndArchiveToSameVal = false;

    Ignite ignite0 = startGrid();

    ignite0.cluster().state(ACTIVE);

    Serializable consistentId = (Serializable)ignite0.cluster().localNode().consistentId();

    String subfolderName = genNewStyleSubfolderName(0, (UUID)consistentId);

    int cacheObjectsToWrite = 10_000;

    putDummyRecords(ignite0, cacheObjectsToWrite);

    stopGrid();

    String workDir = U.defaultWorkDirectory();

    File db = U.resolveWorkDirectory(workDir, DFLT_STORE_DIR, false);

    IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);

    IteratorParametersBuilder params =
        createIteratorParametersBuilder(workDir, subfolderName)
            .filesOrDirs(db);

    // Check iteratorArchiveDirectory and iteratorArchiveFiles are same.
    int cntArchiveDir = iterateAndCount(factory.iterator(params));

    log.info("Total records loaded using directory : " + cntArchiveDir);

    assertTrue(cntArchiveDir > 0);

    // Check iteratorArchiveFiles + iteratorWorkFiles iterate over all entries.
    // Note: a freshly allocated int[] is zero-initialized by the JVM, no explicit fill needed.
    int[] checkKeyIterArr = new int[cacheObjectsToWrite];

    iterateAndCountDataRecord(
        factory.iterator(params),
        (o1, o2) -> checkKeyIterArr[(Integer)o1]++,
        null
    );

    for (int i = 0; i < cacheObjectsToWrite; i++)
        assertTrue("Iterator didn't find key=" + i, checkKeyIterArr[i] > 0);
}
/**
 * Iterates on records and closes iterator.
 *
 * @param walIter iterator to count, will be closed.
 * @return count of records.
 * @throws IgniteCheckedException if failed to iterate.
 */
private int iterateAndCount(WALIterator walIter) throws IgniteCheckedException {
    int cnt = 0;

    // try-with-resources guarantees the iterator is closed even if iteration fails.
    try (WALIterator it = walIter) {
        while (it.hasNextX()) {
            IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();

            WALRecord walRecord = tup.get2();

            if (walRecord.type() == DATA_RECORD_V2 || walRecord.type() == MVCC_DATA_RECORD) {
                DataRecord record = (DataRecord)walRecord;

                for (DataEntry entry : record.writeEntries()) {
                    KeyCacheObject key = entry.key();

                    CacheObject val = entry.value();

                    if (walRecord.type() == DATA_RECORD_V2) {
                        // V2 data records carry per-entry flags; compare them to the expected
                        // test state (the 'primary'/'rebalance' fields are set by individual tests).
                        assertEquals(primary, (entry.flags() & DataEntry.PRIMARY_FLAG) != 0);
                        assertEquals(rebalance, (entry.flags() & DataEntry.PRELOAD_FLAG) != 0);
                    }

                    if (DUMP_RECORDS)
                        log.info("Op: " + entry.op() + ", Key: " + key + ", Value: " + val);
                }
            }

            if (DUMP_RECORDS)
                log.info("Record: " + walRecord);

            cnt++;
        }
    }

    return cnt;
}
/**
 * Tests that {@code EVT_WAL_SEGMENT_ARCHIVED} is fired when a WAL segment is archived.
 *
 * @throws Exception if failed.
 */
@Test
public void testArchiveCompletedEventFired() throws Exception {
    assertTrue(checkWhetherWALRelatedEventFired(EVT_WAL_SEGMENT_ARCHIVED));
}
/**
 * Tests that {@code EVT_WAL_SEGMENT_COMPACTED} is fired when WAL archive compaction is enabled.
 *
 * @throws Exception if failed.
 */
@Test
public void testArchiveCompactedEventFired() throws Exception {
    boolean prevCompactionFlag = enableWalCompaction;

    try {
        enableWalCompaction = true;

        assertTrue(checkWhetherWALRelatedEventFired(EVT_WAL_SEGMENT_COMPACTED));
    }
    finally {
        // Restore the flag so subsequent tests see the default configuration.
        enableWalCompaction = prevCompactionFlag;
    }
}
/**
 * Starts a node, subscribes to the given WAL event type, generates enough load to roll
 * WAL segments and reports whether the event was observed.
 *
 * @param evtType WAL-related event type to wait for (e.g. {@code EVT_WAL_SEGMENT_ARCHIVED}).
 * @return {@code true} if at least one event of the given type was recorded.
 * @throws Exception if failed.
 */
private boolean checkWhetherWALRelatedEventFired(int evtType) throws Exception {
    AtomicBoolean evtRecorded = new AtomicBoolean();

    Ignite ignite = startGrid();

    ignite.cluster().state(ACTIVE);

    final IgniteEvents evts = ignite.events();

    if (!evts.isEnabled(evtType))
        fail("nothing to test");

    evts.localListen(e -> {
        // NOTE(review): the cast assumes the compacted event is a subtype of
        // WalSegmentArchivedEvent — confirm for new event types.
        WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e;

        long idx = archComplEvt.getAbsWalSegmentIdx();

        log.info("Finished for segment [" +
            idx + ", " + archComplEvt.getArchiveFile() + "]: [" + e + "]");

        evtRecorded.set(true);

        return true;
    }, evtType);

    putDummyRecords(ignite, 5_000);

    stopGrid();

    return evtRecorded.get();
}
/**
 * Tests force time out based WAL segment archiving.
 *
 * @throws Exception if failure occurs.
 */
@Test
public void testForceArchiveSegment() throws Exception {
    AtomicBoolean waitingForEvt = new AtomicBoolean();

    CountDownLatch forceArchiveSegment = new CountDownLatch(1);

    // Enables setWalForceArchiveTimeout in getConfiguration().
    forceArchiveSegmentMs = 1000;

    Ignite ignite = startGrid();

    ignite.cluster().state(ACTIVE);

    IgniteEvents evts = ignite.events();

    evts.localListen(e -> {
        // Count down only after the test armed the flag, so archive events caused by the
        // initial load are ignored.
        if (waitingForEvt.get())
            forceArchiveSegment.countDown();

        return true;
    }, EVT_WAL_SEGMENT_ARCHIVED);

    putDummyRecords(ignite, 100);

    waitingForEvt.set(true); // Flag for skipping regular log() and rollOver().

    putDummyRecords(ignite, 1);

    boolean recordedAfterSleep = forceArchiveSegment.await(forceArchiveSegmentMs + getTestTimeout(), TimeUnit.MILLISECONDS);

    stopGrid();

    assertTrue(recordedAfterSleep);
}
/**
 * Tests time out based WAL segment archiving.
 *
 * @throws Exception if failure occurs.
 */
@Test
public void testArchiveIncompleteSegmentAfterInactivity() throws Exception {
    AtomicBoolean waitingForEvt = new AtomicBoolean();

    CountDownLatch archiveSegmentForInactivity = new CountDownLatch(1);

    // Enables setWalAutoArchiveAfterInactivity in getConfiguration().
    archiveIncompleteSegmentAfterInactivityMs = 1000;

    Ignite ignite = startGrid();

    ignite.cluster().state(ACTIVE);

    IgniteEvents evts = ignite.events();

    evts.localListen(e -> {
        WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e;

        long idx = archComplEvt.getAbsWalSegmentIdx();

        log.info("Finished archive for segment [" + idx + ", " +
            archComplEvt.getArchiveFile() + "]: [" + e + ']');

        // Count down only after the test armed the flag, so archives caused by the
        // initial load are ignored.
        if (waitingForEvt.get())
            archiveSegmentForInactivity.countDown();

        return true;
    }, EVT_WAL_SEGMENT_ARCHIVED);

    putDummyRecords(ignite, 100);

    waitingForEvt.set(true); // Flag for skipping regular log() and rollOver().

    log.info("Wait for archiving segment for inactive grid started");

    boolean recordedAfterSleep = archiveSegmentForInactivity.await(
        archiveIncompleteSegmentAfterInactivityMs + 1001, TimeUnit.MILLISECONDS);

    stopGrid();

    assertTrue(recordedAfterSleep);
}
/**
 * Fills the WAL until at least 15 segments were archived, then checks the standalone
 * iterator sees all written entries.
 *
 * @throws Exception if failed.
 */
@Test
public void testFillWalForExactSegmentsCount() throws Exception {
    customWalMode = WALMode.FSYNC;

    CountDownLatch reqSegments = new CountDownLatch(15);

    Ignite ignite = startGrid();

    ignite.cluster().state(ACTIVE);

    final IgniteEvents evts = ignite.events();

    if (!evts.isEnabled(EVT_WAL_SEGMENT_ARCHIVED))
        fail("nothing to test");

    evts.localListen(e -> {
        WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e;

        long idx = archComplEvt.getAbsWalSegmentIdx();

        log.info("Finished archive for segment [" + idx + ", " +
            archComplEvt.getArchiveFile() + "]: [" + e + "]");

        reqSegments.countDown();

        return true;
    }, EVT_WAL_SEGMENT_ARCHIVED);

    int totalEntries = 0;

    // Keep writing in batches until enough segments were archived; bail out if the
    // archiver never keeps up.
    while (reqSegments.getCount() > 0) {
        int write = 500;

        putAllDummyRecords(ignite, write);

        totalEntries += write;

        assertTrue("Too much entries generated, but segments was not become available",
            totalEntries < 10000);
    }

    String subfolderName = genDbSubfolderName(ignite, 0);

    stopGrid();

    String workDir = U.defaultWorkDirectory();

    IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);

    IteratorParametersBuilder iterParametersBuilder = createIteratorParametersBuilder(workDir, subfolderName);

    iterParametersBuilder.filesOrDirs(workDir);

    scanIterateAndCount(
        factory,
        iterParametersBuilder,
        totalEntries,
        0,
        null,
        null
    );
}
/**
 * Removes the entry for the given key only if it is currently mapped to the given value.
 *
 * @param m map to remove from.
 * @param key key to remove.
 * @param val value to remove.
 * @return true if remove was successful.
 */
private static boolean remove(Map<?, ?> m, Object key, Object val) {
    // Map#remove(Object, Object) (Java 8+) implements exactly the compare-and-remove
    // semantics this helper originally hand-rolled, including the null-value/absent-key case.
    return m.remove(key, val);
}
/**
 * Places records under transaction, checks its value using WAL.
 *
 * @throws Exception if failed.
 */
@Test
public void testTxFillWalAndExtractDataRecords() throws Exception {
    Ignite ignite0 = startGrid();

    ignite0.cluster().state(ACTIVE);

    int cntEntries = 1000;
    int txCnt = 100;

    IgniteCache<Object, Object> entries = txPutDummyRecords(ignite0, cntEntries, txCnt);

    // Snapshot cache contents; WAL iteration must account for every pair exactly.
    Map<Object, Object> ctrlMap = new HashMap<>();

    for (Cache.Entry<Object, Object> next : entries)
        ctrlMap.put(next.getKey(), next.getValue());

    String subfolderName = genDbSubfolderName(ignite0, 0);

    stopGrid();

    String workDir = U.defaultWorkDirectory();

    IteratorParametersBuilder params = createIteratorParametersBuilder(workDir, subfolderName);

    params.filesOrDirs(workDir);

    IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);

    IgniteBiInClosure<Object, Object> objConsumer = (key, val) -> {
        boolean rmv = remove(ctrlMap, key, val);

        if (!rmv)
            log.error("Unable to remove Key and value from control Map K:[" + key + "] V: [" + val + "]");

        if (val instanceof IndexedObject) {
            IndexedObject indexedObj = (IndexedObject)val;

            assertEquals(indexedObj.iVal, indexedObj.jVal);
            assertEquals(indexedObj.iVal, key);

            // IndexedObject payload bytes are expected to stay within ['A', 'A' + 10].
            for (byte datum : indexedObj.getData())
                assertTrue(datum >= 'A' && datum <= 'A' + 10);
        }
    };

    scanIterateAndCount(factory, params, cntEntries, txCnt, objConsumer, null);

    assertTrue(" Control Map is not empty after reading entries " + ctrlMap, ctrlMap.isEmpty());
}
/**
 * Generates DB subfolder name for provided node index (local) and UUID (consistent ID).
 *
 * @param ignite ignite instance; its consistent ID is expected to be a {@link UUID}.
 * @param nodeIdx node index.
 * @return folder file name.
 */
@NotNull private String genDbSubfolderName(Ignite ignite, int nodeIdx) {
    return genNewStyleSubfolderName(nodeIdx, (UUID)ignite.cluster().localNode().consistentId());
}
/**
 * Scan WAL and WAL archive for logical records and its entries.
 *
 * @param factory WAL iterator factory.
 * @param itParamBuilder iterator parameters builder pointing at the files/dirs to scan.
 * @param minCntEntries minimum expected entries count to find.
 * @param minTxCnt minimum expected transaction count to find.
 * @param objConsumer object handler, called for each object found in logical data records.
 * @param dataRecordHnd data handler record.
 * @throws IgniteCheckedException if failed.
 */
private void scanIterateAndCount(
    IgniteWalIteratorFactory factory,
    IteratorParametersBuilder itParamBuilder,
    int minCntEntries,
    int minTxCnt,
    @Nullable IgniteBiInClosure<Object, Object> objConsumer,
    @Nullable IgniteInClosure<DataRecord> dataRecordHnd
) throws IgniteCheckedException {
    WALIterator iter = factory.iterator(itParamBuilder);

    // Entry counts keyed by tx version; the null key bucket holds non-transactional updates.
    Map<GridCacheVersion, Integer> cntArch = iterateAndCountDataRecord(iter, objConsumer, dataRecordHnd);

    int txCntObservedArch = cntArch.size();

    if (cntArch.containsKey(null))
        txCntObservedArch -= 1; // Exclude non transactional updates.

    int entries = valuesSum(cntArch.values());

    log.info("Total tx found loaded using archive directory (file-by-file): " + txCntObservedArch);

    assertTrue("txCntObservedArch=" + txCntObservedArch + " >= minTxCnt=" + minTxCnt,
        txCntObservedArch >= minTxCnt);

    assertTrue("entries=" + entries + " >= minCntEntries=" + minCntEntries,
        entries >= minCntEntries);
}
/**
 * Fills a cache with keys and values of many different types and verifies that both the
 * unwrapped and the keep-binary WAL iteration modes observe every entry.
 *
 * @throws Exception if failed.
 */
@Test
public void testFillWalWithDifferentTypes() throws Exception {
    Ignite ig = startGrid();

    ig.cluster().state(ACTIVE);

    IgniteCache<Object, Object> addlCache = ig.getOrCreateCache(CACHE_ADDL_NAME);

    addlCache.put("1", "2");
    addlCache.put(1, 2);
    addlCache.put(1L, 2L);
    addlCache.put(TestEnum.A, "Enum_As_Key");
    addlCache.put("Enum_As_Value", TestEnum.B);
    addlCache.put(TestEnum.C, TestEnum.C);
    addlCache.put("Serializable", new TestSerializable(42));
    addlCache.put(new TestSerializable(42), "Serializable_As_Key");
    addlCache.put("Externalizable", new TestExternalizable(42));
    addlCache.put(new TestExternalizable(42), "Externalizable_As_Key");
    addlCache.put(292, new IndexedObject(292));

    String search1 = "SomeUnexpectedStringValueAsKeyToSearch";

    Collection<String> ctrlStringsToSearch = new HashSet<>();

    ctrlStringsToSearch.add(search1);

    Collection<String> ctrlStringsForBinaryObjSearch = new HashSet<>();

    ctrlStringsForBinaryObjSearch.add(search1);

    addlCache.put(search1, "SearchKey");

    String search2 = "SomeTestStringContainerToBePrintedLongLine";

    TestStringContainerToBePrinted val = new TestStringContainerToBePrinted(search2);

    // Will validate original toString() was called.
    ctrlStringsToSearch.add("v = [ " + val.getClass().getSimpleName() + "{data='" + search2 + "'}]");

    ctrlStringsForBinaryObjSearch.add(search2);

    addlCache.put("SearchValue", val);

    String search3 = "SomeTestStringContainerToBePrintedLongLine2";

    TestStringContainerToBePrinted key = new TestStringContainerToBePrinted(search3);

    // Will validate original toString() was called.
    ctrlStringsToSearch.add("k = " + key.getClass().getSimpleName() + "{data='" + search3 + "'}");

    ctrlStringsForBinaryObjSearch.add(search3); // Validate only string itself.

    addlCache.put(key, "SearchKey");

    int cntEntries = addlCache.size();

    Map<Object, Object> ctrlMap = new HashMap<>();

    for (Cache.Entry<Object, Object> next : addlCache)
        ctrlMap.put(next.getKey(), next.getValue());

    Map<Object, Object> ctrlMapForBinaryObjects = new HashMap<>();

    for (Cache.Entry<Object, Object> next : addlCache)
        ctrlMapForBinaryObjects.put(next.getKey(), next.getValue());

    String subfolderName = genDbSubfolderName(ig, 0);

    // Wait async allocation wal segment file by archiver.
    Thread.sleep(1000);

    // NOTE(review): instance name "node0" — confirm it matches the name used by startGrid().
    stopGrid("node0", false);

    String workDir = U.defaultWorkDirectory();

    IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);

    IteratorParametersBuilder params0 = createIteratorParametersBuilder(workDir, subfolderName);

    params0.filesOrDirs(workDir);

    IgniteBiInClosure<Object, Object> objConsumer = (key12, val1) -> {
        log.info("K: [" + key12 + ", " +
            (key12 != null ? key12.getClass().getName() : "?") + "]" +
            " V: [" + val1 + ", " +
            (val1 != null ? val1.getClass().getName() : "?") + "]");

        boolean rmv = remove(ctrlMap, key12, val1);

        if (!rmv) {
            String msg = "Unable to remove pair from control map " + "K: [" + key12 + "] V: [" + val1 + "]";

            log.error(msg);
        }

        // In unwrapped mode no raw binary objects should leak out of the iterator.
        assertFalse(val1 instanceof BinaryObject);
    };

    IgniteInClosure<DataRecord> toStrChecker = record -> {
        String strRepresentation = record.toString();

        for (Iterator<String> iter = ctrlStringsToSearch.iterator(); iter.hasNext(); ) {
            final String next = iter.next();

            if (strRepresentation.contains(next)) {
                iter.remove();

                break;
            }
        }
    };

    scanIterateAndCount(factory, params0, cntEntries, 0, objConsumer, toStrChecker);

    assertTrue(" Control Map is not empty after reading entries: " + ctrlMap, ctrlMap.isEmpty());
    assertTrue(" Control Map for strings in entries is not empty after" +
        " reading records: " + ctrlStringsToSearch, ctrlStringsToSearch.isEmpty());

    IgniteBiInClosure<Object, Object> binObjConsumer = (key13, val12) -> {
        log.info("K(KeepBinary): [" + key13 + ", " +
            (key13 != null ? key13.getClass().getName() : "?") + "]" +
            " V(KeepBinary): [" + val12 + ", " +
            (val12 != null ? val12.getClass().getName() : "?") + "]");

        boolean rmv = remove(ctrlMapForBinaryObjects, key13, val12);

        if (!rmv) {
            // In keep-binary mode keys arrive as BinaryObject: resolve them back to their
            // original classes before removing from the control map.
            if (key13 instanceof BinaryObject) {
                BinaryObject keyBinObj = (BinaryObject)key13;

                String binaryObjTypeName = keyBinObj.type().typeName();

                if (Objects.equals(TestStringContainerToBePrinted.class.getName(), binaryObjTypeName)) {
                    String data = keyBinObj.field("data");

                    rmv = ctrlMapForBinaryObjects.remove(new TestStringContainerToBePrinted(data)) != null;
                }
                else if (Objects.equals(TestSerializable.class.getName(), binaryObjTypeName)) {
                    Integer iVal = keyBinObj.field("iVal");

                    rmv = ctrlMapForBinaryObjects.remove(new TestSerializable(iVal)) != null;
                }
                else if (Objects.equals(TestEnum.class.getName(), binaryObjTypeName)) {
                    TestEnum key1 = TestEnum.values()[keyBinObj.enumOrdinal()];

                    rmv = ctrlMapForBinaryObjects.remove(key1) != null;
                }
            }
            else if (val12 instanceof BinaryObject) {
                // Don't compare BO values, just remove by key.
                rmv = ctrlMapForBinaryObjects.remove(key13) != null;
            }
        }

        if (!rmv)
            log.error("Unable to remove pair from control map " + "K: [" + key13 + "] V: [" + val12 + "]");

        if (val12 instanceof BinaryObject) {
            BinaryObject binaryObj = (BinaryObject)val12;

            String binaryObjTypeName = binaryObj.type().typeName();

            if (Objects.equals(IndexedObject.class.getName(), binaryObjTypeName)) {
                assertEquals(
                    binaryObj.field("iVal").toString(),
                    binaryObj.field("jVal").toString()
                );

                byte[] data = binaryObj.field("data");

                for (byte datum : data)
                    assertTrue(datum >= 'A' && datum <= 'A' + 10);
            }
        }
    };

    IgniteInClosure<DataRecord> binObjToStrChecker = record -> {
        String strRepresentation = record.toString();

        for (Iterator<String> iter = ctrlStringsForBinaryObjSearch.iterator(); iter.hasNext(); ) {
            final String next = iter.next();

            if (strRepresentation.contains(next)) {
                iter.remove();

                break;
            }
        }
    };

    IteratorParametersBuilder params1 = createIteratorParametersBuilder(workDir, subfolderName);

    params1.filesOrDirs(workDir).keepBinary(true);

    // Validate same WAL log with flag binary objects only.
    IgniteWalIteratorFactory keepBinFactory = new IgniteWalIteratorFactory(log);

    scanIterateAndCount(keepBinFactory, params1, cntEntries, 0, binObjConsumer, binObjToStrChecker);

    assertTrue(" Control Map is not empty after reading entries: " +
        ctrlMapForBinaryObjects, ctrlMapForBinaryObjects.isEmpty());
    assertTrue(" Control Map for strings in entries is not empty after" +
        " reading records: " + ctrlStringsForBinaryObjSearch, ctrlStringsForBinaryObjSearch.isEmpty());
}
/**
 * Tests reading of empty WAL from non filled cluster.
 *
 * @throws Exception if failed.
 */
@Test
public void testReadEmptyWal() throws Exception {
    customWalMode = WALMode.FSYNC;

    Ignite ignite = startGrid();

    // Activate and immediately deactivate: no user data is ever written.
    ignite.cluster().state(ACTIVE);
    ignite.cluster().state(ClusterState.INACTIVE);

    final String subfolderName = genDbSubfolderName(ignite, 0);

    stopGrid();

    String workDir = U.defaultWorkDirectory();

    IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);

    IteratorParametersBuilder iterParametersBuilder =
        createIteratorParametersBuilder(workDir, subfolderName)
            .filesOrDirs(workDir);

    // Zero entries and zero transactions expected: the scan just has to complete cleanly.
    scanIterateAndCount(
        factory,
        iterParametersBuilder,
        0,
        0,
        null,
        null
    );
}
/**
 * Tests WAL iterator which uses shared cache context of currently started Ignite node.
 *
 * @throws Exception if failed.
 */
@Test
public void testIteratorWithCurrentKernelContext() throws Exception {
    IgniteEx ignite = startGrid(0);

    ignite.cluster().state(ACTIVE);

    int cntEntries = 100;

    putDummyRecords(ignite, cntEntries);

    String workDir = U.defaultWorkDirectory();

    IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);

    // Metadata dirs are nulled out: types are resolved through the shared context of the
    // running node instead of standalone metadata files.
    IteratorParametersBuilder iterParametersBuilder =
        createIteratorParametersBuilder(workDir, genDbSubfolderName(ignite, 0))
            .filesOrDirs(workDir)
            .binaryMetadataFileStoreDir(null)
            .marshallerMappingFileStoreDir(null)
            .sharedContext(ignite.context().cache().context());

    AtomicInteger cnt = new AtomicInteger();

    IgniteBiInClosure<Object, Object> objConsumer = (key, val) -> {
        if (val instanceof IndexedObject) {
            assertEquals(key, ((IndexedObject)val).iVal);
            assertEquals(key, cnt.getAndIncrement());
        }
    };

    iterateAndCountDataRecord(factory.iterator(iterParametersBuilder.copy()), objConsumer, null);

    assertEquals(cntEntries, cnt.get());

    // Test without converting non primary types.
    iterParametersBuilder.keepBinary(true);

    cnt.set(0);

    IgniteBiInClosure<Object, Object> binObjConsumer = (key, val) -> {
        if (val instanceof BinaryObject) {
            assertEquals(key, ((BinaryObject)val).field("iVal"));
            assertEquals(key, cnt.getAndIncrement());
        }
    };

    iterateAndCountDataRecord(factory.iterator(iterParametersBuilder.copy()), binObjConsumer, null);

    assertEquals(cntEntries, cnt.get());
}
/**
 * Creates an "Org11" cache with the given atomicity mode and fills it with a mix of
 * puts, overwrites and removes inside a single transaction.
 *
 * @param ig Ignite instance.
 * @param mode Cache Atomicity Mode.
 */
private void createCache2(Ignite ig, CacheAtomicityMode mode) {
    if (log.isInfoEnabled())
        log.info("Populating the cache...");

    CacheConfiguration<Integer, Organization> orgCacheCfg = new CacheConfiguration<>("Org" + "11");

    orgCacheCfg.setAtomicityMode(mode);

    IgniteCache<Integer, Organization> orgCache = ig.getOrCreateCache(orgCacheCfg)
        .withKeepBinary()
        .withAllowAtomicOpsInTx();

    try (Transaction tx = ig.transactions().txStart()) {
        for (int idx = 0; idx < 10; idx++) {
            orgCache.put(idx, new Organization(idx, "Organization-" + idx));

            // Every even key is overwritten, every fifth key is removed, producing a mix
            // of CREATE/UPDATE/DELETE entries in the WAL.
            if (idx % 2 == 0)
                orgCache.put(idx, new Organization(idx, "Organization-updated-" + idx));

            if (idx % 5 == 0)
                orgCache.remove(idx);
        }

        tx.commit();
    }
}
/**
 * Test if DELETE operation can be found for transactional cache after mixed cache operations including remove().
 *
 * @throws Exception if failed.
 */
@Test
public void testRemoveOperationPresentedForDataEntry() throws Exception {
    runRemoveOperationTest(CacheAtomicityMode.TRANSACTIONAL);
}
/**
 * Test if DELETE operation can be found for atomic cache after mixed cache operations including remove().
 *
 * @throws Exception if failed.
 */
@Test
public void testRemoveOperationPresentedForDataEntryForAtomic() throws Exception {
    // Skipped when MVCC is forced (see MvccFeatureChecker).
    Assume.assumeFalse(MvccFeatureChecker.forcedMvcc());

    runRemoveOperationTest(CacheAtomicityMode.ATOMIC);
}
/**
 * Test if DELETE operation can be found after mixed cache operations including remove().
 *
 * @param mode Cache Atomicity Mode.
 * @throws Exception if failed.
 */
private void runRemoveOperationTest(CacheAtomicityMode mode) throws Exception {
    Ignite ignite = startGrid();

    ignite.cluster().state(ACTIVE);

    createCache2(ignite, mode);

    ignite.cluster().state(ClusterState.INACTIVE);

    String subfolderName = genDbSubfolderName(ignite, 0);

    stopGrid();

    String workDir = U.defaultWorkDirectory();

    IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);

    IteratorParametersBuilder params = createIteratorParametersBuilder(workDir, subfolderName);

    params.filesOrDirs(workDir);

    StringBuilder sb = new StringBuilder();

    // Per-operation counters accumulated over all data records found in the WAL.
    Map<GridCacheOperation, Integer> operationsFound = new EnumMap<>(GridCacheOperation.class);

    scanIterateAndCount(
        factory,
        params,
        0,
        0,
        null,
        dataRecord -> {
            final List<DataEntry> entries = dataRecord.writeEntries();

            sb.append("{");

            for (DataEntry entry : entries) {
                GridCacheOperation op = entry.op();

                operationsFound.merge(op, 1, Integer::sum);

                if (entry instanceof UnwrapDataEntry) {
                    UnwrapDataEntry entry1 = (UnwrapDataEntry)entry;

                    sb.append(entry1.op())
                        .append(" for ")
                        .append(entry1.unwrappedKey());

                    GridCacheVersion ver = entry.nearXidVersion();

                    sb.append(", ");

                    if (ver != null)
                        sb.append("tx=")
                            .append(ver)
                            .append(", ");
                }
            }

            sb.append("}\n");
        });

    final Integer deletesFound = operationsFound.get(DELETE);

    if (log.isInfoEnabled())
        log.info(sb.toString());

    assertTrue("Delete operations should be found in log: " + operationsFound,
        deletesFound != null && deletesFound > 0);
}
/**
 * Tests that the primary flag of data entries observed in each node's WAL matches the
 * node role (primary vs. backup) for a putAll cache operation. The 'primary' field is
 * checked against every entry's flags during iteration.
 *
 * @throws Exception if failed.
 */
@Test
public void testPrimaryFlagOnTwoNodes() throws Exception {
    backupCnt = 1;

    IgniteEx ignite = startGrid("node0");
    Ignite ignite1 = startGrid(1);

    ignite.cluster().state(ACTIVE);

    IgniteCache<Integer, IndexedObject> cache = ignite.cache(CACHE_NAME);

    backupCnt = 0;

    int cntEntries = 100;

    // Use keys primary on the local node so entries land in predictable WALs.
    List<Integer> keys = findKeys(ignite.localNode(), cache, cntEntries, 0, 0);

    Map<Integer, IndexedObject> map = new TreeMap<>();

    for (Integer key : keys)
        map.putIfAbsent(key, new IndexedObject(key));

    cache.putAll(map);

    ignite.cluster().state(ClusterState.INACTIVE);

    String subfolderName1 = genDbSubfolderName(ignite, 0);
    String subfolderName2 = genDbSubfolderName(ignite1, 1);

    stopAllGrids();

    String workDir = U.defaultWorkDirectory();

    IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);

    Map<GridCacheOperation, Integer> operationsFound = new EnumMap<>(GridCacheOperation.class);

    IgniteInClosure<DataRecord> drHnd = dataRecord -> {
        List<? extends DataEntry> entries = dataRecord.writeEntries();

        for (DataEntry entry : entries) {
            GridCacheOperation op = entry.op();

            operationsFound.merge(op, 1, Integer::sum);
        }
    };

    // Scan the primary node's WAL with primary == true (default).
    scanIterateAndCount(
        factory,
        createIteratorParametersBuilder(workDir, subfolderName1)
            .filesOrDirs(
                workDir + "/db/wal/" + subfolderName1,
                workDir + "/db/wal/archive/" + subfolderName1
            ),
        1,
        1,
        null, drHnd
    );

    // Scan the backup node's WAL expecting the primary flag to be absent.
    primary = false;

    scanIterateAndCount(
        factory,
        createIteratorParametersBuilder(workDir, subfolderName2)
            .filesOrDirs(
                workDir + "/db/wal/" + subfolderName2,
                workDir + "/db/wal/archive/" + subfolderName2
            ),
        1,
        1,
        null,
        drHnd
    );
}
/**
 * Tests that WAL data entries written on a node that received its data via rebalancing carry the
 * preload (rebalance) flag, while entries on the original primary and backup nodes do not.
 *
 * @throws Exception if failed.
 */
@Test
@WithSystemProperty(key = IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING, value = "false")
public void testRebalanceFlag() throws Exception {
    backupCnt = 1;

    IgniteEx ignite = startGrid("node0");
    Ignite ignite1 = startGrid(1);

    ignite.cluster().state(ACTIVE);

    IgniteCache<Integer, IndexedObject> cache = ignite.cache(CACHE_NAME);

    int cntEntries = 100;

    // Keys whose primary partitions are owned by node0, so node0's WAL holds the primary writes.
    List<Integer> keys = findKeys(ignite.localNode(), cache, cntEntries, 0, 0);

    Map<Integer, IndexedObject> map = new TreeMap<>();

    for (Integer key : keys)
        map.putIfAbsent(key, new IndexedObject(key));

    cache.putAll(map);

    // A third node joins the baseline, which triggers rebalancing of existing data to it.
    Ignite ignite2 = startGrid(2);

    ignite.cluster().setBaselineTopology(ignite2.cluster().topologyVersion());

    backupCnt = 0;

    awaitPartitionMapExchange(false, true, null);

    String subfolderName1 = genDbSubfolderName(ignite, 0);
    String subfolderName2 = genDbSubfolderName(ignite1, 1);
    String subfolderName3 = genDbSubfolderName(ignite2, 2);

    stopAllGrids();

    String workDir = U.defaultWorkDirectory();

    IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);

    Map<GridCacheOperation, Integer> operationsFound = new EnumMap<>(GridCacheOperation.class);

    // Counts each cache operation type observed in WAL data records.
    IgniteInClosure<DataRecord> drHnd = dataRecord -> {
        List<? extends DataEntry> entries = dataRecord.writeEntries();

        for (DataEntry entry : entries) {
            GridCacheOperation op = entry.op();

            Integer cnt = operationsFound.get(op);

            operationsFound.put(op, cnt == null ? 1 : (cnt + 1));
        }
    };

    // Node0's WAL: entries written as primary, no preload flag expected.
    scanIterateAndCount(
        factory,
        createIteratorParametersBuilder(workDir, subfolderName1)
            .filesOrDirs(
                workDir + "/db/wal/" + subfolderName1,
                workDir + "/db/wal/archive/" + subfolderName1
            ),
        1,
        1,
        null, drHnd
    );

    // Node1's WAL: backup copies, so the primary flag must be absent.
    primary = false;

    scanIterateAndCount(
        factory,
        createIteratorParametersBuilder(workDir, subfolderName2)
            .filesOrDirs(
                workDir + "/db/wal/" + subfolderName2,
                workDir + "/db/wal/archive/" + subfolderName2
            ),
        1,
        1,
        null,
        drHnd
    );

    // Node2's WAL: data arrived via rebalance, so entries must carry the preload flag.
    rebalance = true;

    scanIterateAndCount(
        factory,
        createIteratorParametersBuilder(workDir, subfolderName3)
            .filesOrDirs(
                workDir + "/db/wal/" + subfolderName3,
                workDir + "/db/wal/archive/" + subfolderName3
            ),
        1,
        0,
        null,
        drHnd
    );
}
/**
 * Tests transaction generation and WAL content for a putAll cache operation spanning two nodes:
 * CREATE operations for at least all inserted entries must be observed across both nodes' WALs.
 *
 * @throws Exception if failed.
 */
@Test
public void testPutAllTxIntoTwoNodes() throws Exception {
    Ignite ignite = startGrid("node0");
    Ignite ignite1 = startGrid(1);

    // state(...) replaces the deprecated cluster().active(boolean).
    ignite.cluster().state(ClusterState.ACTIVE);

    Map<Object, IndexedObject> map = new TreeMap<>();

    int cntEntries = 1000;

    for (int i = 0; i < cntEntries; i++)
        map.put(i, new IndexedObject(i));

    ignite.cache(CACHE_NAME).putAll(map);

    ignite.cluster().state(ClusterState.INACTIVE);

    String subfolderName1 = genDbSubfolderName(ignite, 0);
    String subfolderName2 = genDbSubfolderName(ignite1, 1);

    stopAllGrids();

    String workDir = U.defaultWorkDirectory();

    IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);

    StringBuilder sb = new StringBuilder();

    Map<GridCacheOperation, Integer> operationsFound = new EnumMap<>(GridCacheOperation.class);

    // Counts operations per type and dumps each record's entries (with TX version) for diagnostics.
    IgniteInClosure<DataRecord> drHnd = dataRecord -> {
        List<DataEntry> entries = dataRecord.writeEntries();

        sb.append("{");

        for (DataEntry entry : entries) {
            operationsFound.merge(entry.op(), 1, Integer::sum);

            if (entry instanceof UnwrapDataEntry) {
                UnwrapDataEntry unwrapped = (UnwrapDataEntry)entry;

                sb.append(unwrapped.op()).append(" for ").append(unwrapped.unwrappedKey());

                GridCacheVersion ver = entry.nearXidVersion();

                sb.append(", ");

                if (ver != null)
                    sb.append("tx=").append(ver).append(", ");
            }
        }

        sb.append("}\n");
    };

    scanIterateAndCount(
        factory,
        createIteratorParametersBuilder(workDir, subfolderName1)
            .filesOrDirs(
                workDir + "/db/wal/" + subfolderName1,
                workDir + "/db/wal/archive/" + subfolderName1
            ),
        1,
        1,
        null, drHnd
    );

    scanIterateAndCount(
        factory,
        createIteratorParametersBuilder(workDir, subfolderName2)
            .filesOrDirs(
                workDir + "/db/wal/" + subfolderName2,
                workDir + "/db/wal/archive/" + subfolderName2
            ),
        1,
        1,
        null,
        drHnd
    );

    Integer createsFound = operationsFound.get(CREATE);

    if (log.isInfoEnabled())
        log.info(sb.toString());

    assertTrue("Create operations should be found in log: " + operationsFound,
        createsFound != null && createsFound > 0);

    assertTrue("Create operations count should be at least " + cntEntries + " in log: " + operationsFound,
        createsFound != null && createsFound >= cntEntries);
}
/**
 * Tests that WAL written for a putAll operation can be iterated when the factory is created with a
 * {@link NullLogger} and pointed at the whole work directory.
 * NOTE(review): the method name suggests the goal is reading TX records without binary metadata
 * being available — confirm against scanIterateAndCount's expectations (1000 data records).
 *
 * @throws Exception if failed.
 */
@Test
public void testTxRecordsReadWoBinaryMeta() throws Exception {
    Ignite ignite = startGrid("node0");

    ignite.cluster().state(ClusterState.ACTIVE);

    Map<Object, IndexedObject> map = new TreeMap<>();

    for (int i = 0; i < 1000; i++)
        map.put(i, new IndexedObject(i));

    ignite.cache(CACHE_NAME).putAll(map);

    ignite.cluster().state(ClusterState.INACTIVE);

    String workDir = U.defaultWorkDirectory();

    String subfolderName = genDbSubfolderName(ignite, 0);

    stopAllGrids();

    IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(new NullLogger());

    IteratorParametersBuilder params = createIteratorParametersBuilder(workDir, subfolderName);

    scanIterateAndCount(
        factory,
        params.filesOrDirs(workDir),
        1000,
        1,
        null,
        null
    );
}
/**
 * Checks WAL iteration bounds: replay from a pointer, replay up to a pointer, and replay between
 * two pointers must each visit exactly the expected inclusive range of records.
 *
 * @throws Exception If failed.
 */
@Test
public void testCheckBoundsIterator() throws Exception {
    Ignite ignite = startGrid("node0");

    ignite.cluster().active(true);

    try (IgniteDataStreamer<Integer, IndexedObject> st = ignite.dataStreamer(CACHE_NAME)) {
        st.allowOverwrite(true);

        for (int i = 0; i < 10_000; i++)
            st.addData(i, new IndexedObject(i));
    }

    stopAllGrids();

    // Collect every WAL pointer in order; used below as the ground truth for bounded iterations.
    List<WALPointer> wal = new ArrayList<>();

    String workDir = U.defaultWorkDirectory();

    IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory();

    try (WALIterator it = factory.iterator(workDir)) {
        while (it.hasNext()) {
            IgniteBiTuple<WALPointer, WALRecord> tup = it.next();

            wal.add(tup.get1());
        }
    }

    Random rnd = new Random();

    // Case 1: replay FROM a random pointer to the end of the WAL.
    int from0 = rnd.nextInt(wal.size() - 2) + 1;
    int to0 = wal.size() - 1;
    // +1 for skip first record.

    WALPointer exp0First = wal.get(from0);
    WALPointer exp0Last = wal.get(to0);

    T2<WALPointer, WALRecord> actl0First = null;
    T2<WALPointer, WALRecord> actl0Last = null;

    int records0 = 0;

    try (WALIterator it = factory.iterator(exp0First, workDir)) {
        while (it.hasNext()) {
            IgniteBiTuple<WALPointer, WALRecord> tup = it.next();

            if (actl0First == null)
                actl0First = new T2<>(tup.get1(), tup.get2());

            actl0Last = new T2<>(tup.get1(), tup.get2());

            records0++;
        }
    }

    log.info("Check REPLAY FROM:" + exp0First + "\n" +
        "expFirst=" + exp0First + " actlFirst=" + actl0First + ", " +
        "expLast=" + exp0Last + " actlLast=" + actl0Last);

    // +1 because bound include.
    Assert.assertEquals(to0 - from0 + 1, records0);
    Assert.assertNotNull(actl0First);
    Assert.assertNotNull(actl0Last);

    Assert.assertEquals(exp0First, actl0First.get1());
    Assert.assertEquals(exp0Last, actl0Last.get1());

    // Case 2: replay from the beginning TO a random pointer (excluding at least the last record).
    int from1 = 0;
    int to1 = rnd.nextInt(wal.size() - 3) + 1;
    // -3 for skip last record.

    WALPointer exp1First = wal.get(from1);
    WALPointer exp1Last = wal.get(to1);

    T2<WALPointer, WALRecord> actl1First = null;
    T2<WALPointer, WALRecord> actl1Last = null;

    int records1 = 0;

    try (WALIterator it = factory.iterator(
        new IteratorParametersBuilder()
            .filesOrDirs(workDir)
            .to(exp1Last)
    )) {
        while (it.hasNext()) {
            IgniteBiTuple<WALPointer, WALRecord> tup = it.next();

            if (actl1First == null)
                actl1First = new T2<>(tup.get1(), tup.get2());

            actl1Last = new T2<>(tup.get1(), tup.get2());

            records1++;
        }
    }

    log.info("Check REPLAY TO:" + exp1Last + "\n" +
        "expFirst=" + exp1First + " actlFirst=" + actl1First + ", " +
        "expLast=" + exp1Last + " actlLast=" + actl1Last);

    // +1 because bound include.
    Assert.assertEquals(to1 - from1 + 1, records1);
    Assert.assertNotNull(actl1First);
    Assert.assertNotNull(actl1Last);

    Assert.assertEquals(exp1First, actl1First.get1());
    Assert.assertEquals(exp1Last, actl1Last.get1());

    // Case 3: replay BETWEEN two random pointers; both bounds are inclusive.
    int from2 = rnd.nextInt(wal.size() - 2);
    int to2 = rnd.nextInt((wal.size() - 1) - from2) + from2;

    WALPointer exp2First = wal.get(from2);
    WALPointer exp2Last = wal.get(to2);

    T2<WALPointer, WALRecord> actl2First = null;
    T2<WALPointer, WALRecord> actl2Last = null;

    int records2 = 0;

    try (WALIterator it = factory.iterator(
        new IteratorParametersBuilder()
            .filesOrDirs(workDir)
            .from(exp2First)
            .to(exp2Last)
    )) {
        while (it.hasNext()) {
            IgniteBiTuple<WALPointer, WALRecord> tup = it.next();

            if (actl2First == null)
                actl2First = new T2<>(tup.get1(), tup.get2());

            actl2Last = new T2<>(tup.get1(), tup.get2());

            records2++;
        }
    }

    log.info("Check REPLAY BETWEEN:" + exp2First + " " + exp2Last + "\n" +
        "expFirst=" + exp2First + " actlFirst=" + actl2First + ", " +
        "expLast=" + exp2Last + " actlLast=" + actl2Last);

    // +1 because bound include.
    Assert.assertEquals(to2 - from2 + 1, records2);
    Assert.assertNotNull(actl2First);
    Assert.assertNotNull(actl2Last);

    Assert.assertEquals(exp2First, actl2First.get1());
    Assert.assertEquals(exp2Last, actl2Last.get1());
}
/**
 * Creates a WAL iterator parameters builder pre-configured with the binary metadata and
 * marshaller mapping directories resolved for the given node subfolder.
 *
 * @param workDir Work directory.
 * @param subfolderName Subfolder name.
 * @return WAL iterator parameters builder.
 * @throws IgniteCheckedException If failed.
 */
@NotNull private IteratorParametersBuilder createIteratorParametersBuilder(
    String workDir,
    String subfolderName
) throws IgniteCheckedException {
    File marshallerDir = U.resolveWorkDirectory(workDir, DataStorageConfiguration.DFLT_MARSHALLER_PATH, false);

    // Binary metadata is resolved per consistent-ID subfolder.
    File binaryMetaRoot = U.resolveWorkDirectory(workDir, DataStorageConfiguration.DFLT_BINARY_METADATA_PATH,
        false);

    return new IteratorParametersBuilder()
        .binaryMetadataFileStoreDir(new File(binaryMetaRoot, subfolderName))
        .marshallerMappingFileStoreDir(marshallerDir);
}
/**
 * Sums the non-null numbers of the given collection.
 *
 * @param values collection with numbers; null elements are skipped.
 * @return sum of numbers.
 */
private int valuesSum(Iterable<Integer> values) {
    int total = 0;

    Iterator<Integer> it = values.iterator();

    while (it.hasNext()) {
        Integer cur = it.next();

        if (cur != null)
            total += cur;
    }

    return total;
}
/**
 * Iterates over data records, checks each DataRecord and its entries, finds out all transactions in WAL.
 *
 * @param walIter iterator to use; closed by this method.
 * @param cacheObjHnd optional handler invoked with each entry's unwrapped key and value (either may be null).
 * @param dataRecordHnd optional handler invoked once per {@link DataRecord} found.
 * @return count of data records observed for each global TX ID. Contains null for non tx updates.
 * @throws IgniteCheckedException if failure.
 */
private Map<GridCacheVersion, Integer> iterateAndCountDataRecord(
    WALIterator walIter,
    @Nullable IgniteBiInClosure<Object, Object> cacheObjHnd,
    @Nullable IgniteInClosure<DataRecord> dataRecordHnd
) throws IgniteCheckedException {
    // Data-record entry count per near-XID version; key is null for updates made outside a transaction.
    Map<GridCacheVersion, Integer> entriesUnderTxFound = new HashMap<>();

    try (WALIterator stIt = walIter) {
        while (stIt.hasNextX()) {
            IgniteBiTuple<WALPointer, WALRecord> tup = stIt.nextX();

            WALRecord walRecord = tup.get2();

            WALRecord.RecordType type = walRecord.type();

            // Only data and TX record types are inspected; all other record types are skipped.
            //noinspection EnumSwitchStatementWhichMissesCases
            switch (type) {
                case DATA_RECORD_V2:
                    // Fallthrough.
                case MVCC_DATA_RECORD: {
                    assert walRecord instanceof DataRecord;

                    DataRecord dataRecord = (DataRecord)walRecord;

                    if (dataRecordHnd != null)
                        dataRecordHnd.apply(dataRecord);

                    List<DataEntry> entries = dataRecord.writeEntries();

                    for (DataEntry entry : entries) {
                        // Only V2 records carry per-entry flags; verify them against the expected test state.
                        if (walRecord.type() == DATA_RECORD_V2) {
                            assertEquals(primary, (entry.flags() & DataEntry.PRIMARY_FLAG) != 0);
                            assertEquals(rebalance, (entry.flags() & DataEntry.PRELOAD_FLAG) != 0);
                        }

                        GridCacheVersion globalTxId = entry.nearXidVersion();

                        Object unwrappedKeyObj;
                        Object unwrappedValObj;

                        // Key/value extraction depends on the concrete entry type.
                        if (entry instanceof UnwrappedDataEntry) {
                            UnwrappedDataEntry unwrapDataEntry = (UnwrappedDataEntry)entry;

                            unwrappedKeyObj = unwrapDataEntry.unwrappedKey();
                            unwrappedValObj = unwrapDataEntry.unwrappedValue();
                        }
                        else if (entry instanceof MarshalledDataEntry) {
                            unwrappedKeyObj = null;
                            unwrappedValObj = null;
                            //can't check value
                        }
                        else {
                            // Plain entry: unwrap cache objects unless they are binary objects already.
                            final CacheObject val = entry.value();

                            unwrappedValObj = val instanceof BinaryObject ? val : val.value(null, false);

                            final CacheObject key = entry.key();

                            unwrappedKeyObj = key instanceof BinaryObject ? key : key.value(null, false);
                        }

                        if (DUMP_RECORDS)
                            log.info("//Entry operation " + entry.op() + "; cache Id" + entry.cacheId() + "; " +
                                "under transaction: " + globalTxId +
                                //; entry " + entry +
                                "; Key: " + unwrappedKeyObj +
                                "; Value: " + unwrappedValObj);

                        if (cacheObjHnd != null && (unwrappedKeyObj != null || unwrappedValObj != null))
                            cacheObjHnd.apply(unwrappedKeyObj, unwrappedValObj);

                        Integer entriesUnderTx = entriesUnderTxFound.get(globalTxId);

                        entriesUnderTxFound.put(globalTxId, entriesUnderTx == null ? 1 : entriesUnderTx + 1);
                    }
                }

                break;

                case TX_RECORD:
                    // Fallthrough
                case MVCC_TX_RECORD: {
                    assert walRecord instanceof TxRecord;

                    TxRecord txRecord = (TxRecord)walRecord;

                    GridCacheVersion globalTxId = txRecord.nearXidVersion();

                    if (DUMP_RECORDS)
                        log.info("//Tx Record, state: " + txRecord.state() +
                            "; nearTxVersion" + globalTxId);
                }
            }
        }
    }

    return entriesUnderTxFound;
}
/**
 * Fills WAL by putting the given number of entries into the test cache one by one.
 *
 * @param ignite ignite instance.
 * @param recordsToWrite count.
 */
private void putDummyRecords(Ignite ignite, int recordsToWrite) {
    IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);

    for (int idx = 0; idx < recordsToWrite; idx++)
        cache.put(idx, new IndexedObject(idx));
}
/**
 * Fills WAL by putting the given number of entries into the test cache with a single putAll.
 *
 * @param ignite ignite instance.
 * @param recordsToWrite count.
 */
private void putAllDummyRecords(Ignite ignite, int recordsToWrite) {
    IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);

    Map<Object, Object> batch = new HashMap<>();

    for (int idx = 0; idx < recordsToWrite; idx++)
        batch.put(idx, new IndexedObject(idx));

    cache.putAll(batch);
}
/**
 * Fills WAL by putting records under explicit transactions.
 *
 * @param ignite ignite instance.
 * @param recordsToWrite total records requested.
 * @param txCnt transactions to run; each commits {@code max(1, recordsToWrite / txCnt)} sequential keys,
 *      so if {@code txCnt} exceeds the record count, {@code txCnt} records are written.
 * @return the cache the records were written to.
 */
private IgniteCache<Object, Object> txPutDummyRecords(Ignite ignite, int recordsToWrite, int txCnt) {
    IgniteCache<Object, Object> cache = ignite.cache(CACHE_NAME);

    // At least one key per transaction, even when more transactions than records were requested.
    int keysPerTx = Math.max(1, recordsToWrite / txCnt);

    for (int t = 0; t < txCnt; t++) {
        try (Transaction tx = ignite.transactions().txStart()) {
            int first = t * keysPerTx;

            for (int key = first; key < first + keysPerTx; key++)
                cache.put(key, new IndexedObject(key));

            tx.commit();
        }
    }

    return cache;
}
/** Enum for cover binaryObject enum save/load. */
enum TestEnum {
    /** First constant. */
    A,

    /** Second constant. */
    B,

    /** Third constant. */
    C
}
/** Special class to test WAL reader resistance to Serializable interface. */
static class TestSerializable implements Serializable {
    /** */
    private static final long serialVersionUID = 0L;

    /** Integer payload carried through serialization. */
    private int iVal;

    /**
     * Creates test object.
     *
     * @param iVal I value.
     */
    TestSerializable(int iVal) {
        this.iVal = iVal;
    }

    /** {@inheritDoc} */
    @Override public boolean equals(Object o) {
        if (o == this)
            return true;

        if (o == null || o.getClass() != getClass())
            return false;

        return iVal == ((TestSerializable)o).iVal;
    }

    /** {@inheritDoc} */
    @Override public int hashCode() {
        return Integer.hashCode(iVal);
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return "TestSerializable{" + "iVal=" + iVal + '}';
    }
}
/** Special class to test WAL reader resistance to Externalizable interface. */
static class TestExternalizable implements Externalizable {
    /** */
    private static final long serialVersionUID = 0L;

    /** Integer payload written/read by the externalization methods. */
    private int iVal;

    /** Noop ctor for unmarshalling */
    public TestExternalizable() {
        // No-op.
    }

    /**
     * Creates test object with provided value.
     *
     * @param iVal I value.
     */
    TestExternalizable(int iVal) {
        this.iVal = iVal;
    }

    /** {@inheritDoc} */
    @Override public void writeExternal(ObjectOutput out) throws IOException {
        out.writeInt(iVal);
    }

    /** {@inheritDoc} */
    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        iVal = in.readInt();
    }

    /** {@inheritDoc} */
    @Override public boolean equals(Object o) {
        if (o == this)
            return true;

        if (o == null || o.getClass() != getClass())
            return false;

        return iVal == ((TestExternalizable)o).iVal;
    }

    /** {@inheritDoc} */
    @Override public int hashCode() {
        return Integer.hashCode(iVal);
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return "TestExternalizable{" + "iVal=" + iVal + '}';
    }
}
/** Container class to test toString of data records. */
static class TestStringContainerToBePrinted {
    /** Payload searched for in toString output; may be null. */
    private String data;

    /**
     * Creates container
     *
     * @param data value to be searched in to String
     */
    TestStringContainerToBePrinted(String data) {
        this.data = data;
    }

    /** {@inheritDoc} */
    @Override public boolean equals(Object o) {
        if (o == this)
            return true;

        if (o == null || o.getClass() != getClass())
            return false;

        TestStringContainerToBePrinted other = (TestStringContainerToBePrinted)o;

        return Objects.equals(data, other.data);
    }

    /** {@inheritDoc} */
    @Override public int hashCode() {
        return Objects.hashCode(data);
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return "TestStringContainerToBePrinted{" + "data='" + data + '\'' + '}';
    }
}
/** Test class for storing in ignite. */
private static class Organization {
    /** Key. */
    private final int key;

    /** Name. */
    private final String name;

    /**
     * @param key Key.
     * @param name Name.
     */
    Organization(int key, String name) {
        this.key = key;
        this.name = name;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return new StringBuilder("Organization{key=")
            .append(key)
            .append(", name='")
            .append(name)
            .append("'}")
            .toString();
    }
}
}