IGNITE-17700 Introduce CdcManager (#11044)

Co-authored-by: Nikolay Izhikov <nizhikov@apache.org>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
index 379c2bb..942fd24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
@@ -71,6 +71,15 @@
     /** */
     public static final String CACHES_STATE_FILE_NAME = "cdc-caches-state" + FILE_SUFFIX;
 
+    /**
+     * The file stores state of CDC mode. Content of the file is a {@link CdcMode} value:
+     * <ul>
+     *     <li>{@link CdcMode#CDC_UTILITY_ACTIVE} means that {@link CdcMain} utility captures data.</li>
+     *     <li>{@link CdcMode#IGNITE_NODE_ACTIVE} means that {@link CdcManager} captures data within Ignite node.</li>
+     * </ul>
+     */
+    public static final String CDC_MODE_FILE_NAME = "cdc-mode" + FILE_SUFFIX;
+
     /** Log. */
     private final IgniteLogger log;
 
@@ -98,6 +107,12 @@
     /** Mappings types state file. */
     private final Path tmpCaches;
 
+    /** CDC manager mode state file. */
+    private final Path cdcMode;
+
+    /** Temp CDC manager mode state file. */
+    private final Path tmpCdcMode;
+
     /**
      * @param stateDir State directory.
      */
@@ -111,6 +126,8 @@
         tmpMappings = stateDir.resolve(MAPPINGS_STATE_FILE_NAME + TMP_SUFFIX);
         caches = stateDir.resolve(CACHES_STATE_FILE_NAME);
         tmpCaches = stateDir.resolve(CACHES_STATE_FILE_NAME + TMP_SUFFIX);
+        cdcMode = stateDir.resolve(CDC_MODE_FILE_NAME);
+        tmpCdcMode = stateDir.resolve(CDC_MODE_FILE_NAME + TMP_SUFFIX);
     }
 
     /**
@@ -278,4 +295,26 @@
             throw new RuntimeException(e);
         }
     }
+
+    /**
+     * Loads CDC mode state from file. {@link CdcMode#IGNITE_NODE_ACTIVE} is supplied as the default value.
+     *
+     * @return CDC mode state.
+     */
+    public CdcMode loadCdcMode() {
+        CdcMode state = load(cdcMode, () -> CdcMode.IGNITE_NODE_ACTIVE);
+
+        log.info("CDC mode loaded [" + state + ']');
+
+        return state;
+    }
+
+    /**
+     * Saves CDC mode state to file. Both the temporary and the target mode file paths are passed to the save routine.
+     * @param mode CDC mode.
+     * @throws IOException Propagated from the save routine.
+     */
+    public void saveCdcMode(CdcMode mode) throws IOException {
+        save(mode, tmpCdcMode, cdcMode);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index 84dc0a0..cbf44b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -50,6 +50,9 @@
 import org.apache.ignite.internal.MarshallerContextImpl;
 import org.apache.ignite.internal.binary.BinaryUtils;
 import org.apache.ignite.internal.cdc.WalRecordsConsumer.DataEntryIterator;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver;
@@ -66,6 +69,8 @@
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.platform.PlatformType;
 import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
@@ -79,6 +84,8 @@
 import static org.apache.ignite.internal.IgnitionEx.initializeDefaultMBeanServer;
 import static org.apache.ignite.internal.binary.BinaryUtils.METADATA_FILE_SUFFIX;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_MANAGER_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_MANAGER_STOP_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
 import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
 import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.segmentIndex;
@@ -159,6 +166,17 @@
     /** Cdc directory metric name. */
     public static final String CDC_DIR = "CdcDir";
 
+    /** Cdc mode metric name. */
+    public static final String CDC_MODE = "CdcMode";
+
+    /** Filter for consumption in {@link CdcMode#IGNITE_NODE_ACTIVE} mode. */
+    private static final IgniteBiPredicate<WALRecord.RecordType, WALPointer> PASSIVE_RECS =
+        (type, ptr) -> type == CDC_MANAGER_STOP_RECORD || type == CDC_MANAGER_RECORD;
+
+    /** Filter for consumption in {@link CdcMode#CDC_UTILITY_ACTIVE} mode. */
+    private static final IgniteBiPredicate<WALRecord.RecordType, WALPointer> ACTIVE_RECS =
+        (type, ptr) -> type == DATA_RECORD_V2 || type == CDC_DATA_RECORD;
+
     /** Ignite configuration. */
     private final IgniteConfiguration igniteCfg;
 
@@ -210,7 +228,13 @@
     /** Change Data Capture state. */
     private CdcConsumerState state;
 
-    /** Save state to start from. */
+    /**
+     * Saved state to start from. Points to the last committed offset. Set to {@code null} after failover on start and
+     * switching from {@link CdcMode#IGNITE_NODE_ACTIVE} to {@link CdcMode#CDC_UTILITY_ACTIVE}.
+     *
+     * @see #removeProcessedOnFailover(Path)
+     * @see #consumeSegmentActively(IgniteWalIteratorFactory.IteratorParametersBuilder)
+     */
     private T2<WALPointer, Integer> walState;
 
     /** Types state. */
@@ -222,6 +246,9 @@
     /** Caches state. */
     private Map<Integer, Long> cachesState;
 
+    /** CDC mode state. */
+    private volatile CdcMode cdcModeState;
+
     /** Stopped flag. */
     private volatile boolean started;
 
@@ -312,6 +339,7 @@
                 typesState = state.loadTypesState();
                 mappingsState = state.loadMappingsState();
                 cachesState = state.loadCaches();
+                cdcModeState = state.loadCdcMode();
 
                 if (walState != null) {
                     committedSegmentIdx.value(walState.get1().index());
@@ -395,6 +423,7 @@
         lastSegmentConsumptionTs =
             mreg.longMetric(LAST_SEG_CONSUMPTION_TIME, "Last time of consumption of WAL segment");
         metaUpdate = mreg.histogram(META_UPDATE, new long[] {100, 500, 1000}, "Metadata update time");
+        mreg.register(CDC_MODE, () -> cdcModeState.name(), String.class, "CDC mode");
     }
 
     /**
@@ -444,7 +473,7 @@
                 try (Stream<Path> cdcFiles = Files.list(cdcDir)) {
                     Set<Path> exists = new HashSet<>();
 
-                    cdcFiles
+                    Iterator<Path> segments = cdcFiles
                         .peek(exists::add) // Store files that exists in cdc dir.
                         // Need unseen WAL segments only.
                         .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile()) && !seen.contains(p))
@@ -459,8 +488,26 @@
                             }
 
                             lastSgmnt.set(nextSgmnt);
-                        })
-                        .forEach(this::consumeSegment); // Consuming segments.
+                        }).iterator();
+
+                    while (segments.hasNext()) {
+                        Path segment = segments.next();
+
+                        if (walState != null && removeProcessedOnFailover(segment))
+                            continue;
+
+                        if (consumeSegment(segment)) {
+                            // CDC mode switched. Reset seen segments info to handle them again actively.
+                            seen.clear();
+                            lastSgmnt.set(-1);
+
+                            walState = state.loadWalState();
+
+                            break;
+                        }
+
+                        walState = null;
+                    }
 
                     seen.removeIf(p -> !exists.contains(p)); // Clean up seen set.
 
@@ -477,8 +524,12 @@
         }
     }
 
-    /** Reads all available records from segment. */
-    private void consumeSegment(Path segment) {
+    /**
+     * Reads all available records from segment.
+     *
+     * @return {@code true} if mode switched.
+     */
+    private boolean consumeSegment(Path segment) {
         updateMetadata();
 
         if (log.isInfoEnabled())
@@ -491,94 +542,95 @@
                 .marshallerMappingFileStoreDir(marshaller)
                 .igniteConfigurationModifier((cfg) -> cfg.setPluginProviders(igniteCfg.getPluginProviders()))
                 .keepBinary(cdcCfg.isKeepBinary())
-                .filesOrDirs(segment.toFile())
-                .addFilter((type, ptr) -> type == DATA_RECORD_V2 || type == CDC_DATA_RECORD);
+                .filesOrDirs(segment.toFile());
 
         if (igniteCfg.getDataStorageConfiguration().getPageSize() != 0)
             builder.pageSize(igniteCfg.getDataStorageConfiguration().getPageSize());
 
+        if (walState != null)
+            builder.from(walState.get1());
+
         long segmentIdx = segmentIndex(segment);
 
         lastSegmentConsumptionTs.value(System.currentTimeMillis());
 
         curSegmentIdx.value(segmentIdx);
 
-        if (walState != null) {
-            if (segmentIdx > walState.get1().index()) {
-                throw new IgniteException("Found segment greater then saved state. Some events are missed. Exiting! " +
-                    "[state=" + walState + ", segment=" + segmentIdx + ']');
-            }
-
-            if (segmentIdx < walState.get1().index()) {
-                if (log.isInfoEnabled()) {
-                    log.info("Already processed segment found. Skipping and deleting the file [segment=" +
-                        segmentIdx + ", state=" + walState.get1().index() + ']');
-                }
-
-                // WAL segment is a hard link to a segment file in the special Change Data Capture folder.
-                // So, we can safely delete it after processing.
-                try {
-                    Files.delete(segment);
-
-                    return;
-                }
-                catch (IOException e) {
-                    throw new IgniteException(e);
-                }
-            }
-
-            builder.from(walState.get1());
+        if (cdcModeState == CdcMode.IGNITE_NODE_ACTIVE) {
+            if (consumeSegmentPassively(builder))
+                return true;
         }
+        else
+            consumeSegmentActively(builder);
 
-        try (DataEntryIterator iter = new DataEntryIterator(new IgniteWalIteratorFactory(log).iterator(builder))) {
-            if (walState != null) {
+        processedSegments.add(segment);
+
+        return false;
+    }
+
+    /**
+     * Consumes CDC events in {@link CdcMode#CDC_UTILITY_ACTIVE} mode.
+     */
+    private void consumeSegmentActively(IgniteWalIteratorFactory.IteratorParametersBuilder builder) {
+        try (DataEntryIterator iter = new DataEntryIterator(new IgniteWalIteratorFactory(log).iterator(builder.addFilter(ACTIVE_RECS)))) {
+            if (walState != null)
                 iter.init(walState.get2());
 
-                walState = null;
-            }
-
-            boolean interrupted = false;
+            boolean interrupted;
 
             do {
                 boolean commit = consumer.onRecords(iter);
 
-                if (commit) {
-                    T2<WALPointer, Integer> curState = iter.state();
-
-                    if (curState == null)
-                        continue;
-
-                    if (log.isDebugEnabled())
-                        log.debug("Saving state [curState=" + curState + ']');
-
-                    state.saveWal(curState);
-
-                    committedSegmentIdx.value(curState.get1().index());
-                    committedSegmentOffset.value(curState.get1().fileOffset());
-
-                    // Can delete after new file state save.
-                    if (!processedSegments.isEmpty()) {
-                        // WAL segment is a hard link to a segment file in a specifal Change Data Capture folder.
-                        // So we can safely delete it after success processing.
-                        for (Path processedSegment : processedSegments) {
-                            // Can't delete current segment, because state points to it.
-                            if (processedSegment.equals(segment))
-                                continue;
-
-                            Files.delete(processedSegment);
-                        }
-
-                        processedSegments.clear();
-                    }
-                }
+                if (commit)
+                    saveStateAndRemoveProcessed(iter.state());
 
                 interrupted = Thread.interrupted();
             } while (iter.hasNext() && !interrupted);
 
             if (interrupted)
                 throw new IgniteException("Change Data Capture Application interrupted");
+        }
+        catch (IgniteCheckedException | IOException e) {
+            throw new IgniteException(e);
+        }
+    }
 
-            processedSegments.add(segment);
+    /**
+     * Consumes CDC events in {@link CdcMode#IGNITE_NODE_ACTIVE} mode.
+     *
+     * @return {@code true} if mode switched.
+     */
+    private boolean consumeSegmentPassively(IgniteWalIteratorFactory.IteratorParametersBuilder builder) {
+        try (WALIterator iter = new IgniteWalIteratorFactory(log).iterator(builder.addFilter(PASSIVE_RECS))) {
+            boolean interrupted = false;
+
+            while (iter.hasNext() && !interrupted) {
+                IgniteBiTuple<WALPointer, WALRecord> next = iter.next();
+
+                WALRecord walRecord = next.get2();
+
+                switch (walRecord.type()) {
+                    case CDC_MANAGER_RECORD:
+                        saveStateAndRemoveProcessed(((CdcManagerRecord)walRecord).walState());
+
+                        break;
+
+                    case CDC_MANAGER_STOP_RECORD:
+                        state.saveCdcMode((cdcModeState = CdcMode.CDC_UTILITY_ACTIVE));
+
+                        return true;
+
+                    default:
+                        throw new IgniteException("Unexpected record [type=" + walRecord.type() + ']');
+                }
+
+                interrupted = Thread.interrupted();
+            }
+
+            if (interrupted)
+                throw new IgniteException("Change Data Capture Application interrupted");
+
+            return false;
         }
         catch (IgniteCheckedException | IOException e) {
             throw new IgniteException(e);
@@ -727,6 +779,72 @@
     }
 
     /**
+     * Removes the segment file if it has already been processed. {@link #walState} points to the last committed offset,
+     * so all files before this offset can be removed.
+     *
+     * @param segment Segment to check.
+     * @return {@code True} if segment file was deleted, {@code false} otherwise.
+     */
+    private boolean removeProcessedOnFailover(Path segment) {
+        long segmentIdx = segmentIndex(segment);
+
+        if (segmentIdx > walState.get1().index()) {
+            throw new IgniteException("Found segment greater then saved state. Some events are missed. Exiting! " +
+                "[state=" + walState + ", segment=" + segmentIdx + ']');
+        }
+
+        if (segmentIdx < walState.get1().index()) {
+            if (log.isInfoEnabled()) {
+                log.info("Already processed segment found. Skipping and deleting the file [segment=" +
+                    segmentIdx + ", state=" + walState.get1().index() + ']');
+            }
+
+            // WAL segment is a hard link to a segment file in the special Change Data Capture folder.
+            // So, we can safely delete it after processing.
+            try {
+                Files.delete(segment);
+
+                return true;
+            }
+            catch (IOException e) {
+                throw new IgniteException(e);
+            }
+        }
+
+        return false;
+    }
+
+    /** Saves WAL consumption state and deletes processed segments that are no longer required. No-op for {@code null}. */
+    private void saveStateAndRemoveProcessed(T2<WALPointer, Integer> curState) throws IOException {
+        if (curState == null)
+            return;
+
+        if (log.isDebugEnabled())
+            log.debug("Saving state [curState=" + curState + ']');
+
+        state.saveWal(curState);
+
+        committedSegmentIdx.value(curState.get1().index());
+        committedSegmentOffset.value(curState.get1().fileOffset());
+
+        Iterator<Path> rmvIter = processedSegments.iterator();
+
+        while (rmvIter.hasNext()) {
+            Path processedSegment = rmvIter.next();
+
+            // Keep the segment the saved state points to, and any segment with a greater index.
+            if (segmentIndex(processedSegment) >= curState.get1().index())
+                continue;
+
+            // WAL segment is a hard link to a segment file in a special Change Data Capture folder.
+            // So we can safely delete it after successful processing.
+            Files.delete(processedSegment);
+
+            rmvIter.remove();
+        }
+    }
+
+    /**
      * Try locks Change Data Capture directory.
      *
      * @param dbStoreDirWithSubdirectory Root PDS directory.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcManager.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcManager.java
new file mode 100644
index 0000000..af9bed7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcManager.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cdc;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cdc.CdcConsumer;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerRecord;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerStopRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
+import org.apache.ignite.lang.IgniteExperimental;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+
+/**
+ * CDC manager responsible for logic of capturing data within Ignite node. Communication between {@link CdcManager} and
+ * {@link CdcMain} components is established through CDC management WAL records. {@link CdcManager} logs them, while {@link CdcMain}
+ * listens to them:
+ * <ul>
+ *     <li>
+ *         This manager can log {@link CdcManagerRecord} with committed consumer state. This record will be handled by
+ *         {@link CdcMain} the same way it handles {@link CdcConsumer#onEvents(Iterator)} when it returns {@code true}.
+ *     </li>
+ *     <li>
+ *         This manager must log {@link CdcManagerStopRecord} after it has stopped handling records for any reason.
+ *         {@link CdcMain} will then start consuming records itself, beginning from
+ *         {@link CdcManagerRecord#walState()} of the last consumed {@link CdcManagerRecord}.
+ *     </li>
+ *     <li>
+ *         Apache Ignite provides a default implementation - {@link CdcUtilityActiveCdcManager}. It is disabled from the
+ *         beginning, logs the {@link CdcManagerStopRecord} after Ignite node started, and then delegates consuming CDC
+ *         events to the {@link CdcMain} utility.
+ *     </li>
+ * </ul>
+ *
+ * Apache Ignite can be extended with a custom CDC manager. To do so, implement this interface and create a {@link PluginProvider}
+ * that returns the implementation from the {@link PluginProvider#createComponent(PluginContext, Class)} method.
+ * The {@code Class} parameter in the method is {@code CdcManager} class.
+ *
+ * @see CdcConsumer#onEvents(Iterator)
+ * @see CdcConsumerState#saveCdcMode(CdcMode)
+ */
+@IgniteExperimental
+public interface CdcManager extends GridCacheSharedManager, DatabaseLifecycleListener {
+    /**
+     * If this manager isn't enabled then Ignite skips notifying the manager with the following methods.
+     *
+     * @return {@code true} if manager is enabled, otherwise {@code false}.
+     */
+    public boolean enabled();
+
+    /**
+     * Callback is executed only once on Ignite node start when binary memory has fully restored and WAL logging is resumed.
+     * It's executed before the first call of {@link #collect(ByteBuffer)}.
+     * <p> Implementation suggestions:
+     * <ul>
+     *     <li>
+     *         Implementation must subscribe to the database events to get notified with this callback via
+     *         {@link GridInternalSubscriptionProcessor#registerDatabaseListener(DatabaseLifecycleListener)}
+     *     </li>
+     *     <li>Callback can be used for restoring CDC state on Ignite node start, collecting missed events from WAL segments.</li>
+     *     <li>Be aware that this method runs in the Ignite system thread and might prolong the Ignite start procedure.</li>
+     *     <li>Ignite node will fail in case the method throws an exception.</li>
+     * </ul>
+     */
+    @Override public default void afterBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr,
+        GridCacheDatabaseSharedManager.RestoreBinaryState restoreState) throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /**
+     * Callback to collect written WAL records. The provided buffer is a continuous part of WAL segment file.
+     * The buffer might contain full content of a segment or only a piece of it. There are guarantees:
+     * <ul>
+     *     <li>This method is invoked sequentially.</li>
+     *     <li>Provided {@code dataBuf} is a continuation of the previous one.</li>
+     *     <li>{@code dataBuf} contains a finite number of complete WAL records. No partially written WAL records are present.</li>
+     *     <li>Records can be read from the buffer with {@link RecordSerializer#readRecord(FileInput, WALPointer)}.</li>
+     *     <li>{@code dataBuf} must not be changed within this method.</li>
+     * </ul>
+     *
+     * <p> Implementation suggestions:
+     * <ul>
+     *     <li>
+     *         Frequency of calling the method depends on the frequency of fsyncing the WAL segment.
+     *         See {@link IgniteSystemProperties#IGNITE_WAL_SEGMENT_SYNC_TIMEOUT}.
+     *     </li>
+     *     <li>
+     *         It must handle the content of the {@code dataBuf} within the calling thread.
+     *         Content of the buffer will not be changed before this method returns.
+     *     </li>
+     *     <li>It must not block the calling thread and work quickly.</li>
+     *     <li>Ignite will ignore any {@link Throwable} thrown from this method.</li>
+     * </ul>
+     *
+     * @param dataBuf Buffer that contains data to collect.
+     */
+    public void collect(ByteBuffer dataBuf);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMode.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMode.java
new file mode 100644
index 0000000..ff4e186
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMode.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cdc;
+
+/** CDC modes. Define which component handles CDC events. */
+public enum CdcMode {
+    /** CDC events are handled by {@link CdcManager} within the Ignite node. */
+    IGNITE_NODE_ACTIVE,
+
+    /** CDC events are handled by the {@link CdcMain} utility. */
+    CDC_UTILITY_ACTIVE
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcUtilityActiveCdcManager.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcUtilityActiveCdcManager.java
new file mode 100644
index 0000000..02a4d55
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcUtilityActiveCdcManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cdc;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerStopRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.cdc.CdcConsumerState.CDC_MODE_FILE_NAME;
+import static org.apache.ignite.internal.cdc.CdcMain.STATE_DIR;
+
+/**
+ * CDC manager that delegates consuming CDC events to the {@link CdcMain} utility.
+ */
+public class CdcUtilityActiveCdcManager extends GridCacheSharedManagerAdapter implements CdcManager, PartitionsExchangeAware {
+    /** */
+    @Override protected void start0() {
+        cctx.exchange().registerExchangeAwareComponent(this);
+    }
+
+    /** */
+    @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+        if (fut.localJoinExchange() || fut.activateCluster()) {
+            try {
+                File cdcModeFile = Paths.get(
+                    ((FileWriteAheadLogManager)cctx.wal(true)).walCdcDirectory().getAbsolutePath(),
+                    STATE_DIR,
+                    CDC_MODE_FILE_NAME).toFile();
+
+                if (!cdcModeFile.exists())
+                    cctx.wal(true).log(new CdcManagerStopRecord());
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to activate CdcManager. CDC might not work.");
+            }
+            finally {
+                cctx.exchange().unregisterExchangeAwareComponent(this);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean enabled() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void collect(ByteBuffer dataBuf) {
+        // No-op.
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcManagerRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcManagerRecord.java
new file mode 100644
index 0000000..b4e717c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcManagerRecord.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import java.util.Iterator;
+import org.apache.ignite.cdc.CdcConsumer;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.util.typedef.T2;
+
+/**
+ * This record notifies {@link CdcMain} about the committed WAL state. It's assumed that the WAL state is inferred from
+ * the WALPointer committed by {@link CdcConsumer}.
+ *
+ * @see CdcConsumer#onEvents(Iterator)
+ */
+public class CdcManagerRecord extends WALRecord {
+    /** CDC consumer WAL state. */
+    private final T2<WALPointer, Integer> state;
+
+    /** @param state CDC consumer WAL state. */
+    public CdcManagerRecord(T2<WALPointer, Integer> state) {
+        this.state = state;
+    }
+
+    /** @return CDC consumer WAL state. */
+    public T2<WALPointer, Integer> walState() {
+        return state;
+    }
+
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return RecordType.CDC_MANAGER_RECORD;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcManagerStopRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcManagerStopRecord.java
new file mode 100644
index 0000000..729cb93
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcManagerStopRecord.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.cdc.CdcManager;
+
+/**
+ * Record notifies {@link CdcMain} that {@link CdcManager} has failed and stopped processing WAL data in-memory.
+ * {@link CdcMain} must start capturing and processing {@link CdcEvent}s in the regular (active) way.
+ */
+public class CdcManagerStopRecord extends WALRecord {
+    /** {@inheritDoc} */
+    @Override public RecordType type() {
+        return RecordType.CDC_MANAGER_STOP_RECORD;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 3457251..ebaa726 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -291,7 +291,13 @@
         INCREMENTAL_SNAPSHOT_FINISH_RECORD(77, LOGICAL),
 
         /** CDC data record. */
-        CDC_DATA_RECORD(78, CUSTOM);
+        CDC_DATA_RECORD(78, CUSTOM),
+
+        /** CDC manager record. */
+        CDC_MANAGER_RECORD(79, CUSTOM),
+
+        /** CDC manager stop record. */
+        CDC_MANAGER_STOP_RECORD(80, CUSTOM);
 
         /** Index for serialization. Should be consistent throughout all versions. */
         private final int idx;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 41871d0..ce42781 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -67,6 +67,7 @@
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.DeploymentMode;
 import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.configuration.WarmUpConfiguration;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
@@ -78,6 +79,8 @@
 import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.cdc.CdcManager;
+import org.apache.ignite.internal.cdc.CdcUtilityActiveCdcManager;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.cluster.DetachedClusterNode;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
@@ -204,6 +207,7 @@
 import static java.util.stream.Collectors.toSet;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -3034,6 +3038,7 @@
         IgniteCacheDatabaseSharedManager dbMgr;
         IgnitePageStoreManager pageStoreMgr = null;
         IgniteWriteAheadLogManager walMgr = null;
+        CdcManager cdcMgr = null;
 
         if (CU.isPersistenceEnabled(ctx.config()) && !ctx.clientNode()) {
             dbMgr = new GridCacheDatabaseSharedManager(ctx);
@@ -3059,6 +3064,27 @@
                 walMgr = new FileWriteAheadLogManager(ctx);
         }
 
+        if (CU.isCdcEnabled(ctx.config()) && !ctx.clientNode()) {
+            cdcMgr = ctx.plugins().createComponent(CdcManager.class);
+
+            if (cdcMgr != null) {
+                if (ctx.config().getDataStorageConfiguration().getWalMode() != WALMode.LOG_ONLY) {
+                    U.warn(log, "Custom CdcManager is only supported for WALMode.LOG_ONLY. CdcManager configuration will be ignored.");
+
+                    cdcMgr = null;
+                }
+
+                if (!IgniteSystemProperties.getBoolean(IGNITE_WAL_MMAP, true)) {
+                    U.warn(log, "Custom CdcManager is only supported for IGNITE_WAL_MMAP=true. CdcManager configuration will be ignored.");
+
+                    cdcMgr = null;
+                }
+            }
+
+            if (cdcMgr == null)
+                cdcMgr = new CdcUtilityActiveCdcManager();
+        }
+
         IgniteSnapshotManager snapshotMgr = ctx.plugins().createComponent(IgniteSnapshotManager.class);
 
         if (snapshotMgr == null)
@@ -3082,6 +3108,7 @@
             .setJtaManager(JTA.createOptional())
             .setMvccCachingManager(new MvccCachingManager())
             .setDiagnosticManager(new CacheDiagnosticManager())
+            .setCdcManager(cdcMgr)
             .build(kernalCtx, storeSesLsnrs);
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index c1b9c95..7d87015 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -36,6 +36,7 @@
 import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cdc.CdcManager;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
@@ -115,6 +116,9 @@
     /** Write ahead log manager for CDC. {@code Null} if persistence AND CDC is not enabled. */
     @Nullable private IgniteWriteAheadLogManager cdcWalMgr;
 
+    /** CDC manager. {@code null} if CDC isn't configured. */
+    @Nullable private CdcManager cdcMgr;
+
     /** Write ahead log state manager. */
     private WalStateManager walStateMgr;
 
@@ -230,7 +234,8 @@
         CacheJtaManagerAdapter jtaMgr,
         Collection<CacheStoreSessionListener> storeSesLsnrs,
         MvccCachingManager mvccCachingMgr,
-        CacheDiagnosticManager diagnosticMgr
+        CacheDiagnosticManager diagnosticMgr,
+        CdcManager cdcMgr
     ) {
         this.kernalCtx = kernalCtx;
 
@@ -253,7 +258,8 @@
             ttlMgr,
             evictMgr,
             mvccCachingMgr,
-            diagnosticMgr
+            diagnosticMgr,
+            cdcMgr
         );
 
         this.storeSesLsnrs = storeSesLsnrs;
@@ -431,7 +437,8 @@
             ttlMgr,
             evictMgr,
             mvccCachingMgr,
-            diagnosticMgr
+            diagnosticMgr,
+            cdcMgr
         );
 
         this.mgrs = mgrs;
@@ -480,7 +487,8 @@
         GridCacheSharedTtlCleanupManager ttlMgr,
         PartitionsEvictManager evictMgr,
         MvccCachingManager mvccCachingMgr,
-        CacheDiagnosticManager diagnosticMgr
+        CacheDiagnosticManager diagnosticMgr,
+        CdcManager cdcMgr
     ) {
         this.diagnosticMgr = add(mgrs, diagnosticMgr);
         this.mvccMgr = add(mgrs, mvccMgr);
@@ -492,6 +500,7 @@
         assert walMgr == null || walMgr == cdcWalMgr;
 
         this.cdcWalMgr = walMgr == null ? add(mgrs, cdcWalMgr) : cdcWalMgr;
+        this.cdcMgr = add(mgrs, cdcMgr);
         this.walStateMgr = add(mgrs, walStateMgr);
         this.dbMgr = add(mgrs, dbMgr);
         this.snapshotMgr = add(mgrs, snapshotMgr);
@@ -761,6 +770,13 @@
     }
 
     /**
+     * @return CDC manager, or {@code null} if CDC isn't configured.
+     */
+    public CdcManager cdc() {
+        return cdcMgr;
+    }
+
+    /**
      * @return WAL state manager.
      */
     public WalStateManager walState() {
@@ -1267,6 +1283,9 @@
         private CacheDiagnosticManager diagnosticMgr;
 
         /** */
+        private CdcManager cdcMgr;
+
+        /** */
         private Builder() {
             // No-op.
         }
@@ -1295,7 +1314,8 @@
                 jtaMgr,
                 storeSesLsnrs,
                 mvccCachingMgr,
-                diagnosticMgr
+                diagnosticMgr,
+                cdcMgr
             );
         }
 
@@ -1417,5 +1437,12 @@
 
             return this;
         }
+
+        /** */
+        public Builder setCdcManager(CdcManager cdcMgr) {
+            this.cdcMgr = cdcMgr;
+
+            return this;
+        }
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 445ccde..0580389 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -1069,6 +1069,10 @@
 
             cctx.wal(true).startAutoReleaseSegments();
             cctx.wal(true).resumeLogging(ptr);
+
+            // This callback is required for CdcManager initialization.
+            if (cctx.cdc() != null && cctx.cdc().enabled())
+                cctx.cdc().afterBinaryMemoryRestore(this, null);
         }
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index cc78a3c..12c40e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -518,8 +518,12 @@
                                     name, oldVal, newVal));
                             }
 
-                            if (newVal != null && newVal)
+                            if (newVal != null && newVal) {
                                 log.warning("CDC was disabled.");
+
+                                if (cctx.cdc() != null)
+                                    cctx.cdc().stop(true);
+                            }
                         });
 
                         dispatcher.registerProperty(cdcDisabled);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
index 5dc2ec1..f242ebf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
@@ -37,6 +37,7 @@
 import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.cdc.CdcManager;
 import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
 import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
@@ -406,14 +407,7 @@
                     if (segs != null) {
                         assert segs.size() == 1;
 
-                        SegmentedRingByteBuffer.ReadSegment seg = segs.get(0);
-
-                        int off = seg.buffer().position();
-                        int len = seg.buffer().limit() - off;
-
-                        fsync((MappedByteBuffer)buf.buf, off, len);
-
-                        seg.release();
+                        fsyncReadSegment(segs.get(0), false);
                     }
                 }
                 else
@@ -499,7 +493,7 @@
                     if (segs != null) {
                         assert segs.size() == 1;
 
-                        segs.get(0).release();
+                        fsyncReadSegment(segs.get(0), true);
                     }
                 }
 
