IGNITE-17700 Introduce CdcManager (#11044)
Co-authored-by: Nikolay Izhikov <nizhikov@apache.org>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
index 379c2bb..942fd24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcConsumerState.java
@@ -71,6 +71,15 @@
/** */
public static final String CACHES_STATE_FILE_NAME = "cdc-caches-state" + FILE_SUFFIX;
+ /**
+ * The file stores state of CDC mode. Content of the file is a {@link CdcMode} value:
+ * <ul>
+ * <li>{@link CdcMode#CDC_UTILITY_ACTIVE} means that {@link CdcMain} utility captures data.</li>
+ * <li>{@link CdcMode#IGNITE_NODE_ACTIVE} means that {@link CdcManager} captures data within Ignite node.</li>
+ * </ul>
+ */
+ public static final String CDC_MODE_FILE_NAME = "cdc-mode" + FILE_SUFFIX;
+
/** Log. */
private final IgniteLogger log;
@@ -98,6 +107,12 @@
/** Mappings types state file. */
private final Path tmpCaches;
+ /** CDC manager mode state file. */
+ private final Path cdcMode;
+
+ /** Temp CDC manager mode state file. */
+ private final Path tmpCdcMode;
+
/**
* @param stateDir State directory.
*/
@@ -111,6 +126,8 @@
tmpMappings = stateDir.resolve(MAPPINGS_STATE_FILE_NAME + TMP_SUFFIX);
caches = stateDir.resolve(CACHES_STATE_FILE_NAME);
tmpCaches = stateDir.resolve(CACHES_STATE_FILE_NAME + TMP_SUFFIX);
+ cdcMode = stateDir.resolve(CDC_MODE_FILE_NAME);
+ tmpCdcMode = stateDir.resolve(CDC_MODE_FILE_NAME + TMP_SUFFIX);
}
/**
@@ -278,4 +295,26 @@
throw new RuntimeException(e);
}
}
+
+ /**
+ * Loads CDC mode state from file.
+ *
+ * @return CDC mode state.
+ */
+ public CdcMode loadCdcMode() {
+ CdcMode state = load(cdcMode, () -> CdcMode.IGNITE_NODE_ACTIVE);
+
+ log.info("CDC mode loaded [" + state + ']');
+
+ return state;
+ }
+
+ /**
+ * Saves CDC mode state to file.
+ *
+ * @param mode CDC mode.
+ */
+ public void saveCdcMode(CdcMode mode) throws IOException {
+ save(mode, tmpCdcMode, cdcMode);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
index 84dc0a0..cbf44b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMain.java
@@ -50,6 +50,9 @@
import org.apache.ignite.internal.MarshallerContextImpl;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.cdc.WalRecordsConsumer.DataEntryIterator;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerRecord;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridLocalConfigManager;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderResolver;
@@ -66,6 +69,8 @@
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.platform.PlatformType;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
@@ -79,6 +84,8 @@
import static org.apache.ignite.internal.IgnitionEx.initializeDefaultMBeanServer;
import static org.apache.ignite.internal.binary.BinaryUtils.METADATA_FILE_SUFFIX;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_MANAGER_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_MANAGER_STOP_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD_V2;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.segmentIndex;
@@ -159,6 +166,17 @@
/** Cdc directory metric name. */
public static final String CDC_DIR = "CdcDir";
+ /** Cdc mode metric name. */
+ public static final String CDC_MODE = "CdcMode";
+
+ /** Filter for consumption in {@link CdcMode#IGNITE_NODE_ACTIVE} mode. */
+ private static final IgniteBiPredicate<WALRecord.RecordType, WALPointer> PASSIVE_RECS =
+ (type, ptr) -> type == CDC_MANAGER_STOP_RECORD || type == CDC_MANAGER_RECORD;
+
+ /** Filter for consumption in {@link CdcMode#CDC_UTILITY_ACTIVE} mode. */
+ private static final IgniteBiPredicate<WALRecord.RecordType, WALPointer> ACTIVE_RECS =
+ (type, ptr) -> type == DATA_RECORD_V2 || type == CDC_DATA_RECORD;
+
/** Ignite configuration. */
private final IgniteConfiguration igniteCfg;
@@ -210,7 +228,13 @@
/** Change Data Capture state. */
private CdcConsumerState state;
- /** Save state to start from. */
+ /**
+ * Saved state to start from. Points to the last committed offset. Set to {@code null} after failover on start and
+ * switching from {@link CdcMode#IGNITE_NODE_ACTIVE} to {@link CdcMode#CDC_UTILITY_ACTIVE}.
+ *
+ * @see #removeProcessedOnFailover(Path)
+ * @see #consumeSegmentActively(IgniteWalIteratorFactory.IteratorParametersBuilder)
+ */
private T2<WALPointer, Integer> walState;
/** Types state. */
@@ -222,6 +246,9 @@
/** Caches state. */
private Map<Integer, Long> cachesState;
+ /** CDC mode state. */
+ private volatile CdcMode cdcModeState;
+
/** Stopped flag. */
private volatile boolean started;
@@ -312,6 +339,7 @@
typesState = state.loadTypesState();
mappingsState = state.loadMappingsState();
cachesState = state.loadCaches();
+ cdcModeState = state.loadCdcMode();
if (walState != null) {
committedSegmentIdx.value(walState.get1().index());
@@ -395,6 +423,7 @@
lastSegmentConsumptionTs =
mreg.longMetric(LAST_SEG_CONSUMPTION_TIME, "Last time of consumption of WAL segment");
metaUpdate = mreg.histogram(META_UPDATE, new long[] {100, 500, 1000}, "Metadata update time");
+ mreg.register(CDC_MODE, () -> cdcModeState.name(), String.class, "CDC mode");
}
/**
@@ -444,7 +473,7 @@
try (Stream<Path> cdcFiles = Files.list(cdcDir)) {
Set<Path> exists = new HashSet<>();
- cdcFiles
+ Iterator<Path> segments = cdcFiles
.peek(exists::add) // Store files that exists in cdc dir.
// Need unseen WAL segments only.
.filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile()) && !seen.contains(p))
@@ -459,8 +488,26 @@
}
lastSgmnt.set(nextSgmnt);
- })
- .forEach(this::consumeSegment); // Consuming segments.
+ }).iterator();
+
+ while (segments.hasNext()) {
+ Path segment = segments.next();
+
+ if (walState != null && removeProcessedOnFailover(segment))
+ continue;
+
+ if (consumeSegment(segment)) {
+ // CDC mode switched. Reset partitions info to handle them again actively.
+ seen.clear();
+ lastSgmnt.set(-1);
+
+ walState = state.loadWalState();
+
+ break;
+ }
+
+ walState = null;
+ }
seen.removeIf(p -> !exists.contains(p)); // Clean up seen set.
@@ -477,8 +524,12 @@
}
}
- /** Reads all available records from segment. */
- private void consumeSegment(Path segment) {
+ /**
+ * Reads all available records from segment.
+ *
+ * @return {@code true} if mode switched.
+ */
+ private boolean consumeSegment(Path segment) {
updateMetadata();
if (log.isInfoEnabled())
@@ -491,94 +542,95 @@
.marshallerMappingFileStoreDir(marshaller)
.igniteConfigurationModifier((cfg) -> cfg.setPluginProviders(igniteCfg.getPluginProviders()))
.keepBinary(cdcCfg.isKeepBinary())
- .filesOrDirs(segment.toFile())
- .addFilter((type, ptr) -> type == DATA_RECORD_V2 || type == CDC_DATA_RECORD);
+ .filesOrDirs(segment.toFile());
if (igniteCfg.getDataStorageConfiguration().getPageSize() != 0)
builder.pageSize(igniteCfg.getDataStorageConfiguration().getPageSize());
+ if (walState != null)
+ builder.from(walState.get1());
+
long segmentIdx = segmentIndex(segment);
lastSegmentConsumptionTs.value(System.currentTimeMillis());
curSegmentIdx.value(segmentIdx);
- if (walState != null) {
- if (segmentIdx > walState.get1().index()) {
- throw new IgniteException("Found segment greater then saved state. Some events are missed. Exiting! " +
- "[state=" + walState + ", segment=" + segmentIdx + ']');
- }
-
- if (segmentIdx < walState.get1().index()) {
- if (log.isInfoEnabled()) {
- log.info("Already processed segment found. Skipping and deleting the file [segment=" +
- segmentIdx + ", state=" + walState.get1().index() + ']');
- }
-
- // WAL segment is a hard link to a segment file in the special Change Data Capture folder.
- // So, we can safely delete it after processing.
- try {
- Files.delete(segment);
-
- return;
- }
- catch (IOException e) {
- throw new IgniteException(e);
- }
- }
-
- builder.from(walState.get1());
+ if (cdcModeState == CdcMode.IGNITE_NODE_ACTIVE) {
+ if (consumeSegmentPassively(builder))
+ return true;
}
+ else
+ consumeSegmentActively(builder);
- try (DataEntryIterator iter = new DataEntryIterator(new IgniteWalIteratorFactory(log).iterator(builder))) {
- if (walState != null) {
+ processedSegments.add(segment);
+
+ return false;
+ }
+
+ /**
+ * Consumes CDC events in {@link CdcMode#CDC_UTILITY_ACTIVE} mode.
+ */
+ private void consumeSegmentActively(IgniteWalIteratorFactory.IteratorParametersBuilder builder) {
+ try (DataEntryIterator iter = new DataEntryIterator(new IgniteWalIteratorFactory(log).iterator(builder.addFilter(ACTIVE_RECS)))) {
+ if (walState != null)
iter.init(walState.get2());
- walState = null;
- }
-
- boolean interrupted = false;
+ boolean interrupted;
do {
boolean commit = consumer.onRecords(iter);
- if (commit) {
- T2<WALPointer, Integer> curState = iter.state();
-
- if (curState == null)
- continue;
-
- if (log.isDebugEnabled())
- log.debug("Saving state [curState=" + curState + ']');
-
- state.saveWal(curState);
-
- committedSegmentIdx.value(curState.get1().index());
- committedSegmentOffset.value(curState.get1().fileOffset());
-
- // Can delete after new file state save.
- if (!processedSegments.isEmpty()) {
- // WAL segment is a hard link to a segment file in a specifal Change Data Capture folder.
- // So we can safely delete it after success processing.
- for (Path processedSegment : processedSegments) {
- // Can't delete current segment, because state points to it.
- if (processedSegment.equals(segment))
- continue;
-
- Files.delete(processedSegment);
- }
-
- processedSegments.clear();
- }
- }
+ if (commit)
+ saveStateAndRemoveProcessed(iter.state());
interrupted = Thread.interrupted();
} while (iter.hasNext() && !interrupted);
if (interrupted)
throw new IgniteException("Change Data Capture Application interrupted");
+ }
+ catch (IgniteCheckedException | IOException e) {
+ throw new IgniteException(e);
+ }
+ }
- processedSegments.add(segment);
+ /**
+ * Consumes CDC events in {@link CdcMode#IGNITE_NODE_ACTIVE} mode.
+ *
+ * @return {@code true} if mode switched.
+ */
+ private boolean consumeSegmentPassively(IgniteWalIteratorFactory.IteratorParametersBuilder builder) {
+ try (WALIterator iter = new IgniteWalIteratorFactory(log).iterator(builder.addFilter(PASSIVE_RECS))) {
+ boolean interrupted = false;
+
+ while (iter.hasNext() && !interrupted) {
+ IgniteBiTuple<WALPointer, WALRecord> next = iter.next();
+
+ WALRecord walRecord = next.get2();
+
+ switch (walRecord.type()) {
+ case CDC_MANAGER_RECORD:
+ saveStateAndRemoveProcessed(((CdcManagerRecord)walRecord).walState());
+
+ break;
+
+ case CDC_MANAGER_STOP_RECORD:
+ state.saveCdcMode((cdcModeState = CdcMode.CDC_UTILITY_ACTIVE));
+
+ return true;
+
+ default:
+ throw new IgniteException("Unexpected record [type=" + walRecord.type() + ']');
+ }
+
+ interrupted = Thread.interrupted();
+ }
+
+ if (interrupted)
+ throw new IgniteException("Change Data Capture Application interrupted");
+
+ return false;
}
catch (IgniteCheckedException | IOException e) {
throw new IgniteException(e);
@@ -727,6 +779,72 @@
}
/**
+ * Remove segment file if it already processed. {@link #walState} points to the last committed offset so all files
+ * before this offset can be removed.
+ *
+ * @param segment Segment to check.
+ * @return {@code True} if segment file was deleted, {@code false} otherwise.
+ */
+ private boolean removeProcessedOnFailover(Path segment) {
+ long segmentIdx = segmentIndex(segment);
+
+ if (segmentIdx > walState.get1().index()) {
+ throw new IgniteException("Found segment greater then saved state. Some events are missed. Exiting! " +
+ "[state=" + walState + ", segment=" + segmentIdx + ']');
+ }
+
+ if (segmentIdx < walState.get1().index()) {
+ if (log.isInfoEnabled()) {
+ log.info("Already processed segment found. Skipping and deleting the file [segment=" +
+ segmentIdx + ", state=" + walState.get1().index() + ']');
+ }
+
+ // WAL segment is a hard link to a segment file in the special Change Data Capture folder.
+ // So, we can safely delete it after processing.
+ try {
+ Files.delete(segment);
+
+ return true;
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ return false;
+ }
+
+ /** Saves WAL consumption state and delete segments that no longer required. */
+ private void saveStateAndRemoveProcessed(T2<WALPointer, Integer> curState) throws IOException {
+ if (curState == null)
+ return;
+
+ if (log.isDebugEnabled())
+ log.debug("Saving state [curState=" + curState + ']');
+
+ state.saveWal(curState);
+
+ committedSegmentIdx.value(curState.get1().index());
+ committedSegmentOffset.value(curState.get1().fileOffset());
+
+ Iterator<Path> rmvIter = processedSegments.iterator();
+
+ while (rmvIter.hasNext()) {
+ Path processedSegment = rmvIter.next();
+
+ // Can't delete current segment, because state points to it.
+ if (segmentIndex(processedSegment) >= curState.get1().index())
+ continue;
+
+ // WAL segment is a hard link to a segment file in a specifal Change Data Capture folder.
+ // So we can safely delete it after success processing.
+ Files.delete(processedSegment);
+
+ rmvIter.remove();
+ }
+ }
+
+ /**
* Try locks Change Data Capture directory.
*
* @param dbStoreDirWithSubdirectory Root PDS directory.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcManager.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcManager.java
new file mode 100644
index 0000000..af9bed7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcManager.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cdc;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cdc.CdcConsumer;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerRecord;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerStopRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
+import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
+import org.apache.ignite.lang.IgniteExperimental;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+
+/**
+ * CDC manager responsible for logic of capturing data within Ignite node. Communication between {@link CdcManager} and
+ * {@link CdcMain} components is established through CDC management WAL records. {@link CdcManager} logs them, while {@link CdcMain}
+ * listens to them:
+ * <ul>
+ * <li>
+ * This manager can log {@link CdcManagerRecord} with committed consumer state. This record will be handled by
+ * {@link CdcMain} the same way it handles {@link CdcConsumer#onEvents(Iterator)} when it returns {@code true}.
+ * </li>
+ * <li>
+ * This manager must log {@link CdcManagerStopRecord} after it stopped handling records by any reason.
+ * {@link CdcMain} will start consume records instead since {@link CdcManagerRecord#walState()} of
+ * last consumed {@link CdcManagerRecord}.
+ * </li>
+ * <li>
+ * Apache Ignite provides a default implementation - {@link CdcUtilityActiveCdcManager}. It is disabled from the
+ * beginning, logs the {@link CdcManagerStopRecord} after Ignite node started, and then delegates consuming CDC
+ * events to the {@link CdcMain} utility.
+ * </li>
+ * </ul>
+ *
+ * Apache Ignite can be extended with custom CDC manager. It's required to implement this interface, create own {@link PluginProvider}
+ * that will return the implementation with {@link PluginProvider#createComponent(PluginContext, Class)} method.
+ * The {@code Class} parameter in the method is {@code CdcManager} class.
+ *
+ * @see CdcConsumer#onEvents(Iterator)
+ * @see CdcConsumerState#saveCdcMode(CdcMode)
+ */
+@IgniteExperimental
+public interface CdcManager extends GridCacheSharedManager, DatabaseLifecycleListener {
+ /**
+ * If this manager isn't enabled then Ignite skips notifying the manager with following methods.
+ *
+ * @return {@code true} if manager is enabled, otherwise {@code false}.
+ */
+ public boolean enabled();
+
+ /**
+ * Callback is executed only once on Ignite node start when binary memory has fully restored and WAL logging is resumed.
+ * It's executed before the first call of {@link #collect(ByteBuffer)}.
+ * <p> Implementation suggestions:
+ * <ul>
+ * <li>
+ * Implementation must subscribe to the database events to get notified with this callback via
+ * {@link GridInternalSubscriptionProcessor#registerDatabaseListener(DatabaseLifecycleListener)}
+ * </li>
+ * <li>Callback can be used for restoring CDC state on Ignite node start, collecting missed events from WAL segments.</li>
+ * <li>Be aware, this method runs in the Ignite system thread and might get longer the Ignite start procedure.</li>
+ * <li>Ignite node will fail in case the method throws an exception.</li>
+ * </ul>
+ */
+ @Override public default void afterBinaryMemoryRestore(IgniteCacheDatabaseSharedManager mgr,
+ GridCacheDatabaseSharedManager.RestoreBinaryState restoreState) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /**
+ * Callback to collect written WAL records. The provided buffer is a continuous part of WAL segment file.
+ * The buffer might contain full content of a segment or only piece of it. There are guarantees:
+ * <ul>
+ * <li>This method is invoked sequentially.</li>
+ * <li>Provided {@code dataBuf} is a continuation of the previous one.</li>
+ * <li>{@code dataBuf} contains finite number of completed WAL records. No partially written WAL records are present.</li>
+ * <li>Records can be read from the buffer with {@link RecordSerializer#readRecord(FileInput, WALPointer)}.</li>
+ * <li>{@code dataBuf} must not be changed within this method.</li>
+ * </ul>
+ *
+ * <p> Implementation suggestions:
+ * <ul>
+ * <li>
+ * Frequence of calling the method depends on frequence of fsyncing WAL segment.
+ * See {@link IgniteSystemProperties#IGNITE_WAL_SEGMENT_SYNC_TIMEOUT}.
+ * </li>
+ * <li>
+ * It must handle the content of the {@code dataBuf} within the calling thread.
+ * Content of the buffer will not be changed before this method returns.
+ * </li>
+ * <li>It must not block the calling thread and work quickly.</li>
+ * <li>Ignite will ignore any {@link Throwable} throwed from this method.</li>
+ * </ul>
+ *
+ * @param dataBuf Buffer that contains data to collect.
+ */
+ public void collect(ByteBuffer dataBuf);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMode.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMode.java
new file mode 100644
index 0000000..ff4e186
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcMode.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cdc;
+
+/** CDC modes. */
+public enum CdcMode {
+ /** CDC events are handled by {@link CdcManager} within Ignite node. */
+ IGNITE_NODE_ACTIVE,
+
+ /** CDC events are handled by {@link CdcMain} utility. */
+ CDC_UTILITY_ACTIVE
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcUtilityActiveCdcManager.java b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcUtilityActiveCdcManager.java
new file mode 100644
index 0000000..02a4d55
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cdc/CdcUtilityActiveCdcManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cdc;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerStopRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.cdc.CdcConsumerState.CDC_MODE_FILE_NAME;
+import static org.apache.ignite.internal.cdc.CdcMain.STATE_DIR;
+
+/**
+ * CDC manager that delegates consuming CDC events to the {@link CdcMain} utility.
+ */
+public class CdcUtilityActiveCdcManager extends GridCacheSharedManagerAdapter implements CdcManager, PartitionsExchangeAware {
+ /** */
+ @Override protected void start0() {
+ cctx.exchange().registerExchangeAwareComponent(this);
+ }
+
+ /** */
+ @Override public void onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ if (fut.localJoinExchange() || fut.activateCluster()) {
+ try {
+ File cdcModeFile = Paths.get(
+ ((FileWriteAheadLogManager)cctx.wal(true)).walCdcDirectory().getAbsolutePath(),
+ STATE_DIR,
+ CDC_MODE_FILE_NAME).toFile();
+
+ if (!cdcModeFile.exists())
+ cctx.wal(true).log(new CdcManagerStopRecord());
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to activate CdcManager. CDC might not work.");
+ }
+ finally {
+ cctx.exchange().unregisterExchangeAwareComponent(this);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean enabled() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void collect(ByteBuffer dataBuf) {
+ // No-op.
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcManagerRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcManagerRecord.java
new file mode 100644
index 0000000..b4e717c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcManagerRecord.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import java.util.Iterator;
+import org.apache.ignite.cdc.CdcConsumer;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.util.typedef.T2;
+
+/**
+ * This record notifies {@link CdcMain} about committed WAL state. It's assumed that WAL state will be inferred from
+ * {@link CdcConsumer} committed WALPointer.
+ *
+ * @see CdcConsumer#onEvents(Iterator)
+ */
+public class CdcManagerRecord extends WALRecord {
+ /** CDC consumer WAL state. */
+ private final T2<WALPointer, Integer> state;
+
+ /** */
+ public CdcManagerRecord(T2<WALPointer, Integer> state) {
+ this.state = state;
+ }
+
+ /** @return CDC consumer WAL state. */
+ public T2<WALPointer, Integer> walState() {
+ return state;
+ }
+
+ /** {@inheritDoc} */
+ @Override public RecordType type() {
+ return RecordType.CDC_MANAGER_RECORD;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcManagerStopRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcManagerStopRecord.java
new file mode 100644
index 0000000..729cb93
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/CdcManagerStopRecord.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagemem.wal.record;
+
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.cdc.CdcManager;
+
+/**
+ * Record notifies {@link CdcMain} that {@link CdcManager} fails and stop processing WAL data in-memory.
+ * {@link CdcMain} must start capturing and process {@link CdcEvent} in regularly(active) way.
+ */
+public class CdcManagerStopRecord extends WALRecord {
+ /** {@inheritDoc} */
+ @Override public RecordType type() {
+ return RecordType.CDC_MANAGER_STOP_RECORD;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
index 3457251..ebaa726 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java
@@ -291,7 +291,13 @@
INCREMENTAL_SNAPSHOT_FINISH_RECORD(77, LOGICAL),
/** CDC data record. */
- CDC_DATA_RECORD(78, CUSTOM);
+ CDC_DATA_RECORD(78, CUSTOM),
+
+ /** CDC manager record. */
+ CDC_MANAGER_RECORD(79, CUSTOM),
+
+ /** CDC manager record. */
+ CDC_MANAGER_STOP_RECORD(80, CUSTOM);
/** Index for serialization. Should be consistent throughout all versions. */
private final int idx;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 41871d0..ce42781 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -67,6 +67,7 @@
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.configuration.WarmUpConfiguration;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
@@ -78,6 +79,8 @@
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.cdc.CdcManager;
+import org.apache.ignite.internal.cdc.CdcUtilityActiveCdcManager;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.DetachedClusterNode;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
@@ -204,6 +207,7 @@
import static java.util.stream.Collectors.toSet;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -3034,6 +3038,7 @@
IgniteCacheDatabaseSharedManager dbMgr;
IgnitePageStoreManager pageStoreMgr = null;
IgniteWriteAheadLogManager walMgr = null;
+ CdcManager cdcMgr = null;
if (CU.isPersistenceEnabled(ctx.config()) && !ctx.clientNode()) {
dbMgr = new GridCacheDatabaseSharedManager(ctx);
@@ -3059,6 +3064,27 @@
walMgr = new FileWriteAheadLogManager(ctx);
}
+ if (CU.isCdcEnabled(ctx.config()) && !ctx.clientNode()) {
+ cdcMgr = ctx.plugins().createComponent(CdcManager.class);
+
+ if (cdcMgr != null) {
+ if (ctx.config().getDataStorageConfiguration().getWalMode() != WALMode.LOG_ONLY) {
+ U.warn(log, "Custom CdcManager is only supported for WALMode.LOG_ONLY. CdcManager configuration will be ignored.");
+
+ cdcMgr = null;
+ }
+
+ if (!IgniteSystemProperties.getBoolean(IGNITE_WAL_MMAP, true)) {
+ U.warn(log, "Custom CdcManager is only supported for IGNITE_WAL_MMAP=true. CdcManager configuration will be ignored.");
+
+ cdcMgr = null;
+ }
+ }
+
+ if (cdcMgr == null)
+ cdcMgr = new CdcUtilityActiveCdcManager();
+ }
+
IgniteSnapshotManager snapshotMgr = ctx.plugins().createComponent(IgniteSnapshotManager.class);
if (snapshotMgr == null)
@@ -3082,6 +3108,7 @@
.setJtaManager(JTA.createOptional())
.setMvccCachingManager(new MvccCachingManager())
.setDiagnosticManager(new CacheDiagnosticManager())
+ .setCdcManager(cdcMgr)
.build(kernalCtx, storeSesLsnrs);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index c1b9c95..7d87015 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -36,6 +36,7 @@
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cdc.CdcManager;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
@@ -115,6 +116,9 @@
/** Write ahead log manager for CDC. {@code Null} if persistence AND CDC is not enabled. */
@Nullable private IgniteWriteAheadLogManager cdcWalMgr;
+ /** CDC manager. {@code null} if CDC isn't configured. */
+ @Nullable private CdcManager cdcMgr;
+
/** Write ahead log state manager. */
private WalStateManager walStateMgr;
@@ -230,7 +234,8 @@
CacheJtaManagerAdapter jtaMgr,
Collection<CacheStoreSessionListener> storeSesLsnrs,
MvccCachingManager mvccCachingMgr,
- CacheDiagnosticManager diagnosticMgr
+ CacheDiagnosticManager diagnosticMgr,
+ CdcManager cdcMgr
) {
this.kernalCtx = kernalCtx;
@@ -253,7 +258,8 @@
ttlMgr,
evictMgr,
mvccCachingMgr,
- diagnosticMgr
+ diagnosticMgr,
+ cdcMgr
);
this.storeSesLsnrs = storeSesLsnrs;
@@ -431,7 +437,8 @@
ttlMgr,
evictMgr,
mvccCachingMgr,
- diagnosticMgr
+ diagnosticMgr,
+ cdcMgr
);
this.mgrs = mgrs;
@@ -480,7 +487,8 @@
GridCacheSharedTtlCleanupManager ttlMgr,
PartitionsEvictManager evictMgr,
MvccCachingManager mvccCachingMgr,
- CacheDiagnosticManager diagnosticMgr
+ CacheDiagnosticManager diagnosticMgr,
+ CdcManager cdcMgr
) {
this.diagnosticMgr = add(mgrs, diagnosticMgr);
this.mvccMgr = add(mgrs, mvccMgr);
@@ -492,6 +500,7 @@
assert walMgr == null || walMgr == cdcWalMgr;
this.cdcWalMgr = walMgr == null ? add(mgrs, cdcWalMgr) : cdcWalMgr;
+ this.cdcMgr = add(mgrs, cdcMgr);
this.walStateMgr = add(mgrs, walStateMgr);
this.dbMgr = add(mgrs, dbMgr);
this.snapshotMgr = add(mgrs, snapshotMgr);
@@ -761,6 +770,13 @@
}
/**
+ * @return CDC manager.
+ */
+ public CdcManager cdc() {
+ return cdcMgr;
+ }
+
+ /**
* @return WAL state manager.
*/
public WalStateManager walState() {
@@ -1267,6 +1283,9 @@
private CacheDiagnosticManager diagnosticMgr;
/** */
+ private CdcManager cdcMgr;
+
+ /** */
private Builder() {
// No-op.
}
@@ -1295,7 +1314,8 @@
jtaMgr,
storeSesLsnrs,
mvccCachingMgr,
- diagnosticMgr
+ diagnosticMgr,
+ cdcMgr
);
}
@@ -1417,5 +1437,12 @@
return this;
}
+
+ /** */
+ public Builder setCdcManager(CdcManager cdcMgr) {
+ this.cdcMgr = cdcMgr;
+
+ return this;
+ }
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 445ccde..0580389 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -1069,6 +1069,10 @@
cctx.wal(true).startAutoReleaseSegments();
cctx.wal(true).resumeLogging(ptr);
+
+ // This callback is required for CdcManager initialization.
+ if (cctx.cdc() != null && cctx.cdc().enabled())
+ cctx.cdc().afterBinaryMemoryRestore(this, null);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index cc78a3c..12c40e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -518,8 +518,12 @@
name, oldVal, newVal));
}
- if (newVal != null && newVal)
+ if (newVal != null && newVal) {
log.warning("CDC was disabled.");
+
+ if (cctx.cdc() != null)
+ cctx.cdc().stop(true);
+ }
});
dispatcher.registerProperty(cdcDisabled);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
index 5dc2ec1..f242ebf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/filehandle/FileWriteHandleImpl.java
@@ -37,6 +37,7 @@
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.cdc.CdcManager;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
@@ -406,14 +407,7 @@
if (segs != null) {
assert segs.size() == 1;
- SegmentedRingByteBuffer.ReadSegment seg = segs.get(0);
-
- int off = seg.buffer().position();
- int len = seg.buffer().limit() - off;
-
- fsync((MappedByteBuffer)buf.buf, off, len);
-
- seg.release();
+ fsyncReadSegment(segs.get(0), false);
}
}
else
@@ -499,7 +493,7 @@
if (segs != null) {
assert segs.size() == 1;
- segs.get(0).release();
+ fsyncReadSegment(segs.get(0), true);
}
}
@@ -538,6 +532,36 @@
}
/**
+ * Make fsync for part of the WAL segment file. And collect it to {@link CdcManager} if enabled.
+ *
+ * @param seg Part of the WAL segment file.
+ * @param onlyCdc If {@code true} then skip actual fsync. TODO: IGNITE-20732
+ */
+ private void fsyncReadSegment(SegmentedRingByteBuffer.ReadSegment seg, boolean onlyCdc) throws IgniteCheckedException {
+ int off = seg.buffer().position();
+ int len = seg.buffer().limit() - off;
+
+ if (!onlyCdc)
+ fsync((MappedByteBuffer)buf.buf, off, len);
+
+ if (cctx.cdc() != null && cctx.cdc().enabled()) {
+ try {
+ ByteBuffer cdcBuf = buf.buf.asReadOnlyBuffer();
+ cdcBuf.position(off);
+ cdcBuf.limit(off + len);
+ cdcBuf.order(buf.buf.order());
+
+ cctx.cdc().collect(cdcBuf);
+ }
+ catch (Throwable cdcErr) {
+ U.error(log, "Error happened during CDC data collection.", cdcErr);
+ }
+ }
+
+ seg.release();
+ }
+
+ /**
* Signals next segment available to wake up other worker threads waiting for WAL to write.
*/
@Override public void signalNextAvailable() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
index 3907bf2..6363f59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordDataV1Serializer.java
@@ -37,6 +37,8 @@
import org.apache.ignite.internal.managers.encryption.GroupKeyEncrypted;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerRecord;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerStopRecord;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
@@ -541,6 +543,7 @@
return 4 + 8 + 1;
case SWITCH_SEGMENT_RECORD:
+ case CDC_MANAGER_STOP_RECORD:
return 0;
case TX_RECORD:
@@ -567,6 +570,9 @@
case INCREMENTAL_SNAPSHOT_FINISH_RECORD:
return ((IncrementalSnapshotFinishRecord)record).dataSize();
+ case CDC_MANAGER_RECORD:
+ return 8 + 4 + 4 + 4;
+
default:
throw new UnsupportedOperationException("Type: " + record.type());
}
@@ -1323,6 +1329,22 @@
break;
+ case CDC_MANAGER_RECORD:
+ long walSegIdx = in.readLong();
+ int fileOff = in.readInt();
+ int size = in.readInt();
+
+ int entryIdx = in.readInt();
+
+ res = new CdcManagerRecord(new T2<>(new WALPointer(walSegIdx, fileOff, size), entryIdx));
+
+ break;
+
+ case CDC_MANAGER_STOP_RECORD:
+ res = new CdcManagerStopRecord();
+
+ break;
+
default:
throw new UnsupportedOperationException("Type: " + type);
}
@@ -1912,6 +1934,7 @@
break;
case SWITCH_SEGMENT_RECORD:
+ case CDC_MANAGER_STOP_RECORD:
break;
case MASTER_KEY_CHANGE_RECORD_V2:
@@ -2002,6 +2025,17 @@
break;
+ case CDC_MANAGER_RECORD:
+ T2<WALPointer, Integer> state = ((CdcManagerRecord)rec).walState();
+
+ buf.putLong(state.get1().index());
+ buf.putInt(state.get1().fileOffset());
+ buf.putInt(state.get1().length());
+
+ buf.putInt(state.get2());
+
+ break;
+
default:
throw new UnsupportedOperationException("Type: " + rec.type());
}
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcIgniteNodeActiveModeTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcIgniteNodeActiveModeTest.java
new file mode 100644
index 0000000..6fe9dae
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcIgniteNodeActiveModeTest.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cdc.CdcConsumerState;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.cdc.CdcManager;
+import org.apache.ignite.internal.cdc.CdcMode;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerRecord;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerStopRecord;
+import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneGridKernalContext;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.util.lang.RunnableX;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.spi.metric.ObjectMetric;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.wal.record.RecordUtils;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.cdc.CdcMain.CDC_MODE;
+import static org.apache.ignite.internal.cdc.CdcMain.COMMITTED_SEG_IDX;
+import static org.apache.ignite.internal.cdc.CdcMain.CUR_SEG_IDX;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
+import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+public class CdcIgniteNodeActiveModeTest extends AbstractCdcTest {
+ /** */
+ private IgniteEx ign;
+
+ /** */
+ private CdcMain cdcMain;
+
+ /** */
+ private IgniteInternalFuture<?> cdcMainFut;
+
+ /** */
+ private UserCdcConsumer cnsmr;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setCdcEnabled(true)));
+
+ cfg.setPluginProviders(new NoOpCdcManagerPluginProvider());
+
+ cfg.setCacheConfiguration(new CacheConfiguration<Integer, User>(DEFAULT_CACHE_NAME));
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ cleanPersistenceDir();
+
+ ign = startGrid();
+
+ ign.cluster().state(ACTIVE);
+
+ cdcMain = createCdc((cnsmr = new UserCdcConsumer()), ign.configuration());
+
+ cdcMainFut = runAsync(cdcMain);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ cdcMainFut.cancel();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** */
+ @Test
+ public void testCdcMetrics() throws Exception {
+ awaitCdcModeValue(CdcMode.IGNITE_NODE_ACTIVE);
+
+ IgniteCache<Integer, User> cache = ign.cache(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < 5_000; i++) {
+ cache.put(i, createUser(i));
+
+ if (i % 1_000 == 0) {
+ writeCdcManagerRecord();
+ rollSegment();
+ }
+ }
+
+ MetricRegistry mreg = GridTestUtils.<StandaloneGridKernalContext>getFieldValue(cdcMain, "kctx")
+ .metric().registry("cdc");
+
+ assertTrue(GridTestUtils.waitForCondition(() ->
+ mreg.<LongMetric>findMetric(CUR_SEG_IDX).value() > 1, 10_000, 100));
+
+ assertTrue(GridTestUtils.waitForCondition(() ->
+ mreg.<LongMetric>findMetric(COMMITTED_SEG_IDX).value() > 1, 10_000, 100));
+
+ awaitCdcModeValue(CdcMode.IGNITE_NODE_ACTIVE);
+ }
+
+ /** */
+ @Test
+ public void testSwitchToCdcUtilityActiveMode() throws Exception {
+ writeCdcManagerStopCdcRecord();
+
+ rollSegment();
+
+ awaitCdcModeValue(CdcMode.CDC_UTILITY_ACTIVE);
+ }
+
+ /** */
+ @Test
+ public void testSkipsNodeCommittedData() throws Exception {
+ List<Integer> expUsers = new ArrayList<>();
+
+ addData(0, 1, null);
+
+ writeCdcManagerRecord();
+
+ rollSegment();
+
+ addData(2, 3, expUsers);
+
+ rollSegment();
+
+ addData(4, 5, expUsers);
+
+ writeCdcManagerStopCdcRecord();
+
+ rollSegment();
+
+ awaitCdcModeValue(CdcMode.CDC_UTILITY_ACTIVE);
+
+ checkConsumerData(expUsers);
+
+ addData(6, 7, expUsers);
+
+ rollSegment();
+
+ checkConsumerData(expUsers);
+ }
+
+ /** */
+ @Test
+ public void testCleanWalsAfterCdcManagerRecord() throws Exception {
+ checkCdcSegmentsExists(0, -1); // No segments stored in CDC dir.
+
+ rollSegment();
+
+ checkCdcSegmentsExists(0, 0); // Segment 0 is stored.
+
+ rollSegment();
+
+ checkCdcSegmentsExists(0, 1); // Segment 1 is stored.
+
+ writeCdcManagerRecord();
+ rollSegment();
+
+ checkCdcSegmentsExists(2, 2); // Segments 0, 1 should be cleared after CdcManagerRecord.
+
+ rollSegment();
+
+ checkCdcSegmentsExists(2, 3); // Segment 3 is stored.
+
+ writeCdcManagerRecord();
+ rollSegment();
+
+ checkCdcSegmentsExists(4, 4); // Segments 2, 3 should be cleared after CdcManagerRecord.
+
+ writeCdcManagerStopCdcRecord();
+ rollSegment();
+
+ checkCdcSegmentsExists(5, 5); // Segment 4 is cleared after CdcMain consumes it.
+ }
+
+ /** */
+ @Test
+ public void testRestoreModeOnRestart() throws Exception {
+ RunnableX restartUtil = () -> {
+ cdcMainFut.cancel();
+ cdcMain = createCdc(cnsmr, getConfiguration(getTestIgniteInstanceName()));
+ cdcMainFut = runAsync(cdcMain);
+ };
+
+ List<Integer> expUsers = new ArrayList<>();
+
+ addData(0, 1, null);
+
+ rollSegment();
+
+ addData(2, 3, null);
+
+ restartUtil.run();
+
+ rollSegment();
+
+ writeCdcManagerRecord();
+ writeCdcManagerStopCdcRecord();
+
+ addData(4, 5, expUsers);
+
+ rollSegment();
+
+ restartUtil.run();
+
+ awaitCdcModeValue(CdcMode.CDC_UTILITY_ACTIVE);
+
+ checkConsumerData(expUsers);
+
+ addData(4, 5, expUsers);
+
+ rollSegment();
+
+ checkConsumerData(expUsers);
+ }
+
+ /** */
+ @Test
+ public void testStoreCdcMode() throws Exception {
+ writeCdcManagerStopCdcRecord();
+
+ rollSegment();
+
+ awaitCdcModeValue(CdcMode.CDC_UTILITY_ACTIVE);
+
+ CdcConsumerState state = GridTestUtils.getFieldValue(cdcMain, "state");
+
+ assertEquals(CdcMode.CDC_UTILITY_ACTIVE, state.loadCdcMode());
+ }
+
+ /** */
+ private WALPointer rollSegment() throws IgniteCheckedException {
+ return walMgr().log(RecordUtils.buildCheckpointRecord(), RolloverType.CURRENT_SEGMENT);
+ }
+
+ /** */
+ private void writeCdcManagerStopCdcRecord() throws Exception {
+ walMgr().log(new CdcManagerStopRecord());
+ }
+
+ /** */
+ private void writeCdcManagerRecord() throws Exception {
+ walMgr().log(new CdcManagerRecord(new T2<>(walMgr().log(RecordUtils.buildCheckpointRecord()), 0)));
+ }
+
+ /** */
+ private IgniteWriteAheadLogManager walMgr() {
+ return ign.context().cache().context().wal(true);
+ }
+
+ /**
+ * @param expUsers Expected users read by CDC.
+ */
+ private void checkConsumerData(List<Integer> expUsers) throws IgniteInterruptedCheckedException {
+ assertTrue(waitForCondition(() ->
+ expUsers.equals(cnsmr.data(ChangeEventType.UPDATE, cacheId(DEFAULT_CACHE_NAME))
+ ), 10_000, 10));
+ }
+
+ /**
+ * @param from First index of user to add.
+ * @param to Last index of user to add.
+ * @param expCdcReadUsers Expected CDC read users.
+ */
+ private void addData(int from, int to, List<Integer> expCdcReadUsers) {
+ IntStream.range(from, to).forEach(u -> ign.cache(DEFAULT_CACHE_NAME).put(u, createUser(u)));
+
+ if (expCdcReadUsers != null)
+ IntStream.range(from, to).forEach(expCdcReadUsers::add);
+ }
+
+ /** */
+ private void awaitCdcModeValue(CdcMode expVal) throws IgniteInterruptedCheckedException {
+ assertTrue(waitForCondition(() -> {
+ try {
+ ObjectMetric<String> m = GridTestUtils.<StandaloneGridKernalContext>getFieldValue(cdcMain, "kctx")
+ .metric().registry("cdc").findMetric(CDC_MODE);
+
+ return m != null && expVal.name().equals(m.value());
+ }
+ catch (Exception e) {
+ return false;
+ }
+ }, 30_000, 10));
+ }
+
+ /**
+ * Checks that WAL CDC directory contains specified segments.
+ *
+ * @param from Start segment index to check.
+ * @param to Inclusive end segment index.
+ */
+ private void checkCdcSegmentsExists(long from, long to) throws IgniteInterruptedCheckedException {
+ List<Long> expected;
+
+ if (from > to)
+ expected = Collections.emptyList();
+ else
+ expected = LongStream.range(from, to + 1).boxed().collect(Collectors.toList());
+
+ assertTrue(waitForCondition(() -> {
+ try {
+ List<Long> actual = Files.list(GridTestUtils.getFieldValue(cdcMain, "cdcDir"))
+ .filter(p -> WAL_SEGMENT_FILE_FILTER.accept(p.toFile()))
+ .map(FileWriteAheadLogManager::segmentIndex)
+ .sorted()
+ .collect(Collectors.toList());
+
+ return expected.equals(actual);
+ }
+ catch (Exception e) {
+ return false;
+ }
+ }, 10_000, 10));
+ }
+
+ /** */
+ private static class NoOpCdcManagerPluginProvider extends AbstractTestPluginProvider {
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return "NoOpCdcManagerPluginProvider";
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> @Nullable T createComponent(PluginContext ctx, Class<T> cls) {
+ if (CdcManager.class.equals(cls))
+ return (T)new NoOpCdcManager();
+
+ return null;
+ }
+ }
+
+ /** */
+ protected static class NoOpCdcManager extends GridCacheSharedManagerAdapter implements CdcManager {
+ /** {@inheritDoc} */
+ @Override public boolean enabled() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void collect(ByteBuffer dataBuf) {
+ // No-op.
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcManagerTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcManagerTest.java
new file mode 100644
index 0000000..223fec4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcManagerTest.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.io.File;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cdc.CdcManager;
+import org.apache.ignite.internal.cdc.CdcUtilityActiveCdcManager;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
+import org.apache.ignite.internal.pagemem.wal.record.RolloverType;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.plugin.AbstractTestPluginProvider;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cdc.CdcManagerTest.TestCdcManager.cdcMgr;
+
+/** */
+@RunWith(Parameterized.class)
+public class CdcManagerTest extends GridCommonAbstractTest {
+ /** */
+ private static final int WAL_SEG_SIZE = 64 * (int)U.MB;
+
+ /** */
+ private IgniteEx ign;
+
+ /** */
+ private TestCdcManager cdcMgr;
+
+ /** */
+ private FileWriteAheadLogManager walMgr;
+
+ /** */
+ private ListeningTestLogger lsnrLog;
+
+ /** */
+ private WALMode walMode;
+
+ /** */
+ private static volatile boolean failCollect;
+
+ /** */
+ @Parameterized.Parameter
+ public boolean persistentEnabled;
+
+ /** */
+ @Parameterized.Parameters(name = "persistentEnabled={0}")
+ public static Object[] params() {
+ return new Object[] {false, true};
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String instanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(instanceName);
+
+ cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+ .setPersistenceEnabled(persistentEnabled)
+ .setCdcEnabled(true))
+ .setWalMode(walMode)
+ .setWalSegmentSize(WAL_SEG_SIZE));
+
+ cfg.setPluginProviders(new CdcManagerPluginProvider());
+
+ cfg.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME));
+
+ cfg.setGridLogger(lsnrLog);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ cleanPersistenceDir();
+
+ lsnrLog = new ListeningTestLogger(log);
+
+ ign = startGrid(0);
+
+ ign.cluster().state(ClusterState.ACTIVE);
+
+ cdcMgr = cdcMgr(ign);
+ walMgr = (FileWriteAheadLogManager)ign.context().cache().context().wal(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ failCollect = false;
+ }
+
+ /** */
+ @Test
+ public void testDisableByProperty() throws Exception {
+ AtomicBoolean stopped = GridTestUtils.getFieldValue(cdcMgr(ign), "stop");
+
+ assertFalse(stopped.get());
+
+ DistributedChangeableProperty<Serializable> cdcDisableProp = ign.context().distributedConfiguration()
+ .property(FileWriteAheadLogManager.CDC_DISABLED);
+
+ cdcDisableProp.localUpdate(true);
+
+ assertTrue(GridTestUtils.waitForCondition(stopped::get, 10_000, 10));
+ }
+
+ /** */
+ @Test
+ public void testSingleSegmentContent() throws Exception {
+ for (int i = 0; i < 10_000; i++)
+ ign.cache(DEFAULT_CACHE_NAME).put(i, i);
+
+ assertEquals(0, walMgr.currentSegment());
+
+ File walDir = walDir(ign);
+
+ stopGrid(0);
+
+ Path seg = Arrays.stream(walDir.listFiles()).sorted().findFirst().get().toPath();
+
+ ByteBuffer walBuf = ByteBuffer.wrap(Files.readAllBytes(seg));
+ ByteBuffer cdcBuf = ByteBuffer.wrap(cdcMgr(ign).buf.array());
+ cdcBuf.limit(WAL_SEG_SIZE);
+
+ assertEquals(0, walBuf.compareTo(cdcBuf));
+ }
+
+ /** */
+ @Test
+ public void testMultipleSegmentContent() throws Exception {
+ checkCdcContentWithRollover(() -> {
+ for (int i = 0; i < Integer.MAX_VALUE; i++) {
+ ign.cache(DEFAULT_CACHE_NAME).put(i, i);
+
+ if (walMgr.currentSegment() > 0)
+ break;
+ }
+ });
+ }
+
+ /** */
+ @Test
+ public void testMultipleSegmentContentWithForceNextSegmentRollover() throws Exception {
+ checkCdcContentWithRollover(() -> rollSegment(RolloverType.NEXT_SEGMENT));
+ }
+
+ /** */
+ @Test
+ public void testMultipleSegmentContentWithForceCurrentSegmentRollover() throws Exception {
+ checkCdcContentWithRollover(() -> rollSegment(RolloverType.CURRENT_SEGMENT));
+ }
+
+ /** */
+ @Test
+ public void testRestartNode() throws Exception {
+ for (int i = 0; i < 100; i++)
+ ign.cache(DEFAULT_CACHE_NAME).put(i, i);
+
+ stopGrid(0);
+
+ File seg = Arrays.stream(walDir(ign).listFiles()).sorted().findFirst().get();
+ int len0 = writtenLength(seg);
+
+ ByteBuffer buf0 = cdcMgr.buf;
+
+ ign = startGrid(0);
+ cdcMgr = cdcMgr(ign);
+
+ for (int i = 0; i < 100; i++)
+ ign.cache(DEFAULT_CACHE_NAME).put(i, i);
+
+ stopGrid(0);
+
+ int len1 = writtenLength(seg);
+
+ // Check CDC buffer content on first start.
+ ByteBuffer walBuf = ByteBuffer.wrap(Files.readAllBytes(seg.toPath()));
+ walBuf.limit(len0);
+
+ ByteBuffer cdcBuf = ByteBuffer.wrap(buf0.array());
+ cdcBuf.limit(len0);
+
+ assertEquals(0, walBuf.compareTo(cdcBuf));
+
+ // Check CDC buffer content on second start.
+ walBuf.limit(len1);
+ walBuf.position(len0);
+
+ cdcBuf = ByteBuffer.wrap(cdcMgr.buf.array());
+ cdcBuf.limit(len1 - len0);
+
+ assertEquals(0, walBuf.compareTo(cdcBuf));
+ }
+
+ /** */
+ @Test
+ public void testProhibitedWalModes() throws Exception {
+ for (WALMode m: WALMode.values()) {
+ stopGrid(0);
+ cleanPersistenceDir();
+
+ walMode = m;
+
+ boolean cdcStart = m == WALMode.LOG_ONLY;
+
+ LogListener cdcWarnLsnr = LogListener
+ .matches("Custom CdcManager is only supported for WALMode.LOG_ONLY")
+ .times(cdcStart ? 0 : 1)
+ .build();
+
+ lsnrLog.registerListener(cdcWarnLsnr);
+
+ IgniteEx ign = startGrid(0);
+
+ assertTrue(m.toString(), cdcWarnLsnr.check());
+
+ if (cdcStart)
+ assertTrue(cdcMgr(ign) instanceof TestCdcManager);
+ else
+ assertTrue(ign.context().cache().context().cdc() instanceof CdcUtilityActiveCdcManager);
+ }
+ }
+
+ /** */
+ @Test
+ public void testCollectFailedIgnored() throws Exception {
+ LogListener cdcErrLsnr = LogListener
+ .matches("Error happened during CDC data collection.")
+ .atLeast(1)
+ .build();
+
+ lsnrLog.registerListener(cdcErrLsnr);
+
+ failCollect = true;
+
+ ign.cache(DEFAULT_CACHE_NAME).put(0, 0);
+
+ assertTrue(GridTestUtils.waitForCondition(cdcErrLsnr::check, 10_000, 10));
+
+ assertEquals(0, ign.cache(DEFAULT_CACHE_NAME).get(0));
+ }
+
+ /** */
+ public void checkCdcContentWithRollover(Runnable rollSegment) throws Exception {
+ for (int i = 0; i < 10_000; i++)
+ ign.cache(DEFAULT_CACHE_NAME).put(i, i);
+
+ rollSegment.run();
+
+ for (int i = 0; i < 10_000; i++)
+ ign.cache(DEFAULT_CACHE_NAME).put(i, i);
+
+ File walDir = walDir(ign);
+
+ stopGrid(0);
+
+ List<File> segs = Arrays.stream(walDir.listFiles()).sorted()
+ .limit(2)
+ .collect(Collectors.toList());
+
+ int firstSegActLen = writtenLength(segs.get(0));
+
+ ByteBuffer walBuf = ByteBuffer.wrap(Files.readAllBytes(segs.get(0).toPath()));
+ ByteBuffer cdcBuf = ByteBuffer.wrap(cdcMgr.buf.array());
+ walBuf.limit(firstSegActLen);
+ cdcBuf.limit(firstSegActLen);
+
+ assertEquals(0, walBuf.compareTo(cdcBuf));
+
+ walBuf = ByteBuffer.wrap(Files.readAllBytes(segs.get(1).toPath()));
+ cdcBuf = ByteBuffer.wrap(cdcMgr.buf.array());
+ cdcBuf.position(firstSegActLen + 1);
+ cdcBuf.limit(firstSegActLen + 1 + WAL_SEG_SIZE);
+
+ assertEquals(0, walBuf.compareTo(cdcBuf));
+ }
+
+ /** */
+ private void rollSegment(RolloverType rollType) {
+ if (persistentEnabled)
+ dbMgr(ign).checkpointReadLock();
+
+ try {
+ walMgr.log(new CheckpointRecord(null), rollType);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ finally {
+ if (persistentEnabled)
+ dbMgr(ign).checkpointReadUnlock();
+ }
+ }
+
+ /** Get WAL directory. */
+ private File walDir(IgniteEx ign) throws Exception {
+ return new File(
+ U.resolveWorkDirectory(
+ ign.configuration().getWorkDirectory(),
+ DataStorageConfiguration.DFLT_WAL_PATH,
+ false),
+ ign.context().pdsFolderResolver().resolveFolders().folderName());
+ }
+
+ /** @return Length of the all written records in the specified segment. */
+ private int writtenLength(File walSegment) throws Exception {
+ IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log);
+
+ IgniteWalIteratorFactory.IteratorParametersBuilder params = new IgniteWalIteratorFactory.IteratorParametersBuilder()
+ .filesOrDirs(walSegment);
+
+ try (WALIterator walIt = factory.iterator(params)) {
+ while (walIt.hasNext())
+ walIt.next();
+
+ return walIt.lastRead().get().next().fileOffset();
+ }
+ }
+
+ /** */
+ private static class CdcManagerPluginProvider extends AbstractTestPluginProvider {
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return "CdcManagerPluginProvider";
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> @Nullable T createComponent(PluginContext ctx, Class<T> cls) {
+ if (CdcManager.class.equals(cls))
+ return (T)new TestCdcManager();
+
+ return null;
+ }
+ }
+
+ /** Test {@link CdcManager} for testing WAL fsync guarantees. */
+ protected static class TestCdcManager extends GridCacheSharedManagerAdapter implements CdcManager {
+ /** Buffer to store collected data. */
+ private final ByteBuffer buf;
+
+ /** */
+ TestCdcManager() {
+ buf = ByteBuffer.allocate(2 * WAL_SEG_SIZE);
+
+ Arrays.fill(buf.array(), (byte)0);
+
+ buf.position(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void collect(ByteBuffer dataBuf) {
+ if (failCollect)
+ throw new RuntimeException();
+
+ if (log.isDebugEnabled())
+ log.debug("Collect data buffer [offset=" + dataBuf.position() + ", limit=" + dataBuf.limit() + ']');
+
+ buf.put(dataBuf);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean enabled() {
+ return true;
+ }
+
+ /** @return CdcManager for specified Ignite node. */
+ static TestCdcManager cdcMgr(IgniteEx ign) {
+ return (TestCdcManager)ign.context().cache().context().cdc();
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
index 401b02f..41a957d 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
@@ -354,6 +354,10 @@
while (iter.hasNext()) {
IgniteBiTuple<WALPointer, WALRecord> rec = iter.next();
+ // Custom records could be written in WAL even if persistence isn't enabled.
+ if (rec.get2().type().purpose() == WALRecord.RecordPurpose.CUSTOM)
+ continue;
+
if (persistenceEnabled && (!(rec.get2() instanceof DataRecord)))
continue;
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
index 60ffcee..c228cb3 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/wal/record/RecordUtils.java
@@ -24,6 +24,8 @@
import java.util.UUID;
import java.util.function.Supplier;
import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerRecord;
+import org.apache.ignite.internal.pagemem.wal.record.CdcManagerStopRecord;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.ExchangeRecord;
@@ -92,6 +94,7 @@
import org.apache.ignite.internal.processors.cache.tree.DataInnerIO;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.transactions.TransactionState;
@@ -113,6 +116,8 @@
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.BTREE_PAGE_REMOVE;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.BTREE_PAGE_REPLACE;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_DATA_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_MANAGER_RECORD;
+import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CDC_MANAGER_STOP_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CLUSTER_SNAPSHOT;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CONSISTENT_CUT;
@@ -267,6 +272,9 @@
put(CLUSTER_SNAPSHOT, RecordUtils::buildClusterSnapshotRecord);
put(INCREMENTAL_SNAPSHOT_START_RECORD, RecordUtils::buildIncrementalSnapshotStartRecord);
put(INCREMENTAL_SNAPSHOT_FINISH_RECORD, RecordUtils::buildIncrementalSnapshotFinishRecord);
+ put(CDC_MANAGER_RECORD, RecordUtils::buildCdcManagerStopRecord);
+ put(CDC_MANAGER_STOP_RECORD, RecordUtils::buildCdcManagerStopRecord);
+
}
/** */
@@ -649,4 +657,14 @@
return new IncrementalSnapshotFinishRecord(
UUID.randomUUID(), F.asSet(new GridCacheVersion()), F.asSet(new GridCacheVersion()));
}
+
+ /** **/
+ public static CdcManagerRecord buildCdcManagerRecord() {
+ return new CdcManagerRecord(new T2<>(new WALPointer(0, 0, 0), 0));
+ }
+
+ /** **/
+ public static CdcManagerStopRecord buildCdcManagerStopRecord() {
+ return new CdcManagerStopRecord();
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 2848898..7d2ea12 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -22,6 +22,8 @@
import java.util.List;
import org.apache.ignite.cdc.CdcCacheConfigOnRestartTest;
import org.apache.ignite.cdc.CdcCacheVersionTest;
+import org.apache.ignite.cdc.CdcIgniteNodeActiveModeTest;
+import org.apache.ignite.cdc.CdcManagerTest;
import org.apache.ignite.cdc.CdcNonDefaultWorkDirTest;
import org.apache.ignite.cdc.CdcSelfTest;
import org.apache.ignite.cdc.RestartWithWalForceArchiveTimeoutTest;
@@ -162,6 +164,8 @@
GridTestUtils.addTestIfNeeded(suite, WalForCdcTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CdcCacheConfigOnRestartTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CdcNonDefaultWorkDirTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, CdcManagerTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, CdcIgniteNodeActiveModeTest.class, ignoredTests);
// new style folders with generated consistent ID test
GridTestUtils.addTestIfNeeded(suite, IgniteUidAsConsistentIdMigrationTest.class, ignoredTests);