IGNITE-16871 Metadata update initiated by marker records in event topic (#141)

diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
index 6160ead..01c35b4 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
@@ -24,10 +24,7 @@
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.binary.BinaryObject;
-import org.apache.ignite.binary.BinaryObjectException;
 import org.apache.ignite.cache.CacheEntryVersion;
-import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration;
-import org.apache.ignite.cdc.kafka.KafkaToIgniteMetadataUpdater;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.binary.BinaryMetadata;
 import org.apache.ignite.internal.binary.BinaryTypeImpl;
@@ -39,9 +36,7 @@
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.lang.IgniteInClosureX;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 
 /**
@@ -67,34 +62,6 @@
     private final BooleanSupplier hasRemoves = () -> !F.isEmpty(rmvBatch);
 
     /**
-     * Update closure.
-     * @see #applyWithRetry(IgniteInClosureX, IgniteInternalCache)
-     */
-    private final IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>> updClo =
-        new IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>>() {
-            /** {@inheritDoc} */
-            @Override public void applyx(
-                IgniteInternalCache<BinaryObject, BinaryObject> cache
-            ) throws IgniteCheckedException {
-                cache.putAllConflict(updBatch);
-            }
-        };
-
-    /**
-     * Remove closure.
-     * @see #applyWithRetry(IgniteInClosureX, IgniteInternalCache)
-     */
-    private final IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>> rmvClo =
-        new IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>>() {
-            /** {@inheritDoc} */
-            @Override public void applyx(
-                IgniteInternalCache<BinaryObject, BinaryObject> cache
-            ) throws IgniteCheckedException {
-                cache.removeAllConflict(rmvBatch);
-            }
-        };
-
-    /**
      * @param maxBatchSize Maximum batch size.
      */
     public CdcEventsApplier(int maxBatchSize) {
@@ -181,19 +148,20 @@
      * @param applyUpd Apply update batch flag supplier.
      * @param applyRmv Apply remove batch flag supplier.
      * @return Number of applied events.
+     * @throws IgniteCheckedException In case of error.
      */
     private int applyIf(
         IgniteInternalCache<BinaryObject, BinaryObject> cache,
         BooleanSupplier applyUpd,
         BooleanSupplier applyRmv
-    ) {
+    ) throws IgniteCheckedException {
         int evtsApplied = 0;
 
         if (applyUpd.getAsBoolean()) {
             if (log().isDebugEnabled())
                 log().debug("Applying put batch [cache=" + cache.name() + ']');
 
-            applyWithRetry(updClo, cache);
+            cache.putAllConflict(updBatch);
 
             evtsApplied += updBatch.size();
 
@@ -204,7 +172,7 @@
             if (log().isDebugEnabled())
                 log().debug("Applying remove batch [cache=" + cache.name() + ']');
 
-            applyWithRetry(rmvClo, cache);
+            cache.removeAllConflict(rmvBatch);
 
             evtsApplied += rmvBatch.size();
 
@@ -214,36 +182,6 @@
         return evtsApplied;
     }
 
-    /**
-     * Executes closure with retry logic.
-     * Metadata update thread polls metadata asynchronously with {@link KafkaToIgniteCdcStreamerConfiguration#getMetaUpdateInterval()}
-     * interval. This means metadata updates can be seen later than data updates. In this case {@link BinaryObjectException} can point
-     * to absence of metadata. To overcome lack of metadata invoke {@link #updateMetadata()} and retry closure.
-     *
-     * @param clo Closure to apply.
-     * @param cache Cache for closure.
-     * @see KafkaToIgniteMetadataUpdater
-     * @see KafkaToIgniteCdcStreamerConfiguration#getMetaUpdateInterval()
-     */
-    private void applyWithRetry(
-        IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>> clo,
-        IgniteInternalCache<BinaryObject, BinaryObject> cache
-    ) {
-        try {
-            clo.apply(cache);
-        }
-        catch (Exception e) {
-            // Retry only if cause is BinaryObjectException.
-            if (!X.hasCause(e, BinaryObjectException.class))
-                throw e;
-
-            // Retry after metadata update.
-            updateMetadata();
-
-            clo.apply(cache);
-        }
-    }
-
     /** @return {@code True} if update batch should be applied. */
     private boolean isApplyBatch(Map<KeyCacheObject, ?> map, KeyCacheObject key) {
         return map.size() >= maxBatchSize || map.containsKey(key);
@@ -296,11 +234,6 @@
         }
     }
 
-    /** Update metadata if possible. */
-    protected void updateMetadata() {
-        // No-op.
-    }
-
     /** @return Ignite instance. */
     protected abstract IgniteEx ignite();
 
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
index 9c1b9c5..d821e2e 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
@@ -29,6 +29,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.cdc.CdcConsumer;
@@ -43,6 +44,7 @@
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteExperimental;
 import org.apache.ignite.resources.LoggerResource;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -91,6 +93,9 @@
     /** Bytes sent metric description. */
     public static final String BYTES_SENT_DESCRIPTION = "Count of bytes sent.";
 
+    /** Metadata updater marker. */
+    public static final byte[] META_UPDATE_MARKER = U.longToBytes(0xC0FF1E);
+
     /** Log. */
     @LoggerResource
     private IgniteLogger log;
@@ -140,6 +145,9 @@
     /** Count of sent mappings. */
     protected AtomicLongMetric mappingsCnt;
 
+    /** Count of metadata updates. */
+    protected byte metaUpdCnt = 0;
+
     /** */
     private List<Future<RecordMetadata>> futs;
 
@@ -176,46 +184,66 @@
             return true;
         });
 