@@ -538,6 +532,36 @@
     }
 
     /**
+     * Performs fsync for a part of the WAL segment file and passes that part to {@link CdcManager} if it is enabled.
+     *
+     * @param seg Part of the WAL segment file.
+     * @param onlyCdc If {@code true} then skip actual fsync. TODO: IGNITE-20732
+     */
+    private void fsyncReadSegment(SegmentedRingByteBuffer.ReadSegment seg, boolean onlyCdc) throws IgniteCheckedException {
+        int off = seg.buffer().position();
+        int len = seg.buffer().limit() - off;
+
+        if (!onlyCdc)
+            fsync((MappedByteBuffer)buf.buf, off, len);
+
+        if (cctx.cdc() != null && cctx.cdc().enabled()) {
+            try {
+                ByteBuffer cdcBuf = buf.buf.asReadOnlyBuffer();
+                cdcBuf.position(off);
+                cdcBuf.limit(off + len);
+                cdcBuf.order(buf.buf.order());
+
+                cctx.cdc().collect(cdcBuf);
+            }
+            catch (Throwable cdcErr) {
+                U.error(log, "Error happened during CDC data collection.", cdcErr);
+            }
+        }
+
+        seg.release();
+    }
+
+    /**
      * Signals next segment available to wake up other worker threads waiting for WAL to write.
      */
     @Override public void signalNextAvailable() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index 3907bf2..6363f59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -37,6 +37,8 @@
 import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
 import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.wal.record.CacheState;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerRecord;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerStopRecord;
 import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
