blob: eda1082d0d969f974cd304b1eba371865e1b736d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.cdc;
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DATA_STORAGE_FOLDER_BY_CONSISTENT_ID;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.DELETE;
import static org.apache.ignite.cdc.AbstractCdcTest.ChangeEventType.UPDATE;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
import static org.junit.Assume.assumeTrue;
/** */
@RunWith(Parameterized.class)
public class CdcSelfTest extends AbstractCdcTest {
/** */
public static final String TX_CACHE_NAME = "tx-cache";
/** */
public static final int WAL_ARCHIVE_TIMEOUT = 5_000;
/** */
@Parameterized.Parameter
public boolean specificConsistentId;
/** */
@Parameterized.Parameter(1)
public WALMode walMode;
/** */
@Parameterized.Parameter(2)
public boolean persistenceEnabled;
/** */
@Parameterized.Parameters(name = "consistentId={0}, wal={1}, persistence={2}")
public static Collection<?> parameters() {
List<Object[]> params = new ArrayList<>();
for (WALMode mode : EnumSet.of(WALMode.FSYNC, WALMode.LOG_ONLY, WALMode.BACKGROUND))
for (boolean specificConsistentId : new boolean[] {false, true})
for (boolean persistenceEnabled : new boolean[] {true, false})
params.add(new Object[] {specificConsistentId, mode, persistenceEnabled});
return params;
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
if (specificConsistentId)
cfg.setConsistentId(igniteInstanceName);
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setWalMode(walMode)
.setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(persistenceEnabled)
.setCdcEnabled(true))
.setWalArchivePath(DFLT_WAL_ARCHIVE_PATH + "/" + U.maskForFileName(igniteInstanceName)));
cfg.setCacheConfiguration(
new CacheConfiguration<>(TX_CACHE_NAME)
.setAtomicityMode(TRANSACTIONAL)
.setBackups(1)
);
return cfg;
}
/** Simplest CDC test. */
@Test
public void testReadAllKeysCommitAll() throws Exception {
// Read all records from iterator.
readAll(new UserCdcConsumer(), true);
}
/** Simplest CDC test but read one event at a time to check correct iterator work. */
@Test
public void testReadAllKeysWithoutCommit() throws Exception {
// Read one record per call.
readAll(new UserCdcConsumer() {
@Override public boolean onEvents(Iterator<CdcEvent> evts) {
super.onEvents(Collections.singleton(evts.next()).iterator());
return false;
}
}, false);
}
/** Simplest CDC test but commit every event to check correct state restore. */
@Test
public void testReadAllKeysCommitEachEvent() throws Exception {
// Read one record per call and commit.
readAll(new UserCdcConsumer() {
@Override public boolean onEvents(Iterator<CdcEvent> evts) {
super.onEvents(Collections.singleton(evts.next()).iterator());
return true;
}
}, true);
}
/** */
private void readAll(UserCdcConsumer cnsmr, boolean offsetCommit) throws Exception {
IgniteConfiguration cfg = getConfiguration("ignite-0");
Ignite ign = startGrid(cfg);
ign.cluster().state(ACTIVE);
IgniteCache<Integer, User> cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME);
IgniteCache<Integer, User> txCache = ign.getOrCreateCache(TX_CACHE_NAME);
addAndWaitForConsumption(
cnsmr,
cfg,
cache,
txCache,
CdcSelfTest::addData,
0,
KEYS_CNT + 3,
offsetCommit
);
removeData(cache, 0, KEYS_CNT);
CdcMain cdcMain = createCdc(cnsmr, cfg);
IgniteInternalFuture<?> rmvFut = runAsync(cdcMain);
waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, DELETE, cnsmr);
checkMetrics(cdcMain, offsetCommit ? KEYS_CNT : ((KEYS_CNT + 3) * 2 + KEYS_CNT));
rmvFut.cancel();
assertTrue(cnsmr.stopped());
stopAllGrids();
cleanPersistenceDir();
}
/** */
@Test
public void testReadOneByOneForBackup() throws Exception {
assumeTrue("CDC with 2 local nodes can't determine correct PDS directory without specificConsistentId.",
specificConsistentId);
IgniteEx ign = startGrids(2);
ign.cluster().state(ACTIVE);
IgniteCache<Integer, Integer> txCache = ign.cache(TX_CACHE_NAME);
awaitPartitionMapExchange();
int keysCnt = 3;
Map<Integer, Integer> batch = primaryKeys(txCache, keysCnt).stream()
.collect(Collectors.toMap(key -> key, val -> val, (a, b) -> a, TreeMap::new));
// Put data in batch because it will be logged in form of `DataRecord(List<DataEntry)` on backup node.
txCache.putAll(batch);
// Check `DataRecord(List<DataEntry>)` logged.
File archive = U.resolveWorkDirectory(
U.defaultWorkDirectory(),
grid(1).configuration().getDataStorageConfiguration().getWalArchivePath(),
false
);
IteratorParametersBuilder param = new IteratorParametersBuilder().filesOrDirs(archive)
.filter((type, pointer) -> type == WALRecord.RecordType.DATA_RECORD_V2);
assertTrue("DataRecord(List<DataEntry>) should be logged.", waitForCondition(() -> {
try (WALIterator iter = new IgniteWalIteratorFactory(log).iterator(param)) {
while (iter.hasNext()) {
DataRecord rec = (DataRecord)iter.next().get2();
if (rec.entryCount() > 1)
return true;
}
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
return false;
}, getTestTimeout()));
for (int i = 0; i < 2; i++) {
IgniteEx grid = grid(i);
Set<Integer> data = new HashSet<>();
AtomicBoolean firstEvt = new AtomicBoolean(true);
CdcConsumer cnsmr = new CdcConsumer() {
@Override public boolean onEvents(Iterator<CdcEvent> evts) {
if (!firstEvt.get())
throw new RuntimeException("Expected fail.");
assertTrue(evts.hasNext());
data.add((Integer)evts.next().key());
firstEvt.set(false);
if (data.size() == keysCnt)
throw new RuntimeException("Expected fail.");
return true;
}
@Override public void onTypes(Iterator<BinaryType> types) {
types.forEachRemaining(t -> assertNotNull(t));
}
@Override public void onMappings(Iterator<TypeMapping> mappings) {
mappings.forEachRemaining(m -> assertNotNull(m));
}
@Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
cacheEvents.forEachRemaining(ce -> assertNotNull(ce));
}
/** {@inheritDoc} */
@Override public void onCacheDestroy(Iterator<Integer> caches) {
caches.forEachRemaining(ce -> assertNotNull(ce));
}
@Override public void stop() {
// No-op.
}
@Override public void start(MetricRegistry mreg) {
// No-op.
}
};
for (int j = 0; j < keysCnt; j++) {
CdcMain cdc = createCdc(cnsmr, getConfiguration(grid.name()));
IgniteInternalFuture<?> fut = runAsync(cdc);
// Restart CDC after read and commit single key.
assertTrue(waitForCondition(fut::isDone, getTestTimeout()));
assertEquals(j + 1, data.size());
firstEvt.set(true);
}
assertTrue(F.eqNotOrdered(batch.keySet(), data));
}
}
/** Test check that state restored correctly and next event read by CDC on each restart. */
@Test
public void testReadFromNextEntry() throws Exception {
IgniteConfiguration cfg = getConfiguration("ignite-0");
IgniteEx ign = startGrid(cfg);
ign.cluster().state(ACTIVE);
IgniteCache<Integer, User> cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME);
int keysCnt = 10;
addData(cache, 0, keysCnt / 2);
long segIdx = ign.context().cache().context().wal(true).lastArchivedSegment();
waitForCondition(() -> ign.context().cache().context().wal(true).lastArchivedSegment() > segIdx, getTestTimeout());
addData(cache, keysCnt / 2, keysCnt);
AtomicInteger expKey = new AtomicInteger();
int lastKey = 0;
while (expKey.get() != keysCnt) {
String errMsg = "Expected fail";
IgniteInternalFuture<?> fut = runAsync(createCdc(new CdcConsumer() {
boolean oneConsumed;
@Override public boolean onEvents(Iterator<CdcEvent> evts) {
// Fail application after one event read AND state committed.
if (oneConsumed)
throw new RuntimeException(errMsg);
CdcEvent evt = evts.next();
assertEquals(expKey.get(), evt.key());
expKey.incrementAndGet();
// Fail application if all expected data read e.g. next event doesn't exist.
if (expKey.get() == keysCnt)
throw new RuntimeException(errMsg);
oneConsumed = true;
return true;
}
@Override public void onTypes(Iterator<BinaryType> types) {
types.forEachRemaining(t -> assertNotNull(t));
}
@Override public void onMappings(Iterator<TypeMapping> mappings) {
mappings.forEachRemaining(m -> assertNotNull(m));
}
@Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
cacheEvents.forEachRemaining(ce -> assertNotNull(ce));
}
@Override public void onCacheDestroy(Iterator<Integer> caches) {
caches.forEachRemaining(ce -> assertNotNull(ce));
}
@Override public void stop() {
// No-op.
}
@Override public void start(MetricRegistry mreg) {
// No-op.
}
}, cfg));
assertTrue(waitForCondition(fut::isDone, getTestTimeout()));
if (!errMsg.equals(fut.error().getMessage()))
throw new RuntimeException(fut.error());
assertEquals(1, expKey.get() - lastKey);
lastKey = expKey.get();
}
}
/** */
@Test
public void testReadBeforeGracefulShutdown() throws Exception {
Ignite ign = startGrid(getConfiguration("ignite-0"));
ign.cluster().state(ACTIVE);
CountDownLatch cnsmrStarted = new CountDownLatch(1);
CountDownLatch startProcEvts = new CountDownLatch(1);
UserCdcConsumer cnsmr = new UserCdcConsumer() {
@Override public boolean onEvents(Iterator<CdcEvent> evts) {
cnsmrStarted.countDown();
try {
startProcEvts.await(getTestTimeout(), MILLISECONDS);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
return super.onEvents(evts);
}
};
CdcMain cdc = createCdc(cnsmr, getConfiguration(ign.name()));
runAsync(cdc);
IgniteCache<Integer, User> cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME);
addData(cache, 0, KEYS_CNT);
// Make sure all streamed data will become available for consumption.
Thread.sleep(2 * WAL_ARCHIVE_TIMEOUT);
cnsmrStarted.await(getTestTimeout(), MILLISECONDS);
// Initiate graceful shutdown.
cdc.stop();
startProcEvts.countDown();
waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, UPDATE, cnsmr);
assertTrue(waitForCondition(cnsmr::stopped, getTestTimeout()));
List<Integer> keys = cnsmr.data(UPDATE, cacheId(DEFAULT_CACHE_NAME));
assertEquals(KEYS_CNT, keys.size());
for (int i = 0; i < KEYS_CNT; i++)
assertTrue(keys.contains(i));
}
/** */
@Test
@WithSystemProperty(key = IGNITE_DATA_STORAGE_FOLDER_BY_CONSISTENT_ID, value = "true")
public void testMultiNodeConsumption() throws Exception {
IgniteEx ign1 = startGrid(0);
IgniteEx ign2 = startGrid(1);
ign1.cluster().state(ACTIVE);
IgniteCache<Integer, User> cache = ign1.getOrCreateCache(DEFAULT_CACHE_NAME);
// Calculate expected count of key for each node.
int[] keysCnt = new int[2];
for (int i = 0; i < KEYS_CNT * 2; i++) {
Ignite primary = primaryNode(i, DEFAULT_CACHE_NAME);
assertTrue(primary == ign1 || primary == ign2);
keysCnt[primary == ign1 ? 0 : 1]++;
}
// Adds data concurrently with CDC start.
IgniteInternalFuture<?> addDataFut = runAsync(() -> addData(cache, 0, KEYS_CNT));
UserCdcConsumer cnsmr1 = new UserCdcConsumer();
UserCdcConsumer cnsmr2 = new UserCdcConsumer();
IgniteConfiguration cfg1 = getConfiguration(ign1.name());
IgniteConfiguration cfg2 = getConfiguration(ign2.name());
// Always run CDC with consistent id to ensure instance read data for specific node.
if (!specificConsistentId) {
cfg1.setConsistentId((Serializable)ign1.localNode().consistentId());
cfg2.setConsistentId((Serializable)ign2.localNode().consistentId());
}
CountDownLatch latch = new CountDownLatch(2);
GridAbsPredicate sizePredicate1 = sizePredicate(keysCnt[0], DEFAULT_CACHE_NAME, UPDATE, cnsmr1);
GridAbsPredicate sizePredicate2 = sizePredicate(keysCnt[1], DEFAULT_CACHE_NAME, UPDATE, cnsmr2);
CdcMain cdc1 = createCdc(cnsmr1, cfg1, latch, sizePredicate1);
CdcMain cdc2 = createCdc(cnsmr2, cfg2, latch, sizePredicate2);
IgniteInternalFuture<?> fut1 = runAsync(cdc1);
IgniteInternalFuture<?> fut2 = runAsync(cdc2);
addDataFut.get(getTestTimeout());
runAsync(() -> addData(cache, KEYS_CNT, KEYS_CNT * 2)).get(getTestTimeout());
// Wait while predicate will become true and state saved on the disk for both cdc.
assertTrue(latch.await(getTestTimeout(), MILLISECONDS));
checkMetrics(cdc1, keysCnt[0]);
checkMetrics(cdc2, keysCnt[1]);
assertFalse(cnsmr1.stopped());
assertFalse(cnsmr2.stopped());
fut1.cancel();
fut2.cancel();
assertTrue(cnsmr1.stopped());
assertTrue(cnsmr2.stopped());
removeData(cache, 0, KEYS_CNT * 2);
cdc1 = createCdc(cnsmr1, cfg1);
cdc2 = createCdc(cnsmr2, cfg2);
IgniteInternalFuture<?> rmvFut1 = runAsync(cdc1);
IgniteInternalFuture<?> rmvFut2 = runAsync(cdc2);
waitForSize(KEYS_CNT * 2, DEFAULT_CACHE_NAME, DELETE, cnsmr1, cnsmr2);
checkMetrics(cdc1, keysCnt[0]);
checkMetrics(cdc2, keysCnt[1]);
rmvFut1.cancel();
rmvFut2.cancel();
assertTrue(cnsmr1.stopped());
assertTrue(cnsmr2.stopped());
}
/** */
@Test
public void testCdcSingleton() throws Exception {
IgniteEx ign = startGrid(0);
UserCdcConsumer cnsmr1 = new UserCdcConsumer();
UserCdcConsumer cnsmr2 = new UserCdcConsumer();
IgniteInternalFuture<?> fut1 = runAsync(createCdc(cnsmr1, getConfiguration(ign.name())));
IgniteInternalFuture<?> fut2 = runAsync(createCdc(cnsmr2, getConfiguration(ign.name())));
assertTrue(waitForCondition(() -> fut1.isDone() || fut2.isDone(), getTestTimeout()));
assertEquals(fut1.error() == null, fut2.error() != null);
if (fut1.isDone()) {
fut2.cancel();
assertTrue(cnsmr2.stopped());
}
else {
fut1.cancel();
assertTrue(cnsmr1.stopped());
}
}
/** */
@Test
public void testReReadWhenStateWasNotStored() throws Exception {
Supplier<IgniteEx> restart = () -> {
stopAllGrids(false);
try {
IgniteEx ign = startGrid(getConfiguration("ignite-0"));
ign.cluster().state(ACTIVE);
return ign;
}
catch (Exception e) {
throw new RuntimeException(e);
}
};
IgniteEx ign = restart.get();
IgniteCache<Integer, User> cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME);
addData(cache, 0, KEYS_CNT / 2);
ign = restart.get();
cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME);
addData(cache, KEYS_CNT / 2, KEYS_CNT);
UserCdcConsumer cnsmr = new UserCdcConsumer() {
@Override protected boolean commit() {
return false;
}
};
for (int i = 0; i < 3; i++) {
CdcMain cdc = createCdc(cnsmr, getConfiguration(ign.name()));
IgniteInternalFuture<?> fut = runAsync(cdc);
waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, UPDATE, cnsmr);
checkMetrics(cdc, KEYS_CNT);
fut.cancel();
assertTrue(cnsmr.stopped());
cnsmr.data.clear();
}
AtomicBoolean consumeHalf = new AtomicBoolean(true);
AtomicBoolean halfCommitted = new AtomicBoolean(false);
int half = KEYS_CNT / 2;
cnsmr = new UserCdcConsumer() {
@Override public boolean onEvents(Iterator<CdcEvent> evts) {
if (consumeHalf.get() && F.size(data(UPDATE, cacheId(DEFAULT_CACHE_NAME))) == half) {
// This means that state committed as a result of the previous call.
halfCommitted.set(true);
return false;
}
while (evts.hasNext()) {
CdcEvent evt = evts.next();
if (!evt.primary())
continue;
data.computeIfAbsent(
F.t(evt.value() == null ? DELETE : UPDATE, evt.cacheId()),
k -> new ArrayList<>()).add((Integer)evt.key()
);
if (consumeHalf.get())
return F.size(data(UPDATE, cacheId(DEFAULT_CACHE_NAME))) == half;
}
return true;
}
};
CdcMain cdc = createCdc(cnsmr, getConfiguration(ign.name()));
IgniteInternalFuture<?> fut = runAsync(cdc);
waitForSize(half, DEFAULT_CACHE_NAME, UPDATE, cnsmr);
checkMetrics(cdc, half);
waitForCondition(halfCommitted::get, getTestTimeout());
fut.cancel();
assertTrue(cnsmr.stopped());
removeData(cache, 0, KEYS_CNT);
consumeHalf.set(false);
cdc = createCdc(cnsmr, getConfiguration(ign.name()));
fut = runAsync(cdc);
waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, UPDATE, cnsmr);
waitForSize(KEYS_CNT, DEFAULT_CACHE_NAME, DELETE, cnsmr);
checkMetrics(cdc, KEYS_CNT * 2 - half);
fut.cancel();
assertTrue(cnsmr.stopped());
}
/** */
public static void addData(IgniteCache<Integer, User> cache, int from, int to) {
for (int i = from; i < to; i++)
cache.put(i, createUser(i));
}
/** */
private void removeData(IgniteCache<Integer, ?> cache, int from, int to) {
for (int i = from; i < to; i++)
cache.remove(i);
}
}