-        while (filtered.hasNext()) {
-            sendLimited(
-                filtered,
-                evt -> new ProducerRecord<>(
-                    evtTopic,
-                    evt.partition() % kafkaParts,
-                    evt.cacheId(),
-                    IgniteUtils.toBytes(evt)
-                ),
-                evtsCnt
-            );
-        }
+        sendAll(
+            filtered,
+            evt -> new ProducerRecord<>(
+                evtTopic,
+                evt.partition() % kafkaParts,
+                evt.cacheId(),
+                IgniteUtils.toBytes(evt)
+            ),
+            evtsCnt
+        );
 
         return true;
     }
 
     /** {@inheritDoc} */
     @Override public void onTypes(Iterator<BinaryType> types) {
-        while (types.hasNext()) {
-            sendLimited(
-                types,
-                t -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(((BinaryTypeImpl)t).metadata())),
-                typesCnt
-            );
-        }
+        sendAll(
+            types,
+            t -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(((BinaryTypeImpl)t).metadata())),
+            typesCnt
+        );
+
+        sendMetaUpdatedMarkers();
     }
 
     /** {@inheritDoc} */
     @Override public void onMappings(Iterator<TypeMapping> mappings) {
-        while (mappings.hasNext()) {
-            sendLimited(
-                mappings,
-                m -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(m)),
-                mappingsCnt
-            );
-        }
+        sendAll(
+            mappings,
+            m -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(m)),
+            mappingsCnt
+        );
+
+        sendMetaUpdatedMarkers();
     }
 
