Merge branch 'master' of github.com:apache/ignite-extensions into IGNITE-17399__cdc_expiration
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
index 01c35b4..41fab27 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
@@ -34,11 +34,15 @@
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.EXPIRE_TIME_CALCULATE;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_NOT_CHANGED;
+
/**
* Contains logic to process {@link CdcEvent} and apply them to the provided by {@link #ignite()} cluster.
*/
@@ -124,8 +128,13 @@
else
val = new CacheObjectImpl(evt.value(), null);
- updBatch.put(key, new GridCacheDrInfo(val,
- new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId())));
+ GridCacheVersion ver = new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId());
+
+ GridCacheDrInfo drVal = currCache.configuration().getExpiryPolicyFactory() != null ?
+ new GridCacheDrExpirationInfo(val, ver, TTL_NOT_CHANGED, EXPIRE_TIME_CALCULATE)
+ : new GridCacheDrInfo(val, ver);
+
+ updBatch.put(key, drVal);
}
else {
evtsApplied += applyIf(currCache, hasUpdates, () -> isApplyBatch(rmvBatch, key));
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
index 51505c5..d4517cd 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -25,9 +25,14 @@
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.IntStream;
+import javax.cache.configuration.Factory;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
import javax.management.DynamicMBean;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
@@ -52,6 +57,7 @@
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.spi.metric.ObjectMetric;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -428,6 +434,36 @@
}
}
+ /**
+ * Test that destination cluster applies expiration policy on received entries.
+ * <p>
+ * The source and destination caches are created with the same name and the same {@link CreatedExpiryPolicy}
+ * factory (10 seconds). A single entry is put into the source cache; the test then waits for the entry to
+ * appear in the destination cache, to disappear from the source cache, and finally to disappear from the
+ * destination cache.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testWithExpiryPolicy() throws Exception {
+ // One factory is shared by both caches: entries expire 10 seconds after creation.
+ Factory<? extends ExpiryPolicy> factory = () -> new CreatedExpiryPolicy(new Duration(TimeUnit.SECONDS, 10));
+
+ IgniteCache<Integer, ConflictResolvableTestData> srcCache = createCache(srcCluster[0], ACTIVE_PASSIVE_CACHE, factory);
+ IgniteCache<Integer, ConflictResolvableTestData> destCache = createCache(destCluster[0], ACTIVE_PASSIVE_CACHE, factory);
+
+ List<IgniteInternalFuture<?>> futs = startActivePassiveCdc(ACTIVE_PASSIVE_CACHE);
+
+ try {
+ srcCache.putAll(F.asMap(0, ConflictResolvableTestData.create()));
+
+ // Sanity check: the entry is visible in the source cache right after the put.
+ assertTrue(srcCache.containsKey(0));
+
+ log.warning(">>>>>> Waiting for entry in destination cache");
+ assertTrue(waitForCondition(() -> destCache.containsKey(0), getTestTimeout()));
+
+ log.warning(">>>>>> Waiting for removing in source cache");
+ assertTrue(waitForCondition(() -> !srcCache.containsKey(0), getTestTimeout()));
+
+ // NOTE(review): this wait uses a fixed 20_000 timeout (presumably milliseconds) while the two waits
+ // above use getTestTimeout() - confirm the difference is intentional.
+ log.warning(">>>>>> Waiting for removing in destination cache");
+ assertTrue(waitForCondition(() -> !destCache.containsKey(0), 20_000));
+ }
+ finally {
+ // Cancel the futures returned by startActivePassiveCdc whether or not the assertions passed.
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.cancel();
+ }
+ }
+
/** */
public Runnable generateData(String cacheName, IgniteEx ign, IntStream keys) {
return () -> {
@@ -480,11 +516,23 @@
/** */
private IgniteCache<Integer, ConflictResolvableTestData> createCache(IgniteEx ignite, String name) {
+ // Delegates to the three-argument overload, passing null as the expiry policy factory.
+ return createCache(ignite, name, null);
+ }
+
+ /** */
+ private IgniteCache<Integer, ConflictResolvableTestData> createCache(
+ IgniteEx ignite,
+ String name,
+ @Nullable Factory<? extends ExpiryPolicy> expiryPlcFactory
+ ) {
CacheConfiguration<Integer, ConflictResolvableTestData> ccfg = new CacheConfiguration<Integer, ConflictResolvableTestData>()
.setName(name)
.setCacheMode(mode)
.setAtomicityMode(atomicity);
+ if (expiryPlcFactory != null)
+ ccfg.setExpiryPolicyFactory(expiryPlcFactory);
+
if (mode != REPLICATED)
ccfg.setBackups(backups);