IGNITE-19622 Add realtime CDC buffer (#10778)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 5fb101a..eaabed6 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -93,6 +93,7 @@
import static org.apache.ignite.internal.processors.cache.mvcc.MvccCachingManager.DFLT_MVCC_TX_SIZE_CACHING_THRESHOLD;
import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DFLT_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE;
import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DFLT_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.internal.processors.cache.persistence.cdc.CdcWorker.DFLT_POLL_CDC_BUF_THROTTLING_TIMEOUT;
import static org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory.DFLT_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE;
import static org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointMarkersStorage.DFLT_IGNITE_CHECKPOINT_MAP_SNAPSHOT_THRESHOLD;
import static org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointWorkflow.DFLT_CHECKPOINT_PARALLEL_SORT_THRESHOLD;
@@ -1532,6 +1533,13 @@
public static final String IGNITE_UNWIND_THROTTLING_TIMEOUT = "IGNITE_UNWIND_THROTTLING_TIMEOUT";
/**
+ * Throttling timeout in millis for polling CDC buffer in realtime mode. Default is 100 ms.
+ */
+ @SystemProperty(value = "Throttling timeout in millis for polling CDC buffer in realtime mode", type = Long.class,
+ defaults = "" + DFLT_POLL_CDC_BUF_THROTTLING_TIMEOUT)
+ public static final String IGNITE_THROTTLE_POLL_CDC_BUF = "IGNITE_THROTTLE_POLL_CDC_BUF";
+
+ /**
* Threshold for throttling operations logging.
*/
@SystemProperty(value = "Threshold in seconds for throttling operations logging", type = Integer.class,
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
index 8daa97a..86ed3c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java
@@ -20,6 +20,7 @@
import java.io.Serializable;
import java.util.zip.Deflater;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.processors.cache.persistence.cdc.CdcBufferConsumer;
import org.apache.ignite.internal.processors.cache.persistence.file.AsyncFileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
@@ -252,6 +253,14 @@
@IgniteExperimental
private long cdcWalDirMaxSize = DFLT_CDC_WAL_DIRECTORY_MAX_SIZE;
+ /** Maximum size of CDC buffer. */
+ @IgniteExperimental
+ private long maxCdcBufSize = (long)DFLT_WAL_SEGMENTS * DFLT_WAL_SEGMENT_SIZE;
+
+ /** CDC buffer consumer. */
+ @IgniteExperimental
+ private transient CdcBufferConsumer cdcConsumer;
+
/**
* Metrics enabled flag.
* @deprecated Will be removed in upcoming releases.
@@ -1356,6 +1365,52 @@
}
/**
+ * Gets a max size(in bytes) of CDC buffer.
+ *
+ * @return max size(in bytes) of CDC buffer.
+ */
+ public long getMaxCdcBufferSize() {
+ return maxCdcBufSize;
+ }
+
+ /**
+ * Sets a max allowed size(in bytes) of CDC buffer. Set to {@code 0} to disable realtime CDC mode.
+ *
+ * @param maxCdcBufSize max size(in bytes) of CDC buffer.
+ * @return {@code this} for chaining.
+ */
+ public DataStorageConfiguration setMaxCdcBufferSize(long maxCdcBufSize) {
+ A.ensure(
+ maxCdcBufSize >= 0,
+ "maxCdcBufferSize must be greater than 0. To disable realtime mode of CDC set value to 0.");
+
+ this.maxCdcBufSize = maxCdcBufSize;
+
+ return this;
+ }
+
+ /**
+ * Gets CDC raw data consumer.
+ *
+ * @return CDC raw data consumer.
+ */
+ public CdcBufferConsumer getCdcConsumer() {
+ return cdcConsumer;
+ }
+
+ /**
+ * Sets CDC raw data consumer.
+ *
+ * @param cdcConsumer CDC raw data consumer.
+ * @return {@code this} for chaining.
+ */
+ public DataStorageConfiguration setCdcConsumer(CdcBufferConsumer cdcConsumer) {
+ this.cdcConsumer = cdcConsumer;
+
+ return this;
+ }
+
+ /**
* Gets a min allowed size(in bytes) of WAL archives.
*
* @return min size(in bytes) of WAL archive directory(greater than 0, or {@link #HALF_MAX_WAL_ARCHIVE_SIZE}).
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java
new file mode 100644
index 0000000..1b105ba
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBuffer.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.cdc;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Buffer that stores WAL records before {@link CdcWorker} consumes it. Buffer is a single-producer single-consumer bounded queue.
+ * <p>
+ * TODO: Optimize the queue:
+ * 1. by space using LinkedArrayQueue. Example: http://psy-lob-saw.blogspot.com/2016/10/linked-array-queues-part-1-spsc.html.
+ * It helps to avoid using AtomicLong for #size.
+ * 2. by performance using AtomicReference#lazySet or similar for LinkedNode#next.
+ */
+public class CdcBuffer {
+ /** Maximum size of the underlying buffer, bytes. */
+ private final long maxSize;
+
+ /** Reference to last consumed linked node. */
+ private LinkedNode consumerNode;
+
+ /** Reference to last produced linked node. */
+ private volatile LinkedNode producerNode;
+
+ /** Current size of the buffer, bytes. */
+ private final AtomicLong size = new AtomicLong();
+
+ /** If {@code true} then buffer has overflowed. */
+ private volatile boolean overflowed;
+
+ /** */
+ public CdcBuffer(long maxSize) {
+ assert maxSize > 0 : maxSize;
+
+ this.maxSize = maxSize;
+
+ producerNode = consumerNode = new LinkedNode(null);
+ }
+
+ /**
+ * Offers data for the queue. Performs by the single producer thread.
+ *
+ * @param data Data to store in the buffer.
+ */
+ public boolean offer(ByteBuffer data) {
+ int bufSize = data.limit() - data.position();
+
+ if (size.addAndGet(bufSize) > maxSize) {
+ overflowed = true;
+
+ return false;
+ }
+
+ byte[] cp = new byte[bufSize];
+
+ data.get(cp, 0, bufSize);
+
+ LinkedNode newNode = new LinkedNode(ByteBuffer.wrap(cp));
+ LinkedNode oldNode = producerNode;
+
+ producerNode = newNode;
+ oldNode.next(newNode);
+
+ return true;
+ }
+
+ /**
+ * Polls data from the queue. Performs by single consumer thread.
+ *
+ * @return Polled data, or {@code null} if no data is available now.
+ */
+ public ByteBuffer poll() {
+ LinkedNode prev = consumerNode;
+
+ LinkedNode next = prev.next;
+
+ if (next != null)
+ return poll(prev, next);
+ else if (prev != producerNode) {
+ while ((next = prev.next) == null) {
+ // No-op. New reference should appear soon.
+ }
+
+ return poll(prev, next);
+ }
+
+ return null;
+ }
+
+ /** @return Current buffer size. */
+ public long size() {
+ return size.get();
+ }
+
+ /** Cleans the buffer if overflowed. Performs by the consumer thread. */
+ public void cleanIfOverflowed() {
+ if (!overflowed || consumerNode == null)
+ return;
+
+ ByteBuffer data;
+
+ do {
+ data = poll();
+ }
+ while (data != null);
+
+ consumerNode = null;
+ producerNode = null;
+
+ size.set(0);
+ }
+
+ /**
+ * @param prev Previously consumed linked node.
+ * @param next Node to consume.
+ * @return Data to consume.
+ */
+ private ByteBuffer poll(LinkedNode prev, LinkedNode next) {
+ ByteBuffer data = next.data;
+
+ prev.next = null;
+ consumerNode = next;
+
+ size.addAndGet(-(data.limit() - data.position()));
+
+ return data;
+ }
+
+ /** */
+ private static class LinkedNode {
+ /** */
+ private volatile @Nullable LinkedNode next;
+
+ /** */
+ private final ByteBuffer data;
+
+ /** */
+ LinkedNode(ByteBuffer data) {
+ this.data = data;
+ }
+
+ /** */
+ void next(LinkedNode next) {
+ this.next = next;
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
new file mode 100644
index 0000000..67cd422
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcBufferConsumer.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.cdc;
+
+import java.nio.ByteBuffer;
+
+/** Mock for Realtime CDC buffer consumer. */
+public interface CdcBufferConsumer {
+ /**
+ * Consumes raw WAL data.
+ *
+ * @param data Raw data to consume.
+ */
+ public void consume(ByteBuffer data);
+
+ /** */
+ public void close();
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java
new file mode 100644
index 0000000..92adcf2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcProcessor.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.cdc;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/** CDC processor responsible for collecting data changes in realtime within Ignite node. */
+public class CdcProcessor {
+ /** Buffer to store collected data. */
+ private final CdcBuffer cdcBuf;
+
+ /** CDC worker. */
+ private final CdcWorker worker;
+
+ /** Ignite log. */
+ private final IgniteLogger log;
+
+ /** Whether CDC is enabled. Disables after {@link #cdcBuf} overflows. */
+ private boolean enabled = true;
+
+ /** */
+ public CdcProcessor(GridCacheSharedContext<?, ?> cctx, IgniteLogger log) {
+ this.log = log;
+
+ DataStorageConfiguration dsCfg = cctx.gridConfig().getDataStorageConfiguration();
+
+ cdcBuf = new CdcBuffer(dsCfg.getMaxCdcBufferSize());
+ worker = new CdcWorker(cctx, log, cdcBuf, dsCfg.getCdcConsumer());
+ }
+
+ /**
+ * @param dataBuf Buffer that contains data to collect.
+ */
+ public void collect(ByteBuffer dataBuf) {
+ if (!enabled)
+ return;
+
+ if (log.isDebugEnabled())
+ log.debug("Offerring a data bucket to the CDC buffer [len=" + (dataBuf.limit() - dataBuf.position()) + ']');
+
+ if (!cdcBuf.offer(dataBuf)) {
+ enabled = false;
+
+ log.warning("CDC buffer has overflowed. Stop realtime mode of CDC.");
+
+ worker.cancel();
+ }
+ }
+
+ /** Start CDC worker. */
+ public void start() {
+ worker.restart();
+ }
+
+ /** Shutdown CDC worker. */
+ public void shutdown() throws IgniteInterruptedCheckedException {
+ worker.cancel();
+
+ U.join(worker);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java
new file mode 100644
index 0000000..2765541
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/cdc/CdcWorker.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.cdc;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.LockSupport;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
+
+/** */
+public class CdcWorker extends GridWorker {
+ /** Default throttling timeout in millis for polling CDC buffer. */
+ public static final int DFLT_POLL_CDC_BUF_THROTTLING_TIMEOUT = 100;
+
+ /** Throttling timeout in millis for polling CDC buffer. */
+ private final long cdcBufPollTimeout = Long.getLong(
+ IgniteSystemProperties.IGNITE_THROTTLE_POLL_CDC_BUF, DFLT_POLL_CDC_BUF_THROTTLING_TIMEOUT);
+
+ /** CDC buffer. */
+ private final CdcBuffer cdcBuf;
+
+ /** CDC consumer. */
+ private final CdcBufferConsumer consumer;
+
+ /** */
+ public CdcWorker(
+ GridCacheSharedContext<?, ?> cctx,
+ IgniteLogger log,
+ CdcBuffer cdcBuf,
+ CdcBufferConsumer consumer
+ ) {
+ super(cctx.igniteInstanceName(),
+ "cdc-worker%" + cctx.igniteInstanceName(),
+ log,
+ cctx.kernalContext().workersRegistry());
+
+ this.cdcBuf = cdcBuf;
+ this.consumer = consumer;
+ }
+
+ /** */
+ @Override public void body() {
+ while (!isCancelled()) {
+ updateHeartbeat();
+
+ ByteBuffer data = cdcBuf.poll();
+
+ if (data == null) {
+ LockSupport.parkNanos(cdcBufPollTimeout * 1_000_000); // millis to nanos.
+
+ continue;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Poll a data bucket from CDC buffer [len=" + (data.limit() - data.position()) + ']');
+
+ // TODO: Consumer must not block this system thread.
+ consumer.consume(data);
+ }
+
+ consumer.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void cleanup() {
+ cdcBuf.cleanIfOverflowed();
+ }
+
+ /** */
+ public void restart() {
+ isCancelled.set(false);
+
+ new IgniteThread(this).start();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index f68b366..5470e23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -87,6 +87,7 @@
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
@@ -313,6 +314,9 @@
private static final AtomicReferenceFieldUpdater<FileWriteAheadLogManager, FileWriteHandle> CURR_HND_UPD =
AtomicReferenceFieldUpdater.newUpdater(FileWriteAheadLogManager.class, FileWriteHandle.class, "currHnd");
+ /** CDC processor, {@code null} if CDC is disabled. */
+ @Nullable private CdcProcessor cdcProc;
+
/**
* File archiver moves segments from work directory to archive. Locked segments may be kept not moved until release.
* For mode archive and work folders set to equal value, archiver is not created.
@@ -521,6 +525,9 @@
dispatcher.registerProperty(cdcDisabled);
});
+
+ if (dsCfg.getMaxCdcBufferSize() > 0)
+ cdcProc = new CdcProcessor(cctx, log);
}
serializer = new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVer);
@@ -581,7 +588,7 @@
segmentRouter = new SegmentRouter(walWorkDir, walArchiveDir, segmentAware, dsCfg);
fileHandleManager = fileHandleManagerFactory.build(
- cctx, metrics, mmap, serializer, this::currentHandle
+ cctx, metrics, mmap, serializer, cdcProc, this::currentHandle
);
lockedSegmentFileInputFactory = new LockedSegmentFileInputFactory(
@@ -753,6 +760,10 @@
if (cleaner != null)
cleaner.shutdown();
+
+ // TODO: stop cdcProcessor after checkpoint (reason=node stop) finished.
+ if (cdcProc != null)
+ cdcProc.shutdown();
}
catch (IgniteInterruptedCheckedException e) {
U.error(log, "Failed to gracefully shutdown WAL components, thread was interrupted.", e);
@@ -764,7 +775,11 @@
if (log.isDebugEnabled())
log.debug("Activated file write ahead log manager [nodeId=" + cctx.localNodeId() +
" topVer=" + cctx.discovery().topologyVersionEx() + " ]");
- //NOOP implementation, we need to override it.
+
+ // TODO: Invoke here to avoid double initialization. CdcConsumer must consume data only after full start.
+ // Will fix in https://issues.apache.org/jira/browse/IGNITE-19637
+ if (cdcProc != null)
+ cdcProc.start();
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
index 8850c20..619ff63 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerFactory.java
@@ -23,7 +23,9 @@
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.jetbrains.annotations.Nullable;
/**
* Factory of {@link FileHandleManager}.
@@ -48,6 +50,7 @@
* @param metrics Data storage metrics.
* @param mmap Using mmap.
* @param serializer Serializer.
+ * @param cdcProc CDC processor.
* @param currHandleSupplier Supplier of current handle.
* @return One of implementation of {@link FileHandleManager}.
*/
@@ -56,6 +59,7 @@
DataStorageMetricsImpl metrics,
boolean mmap,
RecordSerializer serializer,
+ @Nullable CdcProcessor cdcProc,
Supplier<FileWriteHandle> currHandleSupplier
) {
if (dsConf.getWalMode() == WALMode.FSYNC && !walFsyncWithDedicatedWorker)
@@ -63,6 +67,7 @@
cctx,
metrics,
serializer,
+ cdcProc,
currHandleSupplier,
dsConf.getWalMode(),
dsConf.getWalSegmentSize(),
@@ -75,6 +80,7 @@
metrics,
mmap,
serializer,
+ cdcProc,
currHandleSupplier,
dsConf.getWalMode(),
dsConf.getWalBufferSize(),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
index 6b08531..e0a779f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileHandleManagerImpl.java
@@ -34,6 +34,7 @@
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor;
import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
@@ -41,6 +42,7 @@
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
import static java.lang.Long.MAX_VALUE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SEGMENT_SYNC_TIMEOUT;
@@ -63,6 +65,9 @@
/** Wal segment sync worker. */
private final WalSegmentSyncer walSegmentSyncWorker;
+ /** CDC processor, {@code null} if CDC is disabled. */
+ private final @Nullable CdcProcessor cdcProc;
+
/** Context. */
protected final GridCacheSharedContext cctx;
@@ -109,6 +114,7 @@
DataStorageMetricsImpl metrics,
boolean mmap,
RecordSerializer serializer,
+ @Nullable CdcProcessor cdcProc,
Supplier<FileWriteHandle> currentHandleSupplier,
WALMode mode,
int walBufferSize,
@@ -126,6 +132,7 @@
this.maxWalSegmentSize = maxWalSegmentSize;
this.fsyncDelay = fsyncDelay;
walWriter = new WALWriter(log);
+ this.cdcProc = cdcProc;
if (mode != WALMode.NONE && mode != WALMode.FSYNC) {
walSegmentSyncWorker = new WalSegmentSyncer(
@@ -163,7 +170,7 @@
rbuf.init(position);
return new FileWriteHandleImpl(
- cctx, fileIO, rbuf, serializer, metrics, walWriter, position,
+ cctx, fileIO, rbuf, serializer, metrics, walWriter, cdcProc, position,
mode, mmap, true, fsyncDelay, maxWalSegmentSize
);
}
@@ -181,7 +188,7 @@
rbuf = currentHandle().buf.reset();
return new FileWriteHandleImpl(
- cctx, fileIO, rbuf, serializer, metrics, walWriter, 0,
+ cctx, fileIO, rbuf, serializer, metrics, walWriter, cdcProc, 0,
mode, mmap, false, fsyncDelay, maxWalSegmentSize
);
}
@@ -376,7 +383,17 @@
updateHeartbeat();
try {
+ int bufPos = seg.buffer().position();
+
writeBuffer(seg.position(), seg.buffer());
+
+ if (cdcProc != null) {
+ ByteBuffer cdcBuf = seg.buffer().duplicate();
+ cdcBuf.position(bufPos);
+ cdcBuf.limit(seg.buffer().limit());
+
+ cdcProc.collect(cdcBuf);
+ }
}
catch (Throwable e) {
log.error("Exception in WAL writer thread:", e);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
index 5dc2ec1..a1fe746 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
@@ -43,6 +43,7 @@
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
@@ -114,6 +115,9 @@
/** Buffer. */
protected final SegmentedRingByteBuffer buf;
+ /** Cdc Buffer, {@code null} if CDC is disabled. */
+ private final @Nullable CdcProcessor cdcProc;
+
/** */
private final WALMode mode;
@@ -157,8 +161,8 @@
*/
FileWriteHandleImpl(
GridCacheSharedContext cctx, SegmentIO fileIO, SegmentedRingByteBuffer rbuf, RecordSerializer serializer,
- DataStorageMetricsImpl metrics, FileHandleManagerImpl.WALWriter writer, long pos, WALMode mode, boolean mmap,
- boolean resume, long fsyncDelay, long maxWalSegmentSize) throws IOException {
+ DataStorageMetricsImpl metrics, FileHandleManagerImpl.WALWriter writer, CdcProcessor cdcProc,
+ long pos, WALMode mode, boolean mmap, boolean resume, long fsyncDelay, long maxWalSegmentSize) throws IOException {
super(fileIO);
assert serializer != null;
@@ -170,6 +174,7 @@
this.log = cctx.logger(FileWriteHandleImpl.class);
this.cctx = cctx;
this.walWriter = writer;
+ this.cdcProc = cdcProc;
this.serializer = serializer;
this.written = pos;
this.lastFsyncPos = pos;
@@ -413,6 +418,14 @@
fsync((MappedByteBuffer)buf.buf, off, len);
+ if (cdcProc != null) {
+ ByteBuffer cdcBuf = buf.buf.duplicate();
+ cdcBuf.position(off);
+ cdcBuf.limit(off + len);
+
+ cdcProc.collect(cdcBuf);
+ }
+
seg.release();
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
index af3957c..8b398b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileHandleManagerImpl.java
@@ -26,9 +26,11 @@
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.jetbrains.annotations.Nullable;
/**
* Implementation of {@link FileWriteHandle} for FSYNC mode.
@@ -49,6 +51,9 @@
/** */
protected final RecordSerializer serializer;
+ /** */
+ private final @Nullable CdcProcessor cdcProc;
+
/** Current handle supplier. */
private final Supplier<FileWriteHandle> currentHandleSupplier;
@@ -75,6 +80,7 @@
GridCacheSharedContext cctx,
DataStorageMetricsImpl metrics,
RecordSerializer serializer,
+ @Nullable CdcProcessor cdcProc,
Supplier<FileWriteHandle> handle,
WALMode mode,
long maxWalSegmentSize,
@@ -86,6 +92,7 @@
this.mode = mode;
this.metrics = metrics;
this.serializer = serializer;
+ this.cdcProc = cdcProc;
currentHandleSupplier = handle;
this.maxWalSegmentSize = maxWalSegmentSize;
this.fsyncDelay = fsyncDelay;
@@ -96,7 +103,7 @@
@Override public FileWriteHandle initHandle(SegmentIO fileIO, long position,
RecordSerializer serializer) throws IOException {
return new FsyncFileWriteHandle(
- cctx, fileIO, metrics, serializer, position,
+ cctx, fileIO, metrics, serializer, cdcProc, position,
mode, maxWalSegmentSize, tlbSize, fsyncDelay
);
}
@@ -105,7 +112,7 @@
@Override public FileWriteHandle nextHandle(SegmentIO fileIO,
RecordSerializer serializer) throws IOException {
return new FsyncFileWriteHandle(
- cctx, fileIO, metrics, serializer, 0,
+ cctx, fileIO, metrics, serializer, cdcProc, 0,
mode, maxWalSegmentSize, tlbSize, fsyncDelay
);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
index ca06af5..59178e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FsyncFileWriteHandle.java
@@ -38,6 +38,7 @@
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
+import org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
@@ -114,6 +115,9 @@
/** Persistence metrics tracker. */
private final DataStorageMetricsImpl metrics;
+ /** */
+ private final @Nullable CdcProcessor cdcProc;
+
/** Logger. */
protected final IgniteLogger log;
@@ -152,7 +156,7 @@
*/
FsyncFileWriteHandle(
GridCacheSharedContext cctx, SegmentIO fileIO,
- DataStorageMetricsImpl metrics, RecordSerializer serializer, long pos,
+ DataStorageMetricsImpl metrics, RecordSerializer serializer, CdcProcessor cdcProc, long pos,
WALMode mode, long maxSegmentSize, int size, long fsyncDelay) throws IOException {
super(fileIO);
assert serializer != null;
@@ -165,6 +169,7 @@
this.fsyncDelay = fsyncDelay;
this.maxSegmentSize = maxSegmentSize;
this.serializer = serializer;
+ this.cdcProc = cdcProc;
this.written = pos;
this.lastFsyncPos = pos;
@@ -780,6 +785,7 @@
// Do the write.
int size = buf.remaining();
+ int bufPos = buf.position();
assert size > 0 : size;
@@ -799,6 +805,14 @@
throw se;
}
+
+ if (cdcProc != null) {
+ ByteBuffer cdcBuf = buf.duplicate();
+ cdcBuf.position(bufPos);
+ cdcBuf.limit(buf.limit());
+
+ cdcProc.collect(cdcBuf);
+ }
}
finally {
writeComplete.signalAll();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferDedicatedWorkerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferDedicatedWorkerTest.java
new file mode 100644
index 0000000..3f48c8f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferDedicatedWorkerTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.cdc;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_FSYNC_WITH_DEDICATED_WORKER;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP;
+
+/** */
+@WithSystemProperty(key = IGNITE_WAL_FSYNC_WITH_DEDICATED_WORKER, value = "true")
+@WithSystemProperty(key = IGNITE_WAL_MMAP, value = "false")
+public class RealtimeCdcBufferDedicatedWorkerTest extends RealtimeCdcBufferTest {
+ /** Override params to test only FSYNC mode. */
+ @Parameterized.Parameters(name = "walMode={0}")
+ public static List<Object[]> params() {
+ List<Object[]> param = new ArrayList<>();
+
+ param.add(new Object[] {WALMode.FSYNC});
+
+ return param;
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferSelfTest.java
new file mode 100644
index 0000000..a16a256
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferSelfTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.cdc;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.locks.LockSupport;
+import org.junit.Test;
+
+/** */
+public class RealtimeCdcBufferSelfTest {
+ /** */
+ @Test
+ public void testDisableBuffer() {
+ CdcBuffer buf = new CdcBuffer(10);
+
+ // Fill the buffer.
+ for (int i = 0; i < 10; i++) {
+ boolean res = buf.offer(build());
+
+ assert res;
+ assert buf.size() == i + 1;
+ }
+
+ boolean res = buf.offer(build());
+
+ assert !res;
+ }
+
+ /** */
+ @Test
+ public void testConsumeEmptyBuffer() {
+ CdcBuffer buf = new CdcBuffer(10);
+
+ for (int i = 0; i < 10; i++) {
+ ByteBuffer data = buf.poll();
+
+ assert data == null;
+ assert buf.size() == 0;
+ }
+ }
+
+ /** */
+ @Test
+ public void testConsumeFullBuffer() {
+ CdcBuffer buf = new CdcBuffer(10);
+
+ for (int i = 0; i < 10; i++)
+ buf.offer(build());
+
+ for (int i = 0; i < 10; i++) {
+ ByteBuffer data = buf.poll();
+
+ assert build().equals(data);
+ assert buf.size() == 10 - (i + 1);
+ }
+
+ boolean res = buf.offer(build());
+
+ assert res;
+ assert buf.size() == 1;
+ }
+
+ /** */
+ @Test
+ public void testConcurrentFillBuffer() throws Exception {
+ int size = 1_000_000;
+
+ CdcBuffer buf = new CdcBuffer(size);
+
+ Thread th1 = new Thread(() -> {
+ for (int i = 0; i < size; i++) {
+ buf.offer(build());
+
+ LockSupport.parkNanos(1_000);
+ }
+ });
+
+ Thread th2 = new Thread(() -> {
+ // Goal is to invoke `poll` more frequently than `offer`, to check `poll` waits offered value.
+ for (int i = 0; i < size * 1_000; i++)
+ buf.poll();
+ });
+
+ th1.start();
+ th2.start();
+
+ th1.join();
+ th2.join();
+ }
+
+ /** */
+ private ByteBuffer build() {
+ byte[] data = new byte[1];
+ data[0] = 1;
+
+ return ByteBuffer.wrap(data);
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java
new file mode 100644
index 0000000..9b9ca8e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.cdc;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR;
+import static org.junit.Assume.assumeFalse;
+
+/** */
+@RunWith(Parameterized.class)
+public class RealtimeCdcBufferTest extends GridCommonAbstractTest {
+ /** */
+ private static final String CONSISTENT_ID = "ID";
+
+ /** */
+ private static ListeningTestLogger lsnrLog;
+
+ /** */
+ private static CountDownLatch stopLatch;
+
+ /** */
+ private ByteBufferCdcConsumer consumer;
+
+ /** */
+ private boolean cdcEnabled;
+
+ /** */
+ private int maxCdcBufSize;
+
+ /** */
+ @Parameterized.Parameter()
+ public WALMode walMode;
+
+ /** */
+ @Parameterized.Parameters(name = "walMode={0}")
+ public static List<Object[]> params() {
+ return F.asList(
+ new Object[] { WALMode.LOG_ONLY },
+ new Object[] { WALMode.BACKGROUND },
+ new Object[] { WALMode.FSYNC }
+ );
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String instanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setMaxCdcBufferSize(maxCdcBufSize)
+ .setCdcConsumer(consumer)
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setCdcEnabled(cdcEnabled)
+ .setPersistenceEnabled(true)
+ )
+ .setWalMode(walMode)
+ );
+
+ cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+ cfg.setFailureHandler(new StopNodeFailureHandler());
+
+ cfg.setGridLogger(lsnrLog);
+
+ cfg.setConsistentId(CONSISTENT_ID);
+
+ return cfg;
+ }
+
+ /** */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ lsnrLog = new ListeningTestLogger(log);
+
+ cleanPersistenceDir();
+
+ cdcEnabled = true;
+
+ consumer = new ByteBufferCdcConsumer(10 * (int)U.MB);
+
+ stopLatch = null;
+ }
+
+ /** */
+ @Test
+ public void testCdcBufferOverflow() throws Exception {
+ maxCdcBufSize = (int)U.KB;
+
+ checkCdcBufferOverflow(10 * (int)U.KB, 100, true);
+ }
+
+ /** */
+ @Test
+ public void testCdcDisabled() throws Exception {
+ cdcEnabled = false;
+
+ checkCdcBufferOverflow(10 * (int)U.KB, 100, false);
+ }
+
+ /** */
+ @Test
+ public void testCdcBufferContent() throws Exception {
+ // TODO: Looks like there is a bug in the FSYNC case: WAL misses some records.
+ assumeFalse(walMode == WALMode.FSYNC);
+
+ maxCdcBufSize = 10 * (int)U.MB;
+
+ stopLatch = new CountDownLatch(1);
+
+ checkCdcBufferOverflow((int)U.KB, 100, false);
+
+ U.awaitQuiet(stopLatch);
+
+ File walSegments = U.resolveWorkDirectory(
+ U.defaultWorkDirectory(),
+ DFLT_STORE_DIR + "/wal/" + CONSISTENT_ID, false);
+
+ WALIterator it = walIter(walSegments);
+
+ while (it.hasNext())
+ it.next();
+
+ WALPointer ptr = it.lastRead().get();
+ int length = ptr.fileOffset() + ptr.length();
+
+ File seg = Arrays.stream(walSegments.listFiles()).sorted().findFirst().get();
+
+ byte[] walSegData = Files.readAllBytes(seg.toPath());
+
+ int step = 100;
+
+ for (int off = 0; off < length; off += step) {
+ int l = off + step < length ? step : length - off;
+
+ byte[] testWalData = new byte[l];
+ byte[] testCdcData = new byte[l];
+
+ ByteBuffer buf = ByteBuffer.wrap(walSegData);
+ buf.position(off);
+ buf.get(testWalData, 0, l);
+
+ buf = ByteBuffer.wrap(consumer.buf.array());
+ buf.position(off);
+ buf.get(testCdcData, 0, l);
+
+ assertTrue(
+ "Offset " + off + "/" + length + "\n" +
+ "EXPECT " + Arrays.toString(testWalData) + "\n" +
+ "ACTUAL " + Arrays.toString(testCdcData),
+ Arrays.equals(testWalData, testCdcData));
+ }
+ }
+
+ /** */
+ private void checkCdcBufferOverflow(int entrySize, int entryCnt, boolean shouldOverflow) throws Exception {
+ LogListener lsnr = LogListener.matches("CDC buffer has overflowed. Stop realtime mode of CDC.")
+ .times(shouldOverflow ? 1 : 0)
+ .build();
+
+ lsnrLog.registerListener(lsnr);
+
+ IgniteEx crd = startGrid(0);
+
+ crd.cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache<Integer, byte[]> cache = crd.cache(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < entryCnt; i++) {
+ byte[] data = new byte[entrySize];
+
+ Arrays.fill(data, (byte)1);
+
+ cache.put(i, data);
+ }
+
+ forceCheckpoint(crd);
+
+ stopGrid(0);
+
+ assertTrue(lsnr.check());
+ }
+
+ /** Get iterator over WAL. */
+ private WALIterator walIter(File walSegments) throws Exception {
+ IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);
+
+ IgniteWalIteratorFactory.IteratorParametersBuilder params = new IgniteWalIteratorFactory.IteratorParametersBuilder()
+ .filesOrDirs(walSegments);
+
+ return factory.iterator(params);
+ }
+
+ /** */
+ private static class ByteBufferCdcConsumer implements CdcBufferConsumer {
+ /** */
+ private final ByteBuffer buf;
+
+ /** */
+ ByteBufferCdcConsumer(int maxCdcBufSize) {
+ buf = ByteBuffer.allocate(maxCdcBufSize);
+
+ Arrays.fill(buf.array(), (byte)0);
+
+ buf.position(0);
+ }
+
+ /** */
+ @Override public void consume(ByteBuffer data) {
+ buf.put(data);
+ }
+
+ /** */
+ @Override public void close() {
+ if (stopLatch != null)
+ stopLatch.countDown();
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferWalMmapDisabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferWalMmapDisabledTest.java
new file mode 100644
index 0000000..01247fe
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/cdc/RealtimeCdcBufferWalMmapDisabledTest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.cdc;
+
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP;
+
+/** */
+@WithSystemProperty(key = IGNITE_WAL_MMAP, value = "false")
+public class RealtimeCdcBufferWalMmapDisabledTest extends RealtimeCdcBufferTest {
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 8b39f3e..746a336 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -39,6 +39,10 @@
import org.apache.ignite.internal.processors.cache.persistence.IgniteRebalanceScheduleResendPartitionsTest;
import org.apache.ignite.internal.processors.cache.persistence.WALPreloadingWithCompactionTest;
import org.apache.ignite.internal.processors.cache.persistence.WalPreloadingConcurrentTest;
+import org.apache.ignite.internal.processors.cache.persistence.cdc.RealtimeCdcBufferDedicatedWorkerTest;
+import org.apache.ignite.internal.processors.cache.persistence.cdc.RealtimeCdcBufferSelfTest;
+import org.apache.ignite.internal.processors.cache.persistence.cdc.RealtimeCdcBufferTest;
+import org.apache.ignite.internal.processors.cache.persistence.cdc.RealtimeCdcBufferWalMmapDisabledTest;
import org.apache.ignite.internal.processors.cache.persistence.db.FullHistRebalanceOnClientStopTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebalancingOnNotStableTopologyTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsTest;
@@ -159,6 +163,12 @@
GridTestUtils.addTestIfNeeded(suite, CdcCacheConfigOnRestartTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CdcNonDefaultWorkDirTest.class, ignoredTests);
+ // Realtime CDC tests.
+ GridTestUtils.addTestIfNeeded(suite, RealtimeCdcBufferSelfTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, RealtimeCdcBufferTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, RealtimeCdcBufferDedicatedWorkerTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, RealtimeCdcBufferWalMmapDisabledTest.class, ignoredTests);
+
// new style folders with generated consistent ID test
GridTestUtils.addTestIfNeeded(suite, IgniteUidAsConsistentIdMigrationTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, IgniteWalSerializerVersionTest.class, ignoredTests);