-    /** Send limited amount of data to Kafka. */
-    private <T> void sendLimited(
+    /** Send marker (meta needs to be updated) record to each partition of events topic. */
+    private void sendMetaUpdatedMarkers() {
+        sendAll(
+            IntStream.range(0, kafkaParts).iterator(),
+            p -> new ProducerRecord<>(evtTopic, p, null, META_UPDATE_MARKER),
+            evtsCnt
+        );
+
+        if (log.isDebugEnabled())
+            log.debug("Meta update markers sent.");
+    }
+
+    /** Send all data to Kafka. */
+    private <T> void sendAll(
+        Iterator<T> data,
+        Function<T, ProducerRecord<Integer, byte[]>> toRec,
+        AtomicLongMetric cntr
+    ) {
+        while (data.hasNext())
+            sendOneBatch(data, toRec, cntr);
+    }
+
+    /** Send one batch. */
+    private <T> void sendOneBatch(
         Iterator<T> data,
         Function<T, ProducerRecord<Integer, byte[]>> toRec,
         AtomicLongMetric cntr
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
index 63f1df0..b0a5e4b 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
@@ -133,9 +133,8 @@
         this.kafkaProps = kafkaProps;
         this.streamerCfg = streamerCfg;
 
-        // Extra thread for metadata updater.
-        appliers = new ArrayList<>(streamerCfg.getThreadCount() + 1);
-        runners = new ArrayList<>(streamerCfg.getThreadCount() + 1);
+        appliers = new ArrayList<>(streamerCfg.getThreadCount());
+        runners = new ArrayList<>(streamerCfg.getThreadCount());
 
         if (!kafkaProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG))
             throw new IllegalArgumentException("Kafka properties don't contains " + ConsumerConfig.GROUP_ID_CONFIG);
@@ -179,12 +178,9 @@
                 ign,
                 log,
                 kafkaProps,
-                streamerCfg,
-                stopped
+                streamerCfg
             );
 
-            addAndStart("meta-update-thread", metaUpdr);
-
             int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
             int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
             int threadCnt = streamerCfg.getThreadCount();
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
index 5de8cc3..acb2aeb 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
@@ -22,6 +22,7 @@
 import java.io.ObjectInputStream;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -49,6 +50,8 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 
+import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.META_UPDATE_MARKER;
+
 /**
  * Thread that polls message from the Kafka topic partitions and applies those messages to the Ignite caches.
  * It expected that messages was written to the Kafka by the {@link IgniteToKafkaCdcStreamer} Change Data Capture consumer.
@@ -212,18 +215,38 @@
     private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
         ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
 
-        if (log.isDebugEnabled()) {
-            log.debug(
-                "Polled from consumer [assignments=" + cnsmr.assignment() + ",rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']'
+        if (log.isInfoEnabled()) {
+            log.info(
+                "Polled from consumer [assignments=" + cnsmr.assignment() +
+                    ", cnt=" + recs.count() +
+                    ", rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']'
             );
         }
 
-        apply(F.iterator(recs, this::deserialize, true, rec -> F.isEmpty(caches) || caches.contains(rec.key())));
+        apply(F.iterator(recs, this::deserialize, true, this::filterAndPossiblyUpdateMetadata));
 
         cnsmr.commitSync(Duration.ofMillis(kafkaReqTimeout));
     }
 
     /**
+     * Filter out {@link CdcEvent} records.
+     * Updates metadata in case an update-metadata marker is found.
+     * @param rec Record to filter.
+     * @return {@code True} if record should be pushed down.
+     */
+    private boolean filterAndPossiblyUpdateMetadata(ConsumerRecord<Integer, byte[]> rec) {
+        byte[] val = rec.value();
+
+        if (rec.key() == null && Arrays.equals(val, META_UPDATE_MARKER)) {
+            metaUpdr.updateMetadata();
+
+            return false;
+        }
+
+        return F.isEmpty(caches) || caches.contains(rec.key());
+    }
+
+    /**
      * @param rec Kafka record.
      * @return CDC event.
      */
@@ -237,14 +260,11 @@
     }
 
     /** {@inheritDoc} */
-    @Override protected void updateMetadata() {
-        metaUpdr.updateMetadata();
-    }
-
-    /** {@inheritDoc} */
     @Override public void close() {
         log.warning("Close applier!");
 
+        metaUpdr.close();
+
         cnsmrs.forEach(KafkaConsumer::wakeup);
     }
 
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
index 7811127..24675d4 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
@@ -33,9 +33,6 @@
     /** Default maximum time to complete Kafka related requests, in milliseconds. */
     public static final long DFLT_KAFKA_REQ_TIMEOUT = 3_000L;
 
-    /** Default maximum time to complete Kafka related requests, in milliseconds. */
-    public static final long DFLT_META_UPD_INTERVAL = 3_000L;
-
     /** Default {@link #threadCnt} value. */
     public static final int DFLT_THREAD_CNT = 16;
 
