Merge branch 'master' of github.com:apache/ignite-extensions into IGNITE-17399__cdc_expiration
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
index 01c35b4..41fab27 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
@@ -34,11 +34,15 @@
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.EXPIRE_TIME_CALCULATE;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_NOT_CHANGED;
+
 /**
  * Contains logic to process {@link CdcEvent} and apply them to the provided by {@link #ignite()} cluster.
  */
@@ -124,8 +128,13 @@
                 else
                     val = new CacheObjectImpl(evt.value(), null);
 
-                updBatch.put(key, new GridCacheDrInfo(val,
-                    new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId())));
+                GridCacheVersion ver = new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId());
+
+                GridCacheDrInfo drVal = currCache.configuration().getExpiryPolicyFactory() != null ?
+                    new GridCacheDrExpirationInfo(val, ver, TTL_NOT_CHANGED, EXPIRE_TIME_CALCULATE)
+                    : new GridCacheDrInfo(val, ver);
+
+                updBatch.put(key, drVal);
             }
             else {
                 evtsApplied += applyIf(currCache, hasUpdates, () -> isApplyBatch(rmvBatch, key));
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
index 51505c5..d4517cd 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -25,9 +25,14 @@
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.IntStream;
+import javax.cache.configuration.Factory;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ExpiryPolicy;
 import javax.management.DynamicMBean;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
@@ -52,6 +57,7 @@
 import org.apache.ignite.spi.metric.LongMetric;
 import org.apache.ignite.spi.metric.ObjectMetric;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -428,6 +434,36 @@
         }
     }
 
+    /** Test that destination cluster applies expiration policy on received entries. */
+    @Test
+    public void testWithExpiryPolicy() throws Exception {
+        Factory<? extends ExpiryPolicy> factory = () -> new CreatedExpiryPolicy(new Duration(TimeUnit.SECONDS, 10));
+
+        IgniteCache<Integer, ConflictResolvableTestData> srcCache = createCache(srcCluster[0], ACTIVE_PASSIVE_CACHE, factory);
+        IgniteCache<Integer, ConflictResolvableTestData> destCache = createCache(destCluster[0], ACTIVE_PASSIVE_CACHE, factory);
+
+        List<IgniteInternalFuture<?>> futs = startActivePassiveCdc(ACTIVE_PASSIVE_CACHE);
+
+        try {
+            srcCache.putAll(F.asMap(0, ConflictResolvableTestData.create()));
+
+            assertTrue(srcCache.containsKey(0));
+
+            log.warning(">>>>>> Waiting for entry in destination cache");
+            assertTrue(waitForCondition(() -> destCache.containsKey(0), getTestTimeout()));
+
+            log.warning(">>>>>> Waiting for removing in source cache");
+            assertTrue(waitForCondition(() -> !srcCache.containsKey(0), getTestTimeout()));
+
+            log.warning(">>>>>> Waiting for removing in destination cache");
+            assertTrue(waitForCondition(() -> !destCache.containsKey(0), 20_000));
+        }
+        finally {
+            for (IgniteInternalFuture<?> fut : futs)
+                fut.cancel();
+        }
+    }
+
     /** */
     public Runnable generateData(String cacheName, IgniteEx ign, IntStream keys) {
         return () -> {
@@ -480,11 +516,23 @@
 
     /** */
     private IgniteCache<Integer, ConflictResolvableTestData> createCache(IgniteEx ignite, String name) {
+        return createCache(ignite, name, null);
+    }
+
+    /** */
+    private IgniteCache<Integer, ConflictResolvableTestData> createCache(
+        IgniteEx ignite,
+        String name,
+        @Nullable Factory<? extends ExpiryPolicy> expiryPlcFactory
+    ) {
         CacheConfiguration<Integer, ConflictResolvableTestData> ccfg = new CacheConfiguration<Integer, ConflictResolvableTestData>()
             .setName(name)
             .setCacheMode(mode)
             .setAtomicityMode(atomicity);
 
+        if (expiryPlcFactory != null)
+            ccfg.setExpiryPolicyFactory(expiryPlcFactory);
+
         if (mode != REPLICATED)
             ccfg.setBackups(backups);