IGNITE-17668 Fix removing archive WAL segments for in-memory CDC mode (#10248)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 445e106..bd402a9 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2951,7 +2951,7 @@
* @param highBound Upper bound.
* @throws IgniteCheckedException If failed.
*/
- public void onWalTruncated(@Nullable WALPointer highBound) throws IgniteCheckedException {
+ @Override public void onWalTruncated(@Nullable WALPointer highBound) throws IgniteCheckedException {
checkpointManager.removeCheckpointsUntil(highBound);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
index 8cf730a..c789141 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java
@@ -1156,17 +1156,18 @@
if (!CU.isCdcEnabled(kctx.config()) || kctx.clientNode())
return;
- WALIterator iter = cctx.wal(true).replay(null, (type, ptr) -> true);
+ try (WALIterator iter = cctx.wal(true).replay(null, (type, ptr) -> true)) {
+ while (iter.hasNext())
+ iter.next();
- while (iter.hasNext())
- iter.next();
+ WALPointer ptr = iter.lastRead().orElse(null);
- WALPointer ptr = iter.lastRead().orElse(null);
+ if (ptr != null)
+ ptr = ptr.next();
- if (ptr != null)
- ptr = ptr.next();
-
- cctx.wal(true).resumeLogging(ptr);
+ cctx.wal(true).startAutoReleaseSegments();
+ cctx.wal(true).resumeLogging(ptr);
+ }
}
/**
@@ -1750,4 +1751,14 @@
(warmUpConfig) -> "Unknown data region warm-up configuration: " + errPostfix.get()
);
}
+
+ /**
+ * Wal truncate callback.
+ *
+ * @param highBound Upper bound.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void onWalTruncated(WALPointer highBound) throws IgniteCheckedException {
+ // No-op.
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index b4cad82..8688712 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -56,6 +56,7 @@
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.DiskPageCompression;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -81,8 +82,8 @@
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
@@ -341,6 +342,13 @@
private final long walForceArchiveTimeout;
/**
+ * {@code True} if WAL enabled only for CDC.
+ * This mean {@link DataRegionConfiguration#isPersistenceEnabled()} is {@code false} for all {@link DataRegion},
+ * and {@link DataRegionConfiguration#isCdcEnabled()} {@code true} for some of them.
+ */
+ private final boolean inMemoryCdc;
+
+ /**
* Container with last WAL record logged timestamp.<br> Zero value means there was no records logged to current
* segment, skip possible archiving for this case<br> Value is filled only for case {@link
* #walAutoArchiveAfterInactivity} > 0<br>
@@ -423,6 +431,7 @@
segmentFileInputFactory = new SimpleSegmentFileInputFactory();
walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity();
walForceArchiveTimeout = dsCfg.getWalForceArchiveTimeout();
+ inMemoryCdc = !CU.isPersistenceEnabled(dsCfg) && CU.isCdcEnabled(igCfg);
timeoutRolloverMux = (walAutoArchiveAfterInactivity > 0 || walForceArchiveTimeout > 0) ? new Object() : null;
@@ -1374,6 +1383,11 @@
segmentSize.put(idx, currSize);
}
finally {
+ // Move checkpoint pointer to the edge as node don't have actual checkpoints in `inMemoryCdc=true` mode.
+ // This will allow cleaner to remove segments from archive.
+ if (inMemoryCdc)
+ notchLastCheckpointPtr(hnd.position());
+
if (archiver == null)
segmentAware.addSize(idx, currSize - reservedSize);
}
@@ -3294,7 +3308,7 @@
+ ", maxSize=" + U.humanReadableByteCount(maxWalArchiveSize) + ']');
}
- ((GridCacheDatabaseSharedManager)cctx.database()).onWalTruncated(highPtr);
+ cctx.database().onWalTruncated(highPtr);
int truncated = truncate(highPtr);
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
index 953f888..9f39ede 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/WalForCdcTest.java
@@ -17,12 +17,15 @@
package org.apache.ignite.cdc;
+import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
+import java.util.function.IntConsumer;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
@@ -33,10 +36,12 @@
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
@@ -53,6 +58,9 @@
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cdc.CdcSelfTest.WAL_ARCHIVE_TIMEOUT;
+import static org.apache.ignite.configuration.DataStorageConfiguration.UNLIMITED_WAL_ARCHIVE;
+import static org.apache.ignite.internal.util.IgniteUtils.KB;
+import static org.apache.ignite.internal.util.IgniteUtils.MB;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -77,6 +85,9 @@
private boolean cdcEnabled;
/** */
+ private long archiveSz = UNLIMITED_WAL_ARCHIVE;
+
+ /** */
@Parameterized.Parameters(name = "mode={0}, atomicityMode={1}")
public static Collection<?> parameters() {
List<Object[]> params = new ArrayList<>();
@@ -94,6 +105,8 @@
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setWalForceArchiveTimeout(WAL_ARCHIVE_TIMEOUT)
+ .setWalSegmentSize((int)(2 * MB))
+ .setMaxWalArchiveSize(archiveSz)
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(persistenceEnabled)
.setCdcEnabled(cdcEnabled)));
@@ -197,6 +210,60 @@
}
/** */
+ @Test
+ public void testArchiveCleared() throws Exception {
+ persistenceEnabled = false;
+ cdcEnabled = true;
+ archiveSz = 10 * MB;
+
+ IgniteEx ignite = startGrid(0);
+
+ ignite.cluster().state(ClusterState.ACTIVE);
+
+ IgniteCache<Integer, byte[]> cache = ignite.getOrCreateCache(
+ new CacheConfiguration<Integer, byte[]>(DEFAULT_CACHE_NAME)
+ .setCacheMode(mode)
+ .setAtomicityMode(atomicityMode));
+
+ IntConsumer createData = (entryCnt) -> {
+ for (int i = 0; i < entryCnt; i++) {
+ byte[] payload = new byte[(int)KB];
+
+ ThreadLocalRandom.current().nextBytes(payload);
+
+ cache.put(i, payload);
+ }
+ };
+
+ IgniteWriteAheadLogManager wal = ignite.context().cache().context().wal(true);
+
+ long startSgmnt = wal.currentSegment();
+
+ createData.accept((int)(archiveSz / (2 * KB)));
+
+ long finishSgmnt = wal.currentSegment();
+
+ String archive = archive(ignite);
+
+ assertTrue(finishSgmnt > startSgmnt);
+ assertTrue(
+ "Wait for start segment archivation",
+ waitForCondition(() -> startSgmnt <= wal.lastArchivedSegment(), getTestTimeout())
+ );
+
+ File startSgmntArchived = new File(archive, FileDescriptor.fileName(startSgmnt));
+
+ assertTrue("Check archived segment file exists", startSgmntArchived.exists());
+
+ createData.accept((int)(archiveSz / KB));
+
+ assertTrue(
+ "Wait for archived segment cleaned",
+ waitForCondition(() -> !startSgmntArchived.exists(), getTestTimeout())
+ );
+ }
+
+ /** */
private void doTestWal(
IgniteEx ignite,
Consumer<IgniteCache<Integer, Integer>> putData,
@@ -223,16 +290,9 @@
/** */
private int checkDataRecords(IgniteEx ignite) throws IgniteCheckedException {
- String archive = U.resolveWorkDirectory(
- U.defaultWorkDirectory(),
- ignite.configuration().getDataStorageConfiguration().getWalArchivePath() + "/" +
- U.maskForFileName(ignite.configuration().getIgniteInstanceName()),
- false
- ).getAbsolutePath();
-
WALIterator iter = new IgniteWalIteratorFactory(log).iterator(new IteratorParametersBuilder()
.ioFactory(new RandomAccessFileIOFactory())
- .filesOrDirs(archive));
+ .filesOrDirs(archive(ignite)));
int walRecCnt = 0;
@@ -254,4 +314,18 @@
return walRecCnt;
}
+
+ /**
+ * @param ignite Ignite.
+ * @return WAL archive patch
+ * @throws IgniteCheckedException If failed
+ */
+ private static String archive(IgniteEx ignite) throws IgniteCheckedException {
+ return U.resolveWorkDirectory(
+ U.defaultWorkDirectory(),
+ ignite.configuration().getDataStorageConfiguration().getWalArchivePath() + "/" +
+ U.maskForFileName(ignite.configuration().getIgniteInstanceName()),
+ false
+ ).getAbsolutePath();
+ }
}