@@ -541,6 +543,7 @@
                 return 4 + 8 + 1;
 
             case SWITCH_SEGMENT_RECORD:
+            case CDC_MANAGER_STOP_RECORD:
                 return 0;
 
             case TX_RECORD:
@@ -567,6 +570,9 @@
             case INCREMENTAL_SNAPSHOT_FINISH_RECORD:
                 return ((IncrementalSnapshotFinishRecord)record).dataSize();
 
+            case CDC_MANAGER_RECORD:
+                return 8 + 4 + 4 + 4;
+
             default:
                 throw new UnsupportedOperationException("Type: " + record.type());
         }
@@ -1323,6 +1329,22 @@
 
                 break;
 
+            case CDC_MANAGER_RECORD:
+                long walSegIdx = in.readLong();
+                int fileOff = in.readInt();
+                int size = in.readInt();
+
+                int entryIdx = in.readInt();
+
+                res = new CdcManagerRecord(new T2<>(new WALPointer(walSegIdx, fileOff, size), entryIdx));
+
+                break;
+
+            case CDC_MANAGER_STOP_RECORD:
+                res = new CdcManagerStopRecord();
+
+                break;
+
             default:
                 throw new UnsupportedOperationException("Type: " + type);
         }
@@ -1912,6 +1934,7 @@
                 break;
 
             case SWITCH_SEGMENT_RECORD:
