blob: 3afcc5ce7543041471f895208aa20929f20a48a6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.cdc;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
import static org.junit.Assume.assumeFalse;
/** */
@RunWith(Parameterized.class)
public class RealtimeCdcBufferTest extends GridCommonAbstractTest {
/** */
private static final String CONSISTENT_ID = "ID";
/** */
private static ListeningTestLogger lsnrLog;
/** */
private static CountDownLatch stopLatch;
/** */
private ByteBufferCdcConsumer consumer;
/** */
private boolean cdcEnabled;
/** */
private int maxCdcBufSize;
/** */
private AtomicInteger commitCnt;
/** */
@Parameterized.Parameter()
public WALMode walMode;
/** */
@Parameterized.Parameters(name = "walMode={0}")
public static List<Object[]> params() {
return F.asList(
new Object[] { WALMode.LOG_ONLY },
new Object[] { WALMode.BACKGROUND },
new Object[] { WALMode.FSYNC }
);
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String instanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(instanceName);
consumer = new ByteBufferCdcConsumer(commitCnt);
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setMaxCdcBufferSize(maxCdcBufSize)
.setCdcConsumer(consumer)
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setCdcEnabled(cdcEnabled)
.setPersistenceEnabled(true) // TODO: test for in-memory mode.
)
.setWalMode(walMode)
);
cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
cfg.setFailureHandler(new StopNodeFailureHandler());
cfg.setGridLogger(lsnrLog);
cfg.setConsistentId(CONSISTENT_ID);
return cfg;
}
/** */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
lsnrLog = new ListeningTestLogger(log);
cleanPersistenceDir();
cdcEnabled = true;
stopLatch = null;
commitCnt = null;
consumer = null;
}
/** */
@Test
public void testCdcBufferOverflow() throws Exception {
// TODO: Looks like there is a bug in the FSYNC case: WAL misses some records (HEADER_RECORD).
assumeFalse(walMode == WALMode.FSYNC);
maxCdcBufSize = 100 * (int)U.KB;
checkCdcBufferOverflow(buildEntries(100, 10 * (int)U.KB), true);
}
/** */
@Test
public void testCdcDisabled() throws Exception {
cdcEnabled = false;
checkCdcBufferOverflow(buildEntries(100, 10 * (int)U.KB), false);
}
/** */
@Test
public void testCdcRecords() throws Exception {
// TODO: Looks like there is a bug in the FSYNC case: WAL misses some records (HEADER_RECORD).
assumeFalse(walMode == WALMode.FSYNC);
maxCdcBufSize = 100 * (int)U.MB;
commitCnt = new AtomicInteger();
IgniteEx crd = startGrid(0);
crd.cluster().state(ClusterState.ACTIVE);
IgniteCache<Integer, Integer> cache = crd.cache(DEFAULT_CACHE_NAME);
// Await while cluster is fully activated.
cache.put(0, 0);
int expCommitCnt = 5;
commitCnt.set(expCommitCnt);
while (commitCnt.get() >= 0)
cache.put(0, 0);
forceCheckpoint(crd);
stopGrid(0);
try (WALIterator walIt = walIter(null)) {
int cdcRecCnt = 0;
while (walIt.hasNext()) {
if (walIt.next().getValue().type() == WALRecord.RecordType.REALTIME_CDC_RECORD)
cdcRecCnt++;
}
assertEquals(expCommitCnt, cdcRecCnt);
}
}
/** */
@Test
public void testCdcBufferContent() throws Exception {
// TODO: Looks like there is a bug in the FSYNC case: WAL misses some records (HEADER_RECORD).
assumeFalse(walMode == WALMode.FSYNC);
maxCdcBufSize = 10 * (int)U.MB;
int entriesCnt = 1;
List<T2<Integer, byte[]>> expEntries = buildEntries((int)U.KB, entriesCnt);
checkCdcBufferOverflow(expEntries, false);
assertEquals(entriesCnt, expEntries.size());
List<T2<Integer, byte[]>> cdcEntries = consumer.buf.stream()
.map(e -> new T2<>((int)e.key(), (byte[])e.value()))
.collect(Collectors.toList());
assertEquals(expEntries.size(), cdcEntries.size());
for (int i = 0; i < expEntries.size(); i++) {
assertEquals(expEntries.get(i).getKey(), cdcEntries.get(i).getKey());
assertTrue(Arrays.equals(expEntries.get(i).getValue(), cdcEntries.get(i).getValue()));
}
}
/** */
private List<T2<Integer, byte[]>> buildEntries(int size, int cnt) {
List<T2<Integer, byte[]>> entries = new ArrayList<>();
for (int i = 0; i < cnt; i++) {
byte[] data = new byte[size];
ThreadLocalRandom.current().nextBytes(data);
entries.add(new T2<>(i, data));
}
return entries;
}
/** */
private void checkCdcBufferOverflow(List<T2<Integer, byte[]>> entries, boolean shouldOverflow) throws Exception {
stopLatch = cdcEnabled ? new CountDownLatch(entries.size()) : null;
LogListener lsnr = LogListener.matches("CDC buffer has overflowed. Stop realtime mode of CDC.")
.times(shouldOverflow ? 1 : 0)
.build();
lsnrLog.registerListener(lsnr);
IgniteEx crd = startGrid(0);
crd.cluster().state(ClusterState.ACTIVE);
IgniteCache<Integer, byte[]> cache = crd.cache(DEFAULT_CACHE_NAME);
for (T2<Integer, byte[]> e: entries)
cache.put(e.getKey(), e.getValue());
forceCheckpoint(crd);
if (stopLatch != null && !shouldOverflow)
U.awaitQuiet(stopLatch);
stopGrid(0);
assertTrue(lsnr.check());
try (WALIterator walIt = walIter((recType, recPtr) -> recType == WALRecord.RecordType.REALTIME_STOP_CDC_RECORD)) {
AtomicInteger stopRecCnt = new AtomicInteger();
walIt.forEach((rec) -> stopRecCnt.incrementAndGet());
assertEquals(shouldOverflow ? 1 : 0, stopRecCnt.get());
}
}
/** */
private File walSegments() throws IgniteCheckedException {
return U.resolveWorkDirectory(
U.defaultWorkDirectory(),
DFLT_STORE_DIR + "/wal/" + CONSISTENT_ID, false);
}
/** Get iterator over WAL. */
private WALIterator walIter(@Nullable IgniteBiPredicate<WALRecord.RecordType, WALPointer> filter) throws Exception {
IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);
IgniteWalIteratorFactory.IteratorParametersBuilder params = new IgniteWalIteratorFactory.IteratorParametersBuilder()
.filesOrDirs(walSegments())
.filter(filter);
return factory.iterator(params);
}
/** */
private static class ByteBufferCdcConsumer implements CdcBufferConsumer {
/** */
private final List<CdcEvent> buf = new ArrayList<>();
/** */
private final AtomicInteger commitCnt;
/** */
ByteBufferCdcConsumer(@Nullable AtomicInteger commitCnt) {
this.commitCnt = commitCnt;
}
/** {@inheritDoc} */
@Override public boolean consume(Collection<CdcEvent> data) {
buf.addAll(data);
if (stopLatch != null) {
for (int i = 0; i < data.size(); i++)
stopLatch.countDown();
}
return commitCnt != null && commitCnt.decrementAndGet() >= 0;
}
/** */
@Override public void close() {
for (int i = 0; i < stopLatch.getCount(); i++)
stopLatch.countDown();
}
}
}