blob: 9f39ede2252e2f205e71c039896a8cf2be022360 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cdc;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cdc.CdcSelfTest.WAL_ARCHIVE_TIMEOUT;
import static org.apache.ignite.configuration.DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE;
import static org.apache.ignite.internal.util.IgniteUtils.KB;
import static org.apache.ignite.internal.util.IgniteUtils.MB;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/** Check only {@link DataRecord} written to the WAL for in-memory cache. */
@RunWith(Parameterized.class)
public class WalForCdcTest extends GridCommonAbstractTest {
/** */
private static final int RECORD_COUNT = 10;
/** */
@Parameterized.Parameter
public CacheMode mode;
/** */
@Parameterized.Parameter(1)
public CacheAtomicityMode atomicityMode;
/** */
private boolean persistenceEnabled;
/** */
private boolean cdcEnabled;
/** */
private long archiveSz = UNLIMITED_WAL_ARCHIVE;
/** */
@Parameterized.Parameters(name = "mode={0}, atomicityMode={1}")
public static Collection<?> parameters() {
List<Object[]> params = new ArrayList<>();
for (CacheMode mode : Arrays.asList(REPLICATED, PARTITIONED))
for (CacheAtomicityMode atomicityMode : Arrays.asList(ATOMIC, TRANSACTIONAL))
params.add(new Object[] {mode, atomicityMode});
return params;
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
.setWalSegmentSize((int)(2 * MB))
.setMaxWalArchiveSize(archiveSz)
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(persistenceEnabled)
.setCdcEnabled(cdcEnabled)));
cfg.setConsistentId(igniteInstanceName);
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
stopAllGrids();
cleanPersistenceDir();
cdcEnabled = true;
persistenceEnabled = false;
}
/** */
@Test
public void testOnlyDataRecordWritten() throws Exception {
IgniteEx ignite1 = startGrid(0);
ignite1.cluster().state(ClusterState.ACTIVE);
AtomicInteger cntr = new AtomicInteger();
// Check only `DataRecords` written in WAL for in-memory cache with CDC enabled.
doTestWal(ignite1, cache -> {
for (int i = 0; i < RECORD_COUNT; i++)
cache.put(keyForNode(ignite1.affinity(DEFAULT_CACHE_NAME), cntr, ignite1.localNode()), i);
}, RECORD_COUNT);
// Check no WAL written during rebalance.
IgniteEx ignite2 = startGrid(1);
awaitPartitionMapExchange(false, true, null);
// Can't use `waitForCondition` because if test passed
// then no `DataRecords` loged therefore no segment archivation.
Thread.sleep(3 * WAL_ARCHIVE_TIMEOUT);
int walRecCnt = checkDataRecords(ignite2);
assertEquals(0, walRecCnt);
// Check `DataRecords` written on second node after rebalance.
doTestWal(ignite1, cache -> {
for (int i = 0; i < RECORD_COUNT; i++)
cache.put(keyForNode(ignite1.affinity(DEFAULT_CACHE_NAME), cntr, ignite1.localNode()), i);
}, RECORD_COUNT * 2);
doTestWal(ignite2, cache -> {
for (int i = 0; i < RECORD_COUNT; i++)
cache.put(keyForNode(ignite2.affinity(DEFAULT_CACHE_NAME), cntr, ignite2.localNode()), i);
}, RECORD_COUNT * (mode == REPLICATED ? 2 : 1));
}
/** */
@Test
public void testWalDisable() throws Exception {
persistenceEnabled = true;
IgniteEx ignite = startGrid(0);
ignite.cluster().state(ClusterState.ACTIVE);
doTestWal(ignite, cache -> {
for (int i = 0; i < RECORD_COUNT / 2; i++)
cache.put(i, i);
ignite.cluster().disableWal(DEFAULT_CACHE_NAME);
for (int i = 0; i < RECORD_COUNT; i++)
cache.put(i, i);
ignite.cluster().enableWal(DEFAULT_CACHE_NAME);
for (int i = RECORD_COUNT / 2; i < RECORD_COUNT; i++)
cache.put(i, i);
}, RECORD_COUNT);
}
/** */
@Test
public void testWalDisabledIfPersistenceAndCdcDisabled() throws Exception {
persistenceEnabled = false;
cdcEnabled = false;
IgniteEx ignite = startGrid(0);
ignite.cluster().state(ClusterState.ACTIVE);
ignite.getOrCreateCache(new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME)
.setCacheMode(mode)
.setAtomicityMode(atomicityMode));
assertNull(ignite.context().cache().context().wal());
assertNull(getFieldValue(ignite.context().cache().context(), "cdcWalMgr"));
}
/** */
@Test
public void testArchiveCleared() throws Exception {
persistenceEnabled = false;
cdcEnabled = true;
archiveSz = 10 * MB;
IgniteEx ignite = startGrid(0);
ignite.cluster().state(ClusterState.ACTIVE);
IgniteCache<Integer, byte[]> cache = ignite.getOrCreateCache(
new CacheConfiguration<Integer, byte[]>(DEFAULT_CACHE_NAME)
.setCacheMode(mode)
.setAtomicityMode(atomicityMode));
IntConsumer createData = (entryCnt) -> {
for (int i = 0; i < entryCnt; i++) {
byte[] payload = new byte[(int)KB];
ThreadLocalRandom.current().nextBytes(payload);
cache.put(i, payload);
}
};
IgniteWriteAheadLogManager wal = ignite.context().cache().context().wal(true);
long startSgmnt = wal.currentSegment();
createData.accept((int)(archiveSz / (2 * KB)));
long finishSgmnt = wal.currentSegment();
String archive = archive(ignite);
assertTrue(finishSgmnt > startSgmnt);
assertTrue(
"Wait for start segment archivation",
waitForCondition(() -> startSgmnt <= wal.lastArchivedSegment(), getTestTimeout())
);
File startSgmntArchived = new File(archive, FileDescriptor.fileName(startSgmnt));
assertTrue("Check archived segment file exists", startSgmntArchived.exists());
createData.accept((int)(archiveSz / KB));
assertTrue(
"Wait for archived segment cleaned",
waitForCondition(() -> !startSgmntArchived.exists(), getTestTimeout())
);
}
/** */
private void doTestWal(
IgniteEx ignite,
Consumer<IgniteCache<Integer, Integer>> putData,
int expWalRecCnt
) throws Exception {
IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(
new CacheConfiguration<Integer, Integer>(DEFAULT_CACHE_NAME)
.setCacheMode(mode)
.setAtomicityMode(atomicityMode));
long archiveIdx = ignite.context().cache().context().wal(true).lastArchivedSegment();
putData.accept(cache);
assertTrue(waitForCondition(
() -> archiveIdx < ignite.context().cache().context().wal(true).lastArchivedSegment(),
getTestTimeout()
));
int walRecCnt = checkDataRecords(ignite);
assertEquals(expWalRecCnt, walRecCnt);
}
/** */
private int checkDataRecords(IgniteEx ignite) throws IgniteCheckedException {
WALIterator iter = new IgniteWalIteratorFactory(log).iterator(new IteratorParametersBuilder()
.ioFactory(new RandomAccessFileIOFactory())
.filesOrDirs(archive(ignite)));
int walRecCnt = 0;
while (iter.hasNext()) {
IgniteBiTuple<WALPointer, WALRecord> rec = iter.next();
if (persistenceEnabled && (!(rec.get2() instanceof DataRecord)))
continue;
assertTrue(rec.get2() instanceof DataRecord);
DataRecord dataRec = (DataRecord)rec.get2();
for (int i = 0; i < dataRec.entryCount(); i++)
assertEquals(CU.cacheId(DEFAULT_CACHE_NAME), dataRec.get(i).cacheId());
walRecCnt++;
}
return walRecCnt;
}
/**
* @param ignite Ignite.
* @return WAL archive patch
* @throws IgniteCheckedException If failed
*/
private static String archive(IgniteEx ignite) throws IgniteCheckedException {
return U.resolveWorkDirectory(
U.defaultWorkDirectory(),
ignite.configuration().getDataStorageConfiguration().getWalArchivePath() + "/" +
U.maskForFileName(ignite.configuration().getIgniteInstanceName()),
false
).getAbsolutePath();
}
}