@@ -60,9 +57,6 @@
     /** The maximum time to complete Kafka related requests, in milliseconds. */
     private long kafkaReqTimeout = DFLT_KAFKA_REQ_TIMEOUT;
 
-    /** Amount of time between two polling of {@link #metadataTopic}, in milliseconds. */
-    private long metaUpdInterval = DFLT_META_UPD_INTERVAL;
-
     /** Metadata consumer group. */
     private String metadataCnsmrGrp;
 
@@ -194,22 +188,6 @@
     }
 
     /**
-     * @return Amount of time between two polling of {@link #metadataTopic}.
-     */
-    public long getMetaUpdateInterval() {
-        return metaUpdInterval;
-    }
-
-    /**
-     * Sets amount of time between two polling of {@link #metadataTopic}.
-     *
-     * @param metaUpdateInterval Amount of time between two polling of {@link #metadataTopic}.
-     */
-    public void setMetaUpdateInterval(long metaUpdateInterval) {
-        this.metaUpdInterval = metaUpdateInterval;
-    }
-
-    /**
      * @return Consumer group to read metadata topic.
      */
     public String getMetadataConsumerGroup() {
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
index 852c991..c14097b 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
@@ -20,7 +20,6 @@
 import java.time.Duration;
 import java.util.Collections;
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cdc.CdcEventsApplier;
@@ -29,7 +28,6 @@
 import org.apache.ignite.internal.binary.BinaryMetadata;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -41,22 +39,16 @@
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 
 /** */
-public class KafkaToIgniteMetadataUpdater implements AutoCloseable, Runnable {
+public class KafkaToIgniteMetadataUpdater implements AutoCloseable {
     /** Ignite instance. */
     private final IgniteEx ign;
 
     /** Log. */
     private final IgniteLogger log;
 
-    /** Closed flag. Shared between all appliers. */
-    private final AtomicBoolean stopped;
-
     /** The maximum time to complete Kafka related requests, in milliseconds. */
     private final long kafkaReqTimeout;
 
-    /** The maximum time to complete Kafka related requests, in milliseconds. */
-    private final long metaUpdInterval;
-
     /** */
     private final KafkaConsumer<Void, byte[]> cnsmr;
 
@@ -68,19 +60,15 @@
      * @param log Logger.
      * @param initProps Kafka properties.
      * @param streamerCfg Streamer configuration.
-     * @param stopped Stopped flag.
      */
     public KafkaToIgniteMetadataUpdater(
         IgniteEx ign,
         IgniteLogger log,
         Properties initProps,
-        KafkaToIgniteCdcStreamerConfiguration streamerCfg,
-        AtomicBoolean stopped
+        KafkaToIgniteCdcStreamerConfiguration streamerCfg
     ) {
         this.ign = ign;
         this.kafkaReqTimeout = streamerCfg.getKafkaRequestTimeout();
-        this.metaUpdInterval = streamerCfg.getMetaUpdateInterval();
-        this.stopped = stopped;
         this.log = log.getLogger(KafkaToIgniteMetadataUpdater.class);
 
         Properties kafkaProps = new Properties();
@@ -97,22 +85,6 @@
         cnsmr.subscribe(Collections.singletonList(streamerCfg.getMetadataTopic()));
     }
 
-    /** {@inheritDoc} */
-    @Override public void run() {
-        U.setCurrentIgniteName(ign.name());
-
-        while (!stopped.get()) {
-            updateMetadata();
-
-            try {
-                Thread.sleep(metaUpdInterval);
-            }
-            catch (InterruptedException e) {
-                // Ignore.
-            }
-        }
-    }
-
     /** Polls all available records from metadata topic and applies it to Ignite. */
     public synchronized void updateMetadata() {
         while (true) {
@@ -141,8 +113,6 @@
 
     /** {@inheritDoc} */
     @Override public void close() {
-        log.warning("Close applier!");
-
         cnsmr.wakeup();
     }