+            case CDC_MANAGER_STOP_RECORD:
                 break;
 
             case MASTER_KEY_CHANGE_RECORD_V2:
@@ -2002,6 +2025,17 @@
 
                 break;
 
+            case CDC_MANAGER_RECORD:
+                T2<WALPointer, Integer> state = ((CdcManagerRecord)rec).walState();
+
+                buf.putLong(state.get1().index());
+                buf.putInt(state.get1().fileOffset());
+                buf.putInt(state.get1().length());
+
+                buf.putInt(state.get2());
+
+                break;
+
             default:
                 throw new UnsupportedOperationException("Type: " + rec.type());
         }
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcIgniteNodeActiveModeTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcIgniteNodeActiveModeTest.java
new file mode 100644
index 0000000..6fe9dae
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcIgniteNodeActiveModeTest.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cdc.CdcConsumerState;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.cdc.CdcManager;
+import org.apache.ignite.internal.cdc.CdcMode;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerRecord;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerStopRecord;
+import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.util.lang.RunnableX;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.spi.metric.ObjectMetric;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.wal.record.RecordUtils;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.cdc.CdcMain.CDC_MODE;
+import static org.apache.ignite.internal.cdc.CdcMain.COMMITTED_SEG_IDX;
+import static org.apache.ignite.internal.cdc.CdcMain.CUR_SEG_IDX;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+public class CdcIgniteNodeActiveModeTest extends AbstractCdcTest {
+    /** */
+    private IgniteEx ign;
+
+    /** */
+    private CdcMain cdcMain;
+
+    /** */
+    private IgniteInternalFuture<?> cdcMainFut;
+
+    /** */
+    private UserCdcConsumer cnsmr;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setCdcEnabled(true)));
+
+        cfg.setPluginProviders(new NoOpCdcManagerPluginProvider());
+
+        cfg.setCacheConfiguration(new CacheConfiguration<Integer, User>(DEFAULT_CACHE_NAME));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        cleanPersistenceDir();
+
+        ign = startGrid();
+
+        ign.cluster().state(ACTIVE);
+
+        cdcMain = createCdc((cnsmr = new UserCdcConsumer()), ign.configuration());
+
+        cdcMainFut = runAsync(cdcMain);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        cdcMainFut.cancel();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** */
+    @Test
+    public void testCdcMetrics() throws Exception {
+        awaitCdcModeValue(CdcMode.IGNITE_NODE_ACTIVE);
+
+        IgniteCache<Integer, User> cache = ign.cache(DEFAULT_CACHE_NAME);
+
+        for (int i = 0; i < 5_000; i++) {
+            cache.put(i, createUser(i));
+
+            if (i % 1_000 == 0) {
+                writeCdcManagerRecord();
+                rollSegment();
+            }
+        }
+
+        MetricRegistry mreg = GridTestUtils.<StandaloneGridKernalContext>getFieldValue(cdcMain, "kctx")
+            .metric().registry("cdc");
+
+        assertTrue(GridTestUtils.waitForCondition(() ->
+            mreg.<LongMetric>findMetric(CUR_SEG_IDX).value() > 1, 10_000, 100));
+
+        assertTrue(GridTestUtils.waitForCondition(() ->
+            mreg.<LongMetric>findMetric(COMMITTED_SEG_IDX).value() > 1, 10_000, 100));
+
+        awaitCdcModeValue(CdcMode.IGNITE_NODE_ACTIVE);
+    }
+
+    /** */
+    @Test
+    public void testSwitchToCdcUtilityActiveMode() throws Exception {
+        writeCdcManagerStopCdcRecord();
+
+        rollSegment();
+
+        awaitCdcModeValue(CdcMode.CDC_UTILITY_ACTIVE);
+    }
+
+    /** */
+    @Test
+    public void testSkipsNodeCommittedData() throws Exception {
+        List<Integer> expUsers = new ArrayList<>();
+
+        addData(0, 1, null);
+
+        writeCdcManagerRecord();
+
+        rollSegment();
+
+        addData(2, 3, expUsers);
+
+        rollSegment();
+
+        addData(4, 5, expUsers);
+
+        writeCdcManagerStopCdcRecord();
+
+        rollSegment();
+
+        awaitCdcModeValue(CdcMode.CDC_UTILITY_ACTIVE);
+
+        checkConsumerData(expUsers);
+
+        addData(6, 7, expUsers);
+
+        rollSegment();
+
+        checkConsumerData(expUsers);
+    }
+
+    /** */
+    @Test
+    public void testCleanWalsAfterCdcManagerRecord() throws Exception {
+        checkCdcSegmentsExists(0, -1);  // No segments stored in CDC dir.
+
+        rollSegment();
+
+        checkCdcSegmentsExists(0, 0);  // Segment 0 is stored.
+
+        rollSegment();
+
+        checkCdcSegmentsExists(0, 1);  // Segment 1 is stored.
+
+        writeCdcManagerRecord();
+        rollSegment();
+
+        checkCdcSegmentsExists(2, 2);  // Segments 0, 1 should be cleared after CdcManagerRecord.
+
+        rollSegment();
+
+        checkCdcSegmentsExists(2, 3);  // Segment 3 is stored.
+
+        writeCdcManagerRecord();
+        rollSegment();
+
+        checkCdcSegmentsExists(4, 4);  // Segments 2, 3 should be cleared after CdcManagerRecord.
+
+        writeCdcManagerStopCdcRecord();
+        rollSegment();
+
+        checkCdcSegmentsExists(5, 5); // Segment 4 is cleared after CdcMain consumes it.
+    }
+
+    /** */
+    @Test
+    public void testRestoreModeOnRestart() throws Exception {
+        RunnableX restartUtil = () -> {
+            cdcMainFut.cancel();
+            cdcMain = createCdc(cnsmr, getConfiguration(getTestIgniteInstanceName()));
+            cdcMainFut = runAsync(cdcMain);
+        };
+
+        List<Integer> expUsers = new ArrayList<>();
+
+        addData(0, 1, null);
+
+        rollSegment();
+
+        addData(2, 3, null);
+
+        restartUtil.run();
+
+        rollSegment();
+
+        writeCdcManagerRecord();
+        writeCdcManagerStopCdcRecord();
+
+        addData(4, 5, expUsers);
+
+        rollSegment();
+
+        restartUtil.run();
+
+        awaitCdcModeValue(CdcMode.CDC_UTILITY_ACTIVE);
+
+        checkConsumerData(expUsers);
+
+        addData(4, 5, expUsers);
+
+        rollSegment();
+
+        checkConsumerData(expUsers);
+    }
+
+    /** */
+    @Test
+    public void testStoreCdcMode() throws Exception {
+        writeCdcManagerStopCdcRecord();
+
+        rollSegment();
+
+        awaitCdcModeValue(CdcMode.CDC_UTILITY_ACTIVE);
+
+        CdcConsumerState state = GridTestUtils.getFieldValue(cdcMain, "state");
+
+        assertEquals(CdcMode.CDC_UTILITY_ACTIVE, state.loadCdcMode());
+    }
+
+    /** */
+    private WALPointer rollSegment() throws IgniteCheckedException {
+        return walMgr().log(RecordUtils.buildCheckpointRecord(), RolloverType.CURRENT_SEGMENT);
+    }
+
+    /** */
+    private void writeCdcManagerStopCdcRecord() throws Exception {
+        walMgr().log(new CdcManagerStopRecord());
+    }
+
+    /** */
+    private void writeCdcManagerRecord() throws Exception {
+        walMgr().log(new CdcManagerRecord(new T2<>(walMgr().log(RecordUtils.buildCheckpointRecord()), 0)));
+    }
+
+    /** */
+    private IgniteWriteAheadLogManager walMgr() {
+        return ign.context().cache().context().wal(true);
+    }
+
+    /**
+     * @param expUsers Expected users read by CDC.
+     */
+    private void checkConsumerData(List<Integer> expUsers) throws IgniteInterruptedCheckedException {
+        assertTrue(waitForCondition(() ->
+            expUsers.equals(cnsmr.data(ChangeEventType.UPDATE, cacheId(DEFAULT_CACHE_NAME))
+        ), 10_000, 10));
+    }
+
+    /**
+     * @param from First index of user to add.
+     * @param to Index following the last user to add (exclusive upper bound).
+     * @param expCdcReadUsers Expected CDC read users.
+     */
+    private void addData(int from, int to, List<Integer> expCdcReadUsers) {
+        IntStream.range(from, to).forEach(u -> ign.cache(DEFAULT_CACHE_NAME).put(u, createUser(u)));
+
+        if (expCdcReadUsers != null)
+            IntStream.range(from, to).forEach(expCdcReadUsers::add);
+    }
+
+    /** */
+    private void awaitCdcModeValue(CdcMode expVal) throws IgniteInterruptedCheckedException {
+        assertTrue(waitForCondition(() -> {
+            try {
+                ObjectMetric<String> m = GridTestUtils.<StandaloneGridKernalContext>getFieldValue(cdcMain, "kctx")
+                    .metric().registry("cdc").findMetric(CDC_MODE);
+
+                return m != null && expVal.name().equals(m.value());
+            }
+            catch (Exception e) {
+                return false;
+            }
+        }, 30_000, 10));
+    }
+
+    /**
+     * Checks that WAL CDC directory contains specified segments.
+     *
+     * @param from Start segment index to check.
+     * @param to Inclusive end segment index.
+     */
+    private void checkCdcSegmentsExists(long from, long to) throws IgniteInterruptedCheckedException {
+        List<Long> expected;
+
+        if (from > to)
+            expected = Collections.emptyList();
+        else
+            expected = LongStream.range(from, to + 1).boxed().collect(Collectors.toList());
+
+        assertTrue(waitForCondition(() -> {
+            try {
+                List<Long> actual = Files.list(GridTestUtils.getFieldValue(cdcMain, "cdcDir"))
+                    .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile()))
+                    .map(FileWriteAheadLogManager::segmentIndex)
+                    .sorted()
+                    .collect(Collectors.toList());
+
+                return expected.equals(actual);
+            }
+            catch (Exception e) {
+                return false;
+            }
+        }, 10_000, 10));
+    }
+
+    /** */
+    private static class NoOpCdcManagerPluginProvider extends AbstractTestPluginProvider {
+        /** {@inheritDoc} */
+        @Override public String name() {
+            return "NoOpCdcManagerPluginProvider";
+        }
+
+        /** {@inheritDoc} */
+        @Override public <T> @Nullable T createComponent(PluginContext ctx, Class<T> cls) {
+            if (CdcManager.class.equals(cls))
+                return (T)new NoOpCdcManager();
+
+            return null;
+        }
+    }
+
+    /** */
+    protected static class NoOpCdcManager extends GridCacheSharedManagerAdapter implements CdcManager {
+        /** {@inheritDoc} */
+        @Override public boolean enabled() {
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void collect(ByteBuffer dataBuf) {
+            // No-op.
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcManagerTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcManagerTest.java
new file mode 100644
index 0000000..223fec4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcManagerTest.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.io.File;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cdc.CdcManager;
+import org.apache.ignite.internal.cdc.CdcUtilityActiveCdcManager;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cdc.CdcManagerTest.TestCdcManager.cdcMgr;
+
+/** Tests of a custom {@link CdcManager} that is supplied to an Ignite node by a plugin provider. */
+@RunWith(Parameterized.class)
+public class CdcManagerTest extends GridCommonAbstractTest {
+    /** WAL segment size passed to the data storage configuration. */
+    private static final int WAL_SEG_SIZE = 64 * (int)U.MB;
+
+    /** Node started before each test. */
+    private IgniteEx ign;
+
+    /** CDC manager of the started node. */
+    private TestCdcManager cdcMgr;
+
+    /** WAL manager of the started node. */
+    private FileWriteAheadLogManager walMgr;
+
+    /** Listening logger that is set as the grid logger of the node. */
+    private ListeningTestLogger lsnrLog;
+
+    /** WAL mode passed to the data storage configuration. */
+    private WALMode walMode;
+
+    /** If {@code true} then {@link TestCdcManager#collect(ByteBuffer)} throws an exception. */
+    private static volatile boolean failCollect;
+
+    /** Whether persistence is enabled for the default data region. */
+    @Parameterized.Parameter
+    public boolean persistentEnabled;
+
+    /** @return Values of the {@link #persistentEnabled} test parameter. */
+    @Parameterized.Parameters(name = "persistentEnabled={0}")
+    public static Object[] params() {
+        return new Object[] {false, true};
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String instanceName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(instanceName);
+
+        DataRegionConfiguration dfltRegionCfg = new DataRegionConfiguration()
+            .setPersistenceEnabled(persistentEnabled)
+            .setCdcEnabled(true);
+
+        DataStorageConfiguration storageCfg = new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(dfltRegionCfg)
+            .setWalMode(walMode)
+            .setWalSegmentSize(WAL_SEG_SIZE);
+
+        return igniteCfg
+            .setDataStorageConfiguration(storageCfg)
+            .setPluginProviders(new CdcManagerPluginProvider())
+            .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME))
+            .setGridLogger(lsnrLog);
+    }
+
+    /** {@inheritDoc} Cleans the persistence directory, starts and activates node 0, then stores its CDC and WAL managers. */
+    @Override protected void beforeTest() throws Exception {
+        cleanPersistenceDir();
+
+        lsnrLog = new ListeningTestLogger(log);
+
+        ign = startGrid(0);
+
+        ign.cluster().state(ClusterState.ACTIVE);
+
+        cdcMgr = cdcMgr(ign);
+        walMgr = (FileWriteAheadLogManager)ign.context().cache().context().wal(true);
+    }
+
+    /** {@inheritDoc} Stops all nodes, cleans the persistence directory and resets the {@link #failCollect} flag. */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        failCollect = false;
+    }
+
+    /** Checks that the {@code stop} flag of the CDC manager is raised after {@code CDC_DISABLED} is locally set to true. */
+    @Test
+    public void testDisableByProperty() throws Exception {
+        AtomicBoolean stopped = GridTestUtils.getFieldValue(cdcMgr(ign), "stop");
+
+        assertFalse(stopped.get());
+
+        DistributedChangeableProperty<Serializable> cdcDisableProp = ign.context().distributedConfiguration()
+            .property(FileWriteAheadLogManager.CDC_DISABLED);
+
+        cdcDisableProp.localUpdate(true);
+
+        assertTrue(GridTestUtils.waitForCondition(stopped::get, 10_000, 10));
+    }
+
+    /** Checks that, while WAL stays in segment 0, the collected CDC data equals the content of the first WAL segment file. */
+    @Test
+    public void testSingleSegmentContent() throws Exception {
+        for (int i = 0; i < 10_000; i++)
+            ign.cache(DEFAULT_CACHE_NAME).put(i, i);
+
+        assertEquals(0, walMgr.currentSegment());
+
+        File walDir = walDir(ign);
+
+        stopGrid(0);
+
+        Path seg = Arrays.stream(walDir.listFiles()).sorted().findFirst().get().toPath();
+
+        ByteBuffer walBuf = ByteBuffer.wrap(Files.readAllBytes(seg));
+        ByteBuffer cdcBuf = ByteBuffer.wrap(cdcMgr(ign).buf.array());
+        cdcBuf.limit(WAL_SEG_SIZE);
+
+        assertEquals(0, walBuf.compareTo(cdcBuf));
+    }
+
+    /** Checks collected CDC data when cache puts continue until the WAL manager reports a current segment above 0. */
+    @Test
+    public void testMultipleSegmentContent() throws Exception {
+        checkCdcContentWithRollover(() -> {
+            for (int i = 0; i < Integer.MAX_VALUE; i++) {
+                ign.cache(DEFAULT_CACHE_NAME).put(i, i);
+
+                if (walMgr.currentSegment() > 0)
+                    break;
+            }
+        });
+    }
+
+    /** Checks collected CDC data when a record is logged with {@link RolloverType#NEXT_SEGMENT}. */
+    @Test
+    public void testMultipleSegmentContentWithForceNextSegmentRollover() throws Exception {
+        checkCdcContentWithRollover(() -> rollSegment(RolloverType.NEXT_SEGMENT));
+    }
+
+    /** Checks collected CDC data when a record is logged with {@link RolloverType#CURRENT_SEGMENT}. */
+    @Test
+    public void testMultipleSegmentContentWithForceCurrentSegmentRollover() throws Exception {
+        checkCdcContentWithRollover(() -> rollSegment(RolloverType.CURRENT_SEGMENT));
+    }
+
+    /** Checks that data collected before and after a node restart matches the corresponding parts of the same WAL segment. */
+    @Test
+    public void testRestartNode() throws Exception {
+        for (int i = 0; i < 100; i++)
+            ign.cache(DEFAULT_CACHE_NAME).put(i, i);
+
+        stopGrid(0);
+
+        File seg = Arrays.stream(walDir(ign).listFiles()).sorted().findFirst().get();
+        int len0 = writtenLength(seg);
+
+        ByteBuffer buf0 = cdcMgr.buf;
+
+        ign = startGrid(0);
+        cdcMgr = cdcMgr(ign);
+
+        for (int i = 0; i < 100; i++)
+            ign.cache(DEFAULT_CACHE_NAME).put(i, i);
+
+        stopGrid(0);
+
+        int len1 = writtenLength(seg);
+
+        // Check CDC buffer content on first start.
+        ByteBuffer walBuf = ByteBuffer.wrap(Files.readAllBytes(seg.toPath()));
+        walBuf.limit(len0);
+
+        ByteBuffer cdcBuf = ByteBuffer.wrap(buf0.array());
+        cdcBuf.limit(len0);
+
+        assertEquals(0, walBuf.compareTo(cdcBuf));
+
+        // Check CDC buffer content on second start.
+        walBuf.limit(len1);
+        walBuf.position(len0);
+
+        cdcBuf = ByteBuffer.wrap(cdcMgr.buf.array());
+        cdcBuf.limit(len1 - len0);
+
+        assertEquals(0, walBuf.compareTo(cdcBuf));
+    }
+
+    /** Checks that only {@link WALMode#LOG_ONLY} keeps the custom CDC manager; other modes log a warning and fall back. */
+    @Test
+    public void testProhibitedWalModes() throws Exception {
+        for (WALMode m: WALMode.values()) {
+            stopGrid(0);
+            cleanPersistenceDir();
+
+            walMode = m;
+
+            boolean cdcStart = m == WALMode.LOG_ONLY;
+
+            LogListener cdcWarnLsnr = LogListener
+                .matches("Custom CdcManager is only supported for WALMode.LOG_ONLY")
+                .times(cdcStart ? 0 : 1)
+                .build();
+
+            lsnrLog.registerListener(cdcWarnLsnr);
+
+            IgniteEx ign = startGrid(0);
+
+            assertTrue(m.toString(), cdcWarnLsnr.check());
+
+            if (cdcStart)
+                assertTrue(cdcMgr(ign) instanceof TestCdcManager);
+            else
+                assertTrue(ign.context().cache().context().cdc() instanceof CdcUtilityActiveCdcManager);
+        }
+    }
+
+    /** Checks that an exception thrown from {@code collect} is logged and the cache put is still readable afterwards. */
+    @Test
+    public void testCollectFailedIgnored() throws Exception {
+        LogListener cdcErrLsnr = LogListener
+            .matches("Error happened during CDC data collection.")
+            .atLeast(1)
+            .build();
+
+        lsnrLog.registerListener(cdcErrLsnr);
+
+        failCollect = true;
+
+        ign.cache(DEFAULT_CACHE_NAME).put(0, 0);
+
+        assertTrue(GridTestUtils.waitForCondition(cdcErrLsnr::check, 10_000, 10));
+
+        assertEquals(0, ign.cache(DEFAULT_CACHE_NAME).get(0));
+    }
+
+    /** Fills the cache, runs {@code rollSegment}, fills the cache again and compares CDC data with the first two WAL segments. */
+    public void checkCdcContentWithRollover(Runnable rollSegment) throws Exception {
+        for (int i = 0; i < 10_000; i++)
+            ign.cache(DEFAULT_CACHE_NAME).put(i, i);
+
+        rollSegment.run();
+
+        for (int i = 0; i < 10_000; i++)
+            ign.cache(DEFAULT_CACHE_NAME).put(i, i);
+
+        File walDir = walDir(ign);
+
+        stopGrid(0);
+
+        List<File> segs = Arrays.stream(walDir.listFiles()).sorted()
+            .limit(2)
+            .collect(Collectors.toList());
+
+        int firstSegActLen = writtenLength(segs.get(0));
+
+        ByteBuffer walBuf = ByteBuffer.wrap(Files.readAllBytes(segs.get(0).toPath()));
+        ByteBuffer cdcBuf = ByteBuffer.wrap(cdcMgr.buf.array());
+        walBuf.limit(firstSegActLen);
+        cdcBuf.limit(firstSegActLen);
+
+        assertEquals(0, walBuf.compareTo(cdcBuf));
+
+        walBuf = ByteBuffer.wrap(Files.readAllBytes(segs.get(1).toPath()));
+        cdcBuf = ByteBuffer.wrap(cdcMgr.buf.array());
+        cdcBuf.position(firstSegActLen + 1); // NOTE(review): "+ 1" presumably skips a 1-byte segment switch record - confirm.
+        cdcBuf.limit(firstSegActLen + 1 + WAL_SEG_SIZE);
+
+        assertEquals(0, walBuf.compareTo(cdcBuf));
+    }
+
+    /** Logs a {@link CheckpointRecord} with the given rollover type; rethrows any failure so the calling test fails. */
+    private void rollSegment(RolloverType rollType) {
+        if (persistentEnabled)
+            dbMgr(ign).checkpointReadLock();
+
+        try {
+            walMgr.log(new CheckpointRecord(null), rollType);
+        }
+        catch (Exception e) {
+            throw new RuntimeException("Failed to log WAL record [rollType=" + rollType + ']', e);
+        }
+        finally {
+            if (persistentEnabled)
+                dbMgr(ign).checkpointReadUnlock();
+        }
+    }
+
+    /** @return Directory named after the node's PDS folder, located in the default WAL path of the work directory. */
+    private File walDir(IgniteEx ign) throws Exception {
+        String workDir = ign.configuration().getWorkDirectory();
+
+        File walRoot = U.resolveWorkDirectory(workDir, DataStorageConfiguration.DFLT_WAL_PATH, false);
+        String nodeFolder = ign.context().pdsFolderResolver().resolveFolders().folderName();
+
+        return new File(walRoot, nodeFolder);
+    }
+
+    /** @return File offset of the pointer that follows the last record read from the specified WAL segment. */
+    private int writtenLength(File walSegment) throws Exception {
+        IgniteWalIteratorFactory itFactory = new IgniteWalIteratorFactory(log);
+
+        IgniteWalIteratorFactory.IteratorParametersBuilder segParams =
+            new IgniteWalIteratorFactory.IteratorParametersBuilder().filesOrDirs(walSegment);
+
+        try (WALIterator it = itFactory.iterator(segParams)) {
+            while (it.hasNext())
+                it.next();
+
+            return it.lastRead().get().next().fileOffset();
+        }
+    }
+
+    /** Plugin provider that returns {@link TestCdcManager} when the {@link CdcManager} component is requested. */
+    private static class CdcManagerPluginProvider extends AbstractTestPluginProvider {
+        /** {@inheritDoc} */
+        @Override public String name() {
+            return "CdcManagerPluginProvider";
+        }
+
+        /** {@inheritDoc} */
+        @Override public <T> @Nullable T createComponent(PluginContext ctx, Class<T> cls) {
+            if (!CdcManager.class.equals(cls))
+                return null;
+
+            return (T)new TestCdcManager();
+        }
+    }
+
+    /** Test {@link CdcManager} that appends every collected buffer to an in-memory buffer for comparison with WAL files. */
+    protected static class TestCdcManager extends GridCacheSharedManagerAdapter implements CdcManager {
+        /** Buffer to store collected data. */
+        private final ByteBuffer buf;
+
+        /** Allocates the storage buffer with the capacity of two WAL segments. */
+        TestCdcManager() {
+            // ByteBuffer.allocate() already returns a buffer that is zero-filled and positioned at 0,
+            // so the explicit Arrays.fill() over the whole backing array and the position reset
+            // that used to follow were redundant work on every manager creation.
+            // NOTE(review): capacity assumes a test never collects more than two WAL segments of data.
+            buf = ByteBuffer.allocate(2 * WAL_SEG_SIZE);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void collect(ByteBuffer dataBuf) {
+            if (failCollect)
+                throw new RuntimeException();
+
+            if (log.isDebugEnabled())
+                log.debug("Collect data buffer [offset=" + dataBuf.position() + ", limit=" + dataBuf.limit() + ']');
+
+            buf.put(dataBuf);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean enabled() {
+            return true;
+        }
+
+        /** @return CdcManager for specified Ignite node. */
+        static TestCdcManager cdcMgr(IgniteEx ign) {
+            return (TestCdcManager)ign.context().cache().context().cdc();
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
index 401b02f..41a957d 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
@@ -354,6 +354,10 @@
         while (iter.hasNext()) {
             IgniteBiTuple<WALPointer, WALRecord> rec = iter.next();
 
+            // Custom records could be written in WAL even if persistence isn't enabled.
+            if (rec.get2().type().purpose() == WALRecord.RecordPurpose.CUSTOM)
+                continue;
+
             if (persistenceEnabled && (!(rec.get2() instanceof DataRecord)))
                 continue;
 
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
index 60ffcee..c228cb3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
@@ -24,6 +24,8 @@
 import java.util.UUID;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerRecord;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerStopRecord;
 import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.ExchangeRecord;
@@ -92,6 +94,7 @@
 import org.apache.ignite.internal.processors.cache.tree.DataInnerIO;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.transactions.TransactionState;
 
@@ -113,6 +116,8 @@
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.BTREE_PAGE_REMOVE;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.BTREE_PAGE_REPLACE;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_MANAGER_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_MANAGER_STOP_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CLUSTER_SNAPSHOT;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CONSISTENT_CUT;
@@ -267,6 +272,9 @@
         put(CLUSTER_SNAPSHOT, RecordUtils::buildClusterSnapshotRecord);
         put(INCREMENTAL_SNAPSHOT_START_RECORD, RecordUtils::buildIncrementalSnapshotStartRecord);
         put(INCREMENTAL_SNAPSHOT_FINISH_RECORD, RecordUtils::buildIncrementalSnapshotFinishRecord);
+        put(CDC_MANAGER_RECORD, RecordUtils::buildCdcManagerRecord);
+        put(CDC_MANAGER_STOP_RECORD, RecordUtils::buildCdcManagerStopRecord);
+
     }
 
     /** */
@@ -649,4 +657,14 @@
         return new IncrementalSnapshotFinishRecord(
             UUID.randomUUID(), F.asSet(new GridCacheVersion()), F.asSet(new GridCacheVersion()));
     }
+
+    /** @return {@link CdcManagerRecord} built from a zero WAL pointer paired with {@code 0}. */
+    public static CdcManagerRecord buildCdcManagerRecord() {
+        return new CdcManagerRecord(new T2<>(new WALPointer(0, 0, 0), 0));
+    }
+
+    /** @return New {@link CdcManagerStopRecord}. */
+    public static CdcManagerStopRecord buildCdcManagerStopRecord() {
+        return new CdcManagerStopRecord();
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 2848898..7d2ea12 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -22,6 +22,8 @@
 import java.util.List;
 import org.apache.ignite.cdc.CdcCacheConfigOnRestartTest;
 import org.apache.ignite.cdc.CdcCacheVersionTest;
+import org.apache.ignite.cdc.CdcIgniteNodeActiveModeTest;
+import org.apache.ignite.cdc.CdcManagerTest;
 import org.apache.ignite.cdc.CdcNonDefaultWorkDirTest;
 import org.apache.ignite.cdc.CdcSelfTest;
 import org.apache.ignite.cdc.RestartWithWalForceArchiveTimeoutTest;
@@ -162,6 +164,8 @@
         GridTestUtils.addTestIfNeeded(suite, WalForCdcTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CdcCacheConfigOnRestartTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, CdcNonDefaultWorkDirTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, CdcManagerTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, CdcIgniteNodeActiveModeTest.class, ignoredTests);
 
         // new style folders with generated consistent ID test
         GridTestUtils.addTestIfNeeded(suite, IgniteUidAsConsistentIdMigrationTest.class, ignoredTests);