IGNITE-16871 Metadata update initiated by marker records in event topic (#141)
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
index 6160ead..01c35b4 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
@@ -24,10 +24,7 @@
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryObject;
-import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.cache.CacheEntryVersion;
-import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration;
-import org.apache.ignite.cdc.kafka.KafkaToIgniteMetadataUpdater;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.binary.BinaryTypeImpl;
@@ -39,9 +36,7 @@
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
/**
@@ -67,34 +62,6 @@
private final BooleanSupplier hasRemoves = () -> !F.isEmpty(rmvBatch);
/**
- * Update closure.
- * @see #applyWithRetry(IgniteInClosureX, IgniteInternalCache)
- */
- private final IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>> updClo =
- new IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>>() {
- /** {@inheritDoc} */
- @Override public void applyx(
- IgniteInternalCache<BinaryObject, BinaryObject> cache
- ) throws IgniteCheckedException {
- cache.putAllConflict(updBatch);
- }
- };
-
- /**
- * Remove closure.
- * @see #applyWithRetry(IgniteInClosureX, IgniteInternalCache)
- */
- private final IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>> rmvClo =
- new IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>>() {
- /** {@inheritDoc} */
- @Override public void applyx(
- IgniteInternalCache<BinaryObject, BinaryObject> cache
- ) throws IgniteCheckedException {
- cache.removeAllConflict(rmvBatch);
- }
- };
-
- /**
* @param maxBatchSize Maximum batch size.
*/
public CdcEventsApplier(int maxBatchSize) {
@@ -181,19 +148,20 @@
* @param applyUpd Apply update batch flag supplier.
* @param applyRmv Apply remove batch flag supplier.
* @return Number of applied events.
+ * @throws IgniteCheckedException In case of error.
*/
private int applyIf(
IgniteInternalCache<BinaryObject, BinaryObject> cache,
BooleanSupplier applyUpd,
BooleanSupplier applyRmv
- ) {
+ ) throws IgniteCheckedException {
int evtsApplied = 0;
if (applyUpd.getAsBoolean()) {
if (log().isDebugEnabled())
log().debug("Applying put batch [cache=" + cache.name() + ']');
- applyWithRetry(updClo, cache);
+ cache.putAllConflict(updBatch);
evtsApplied += updBatch.size();
@@ -204,7 +172,7 @@
if (log().isDebugEnabled())
log().debug("Applying remove batch [cache=" + cache.name() + ']');
- applyWithRetry(rmvClo, cache);
+ cache.removeAllConflict(rmvBatch);
evtsApplied += rmvBatch.size();
@@ -214,36 +182,6 @@
return evtsApplied;
}
- /**
- * Executes closure with retry logic.
- * Metadata update thread polls metadata asynchronously with {@link KafkaToIgniteCdcStreamerConfiguration#getMetaUpdateInterval()}
- * interval. This means metadata updates can be seen later than data updates. In this case {@link BinaryObjectException} can point
- * to absence of metadata. To overcome lack of metadata invoke {@link #updateMetadata()} and retry closure.
- *
- * @param clo Closure to apply.
- * @param cache Cache for closure.
- * @see KafkaToIgniteMetadataUpdater
- * @see KafkaToIgniteCdcStreamerConfiguration#getMetaUpdateInterval()
- */
- private void applyWithRetry(
- IgniteInClosureX<IgniteInternalCache<BinaryObject, BinaryObject>> clo,
- IgniteInternalCache<BinaryObject, BinaryObject> cache
- ) {
- try {
- clo.apply(cache);
- }
- catch (Exception e) {
- // Retry only if cause is BinaryObjectException.
- if (!X.hasCause(e, BinaryObjectException.class))
- throw e;
-
- // Retry after metadata update.
- updateMetadata();
-
- clo.apply(cache);
- }
- }
-
/** @return {@code True} if update batch should be applied. */
private boolean isApplyBatch(Map<KeyCacheObject, ?> map, KeyCacheObject key) {
return map.size() >= maxBatchSize || map.containsKey(key);
@@ -296,11 +234,6 @@
}
}
- /** Update metadata if possible. */
- protected void updateMetadata() {
- // No-op.
- }
-
/** @return Ignite instance. */
protected abstract IgniteEx ignite();
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
index 9c1b9c5..d821e2e 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
@@ -29,6 +29,7 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cdc.CdcConsumer;
@@ -43,6 +44,7 @@
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.resources.LoggerResource;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -91,6 +93,9 @@
/** Bytes sent metric description. */
public static final String BYTES_SENT_DESCRIPTION = "Count of bytes sent.";
+ /** Metadata updater marker. */
+ public static final byte[] META_UPDATE_MARKER = U.longToBytes(0xC0FF1E);
+
/** Log. */
@LoggerResource
private IgniteLogger log;
@@ -140,6 +145,9 @@
/** Count of sent mappings. */
protected AtomicLongMetric mappingsCnt;
+ /** Count of metadata updates. */
+ protected byte metaUpdCnt = 0;
+
/** */
private List<Future<RecordMetadata>> futs;
@@ -176,46 +184,66 @@
return true;
});
- while (filtered.hasNext()) {
- sendLimited(
- filtered,
- evt -> new ProducerRecord<>(
- evtTopic,
- evt.partition() % kafkaParts,
- evt.cacheId(),
- IgniteUtils.toBytes(evt)
- ),
- evtsCnt
- );
- }
+ sendAll(
+ filtered,
+ evt -> new ProducerRecord<>(
+ evtTopic,
+ evt.partition() % kafkaParts,
+ evt.cacheId(),
+ IgniteUtils.toBytes(evt)
+ ),
+ evtsCnt
+ );
return true;
}
/** {@inheritDoc} */
@Override public void onTypes(Iterator<BinaryType> types) {
- while (types.hasNext()) {
- sendLimited(
- types,
- t -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(((BinaryTypeImpl)t).metadata())),
- typesCnt
- );
- }
+ sendAll(
+ types,
+ t -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(((BinaryTypeImpl)t).metadata())),
+ typesCnt
+ );
+
+ sendMetaUpdatedMarkers();
}
/** {@inheritDoc} */
@Override public void onMappings(Iterator<TypeMapping> mappings) {
- while (mappings.hasNext()) {
- sendLimited(
- mappings,
- m -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(m)),
- mappingsCnt
- );
- }
+ sendAll(
+ mappings,
+ m -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(m)),
+ mappingsCnt
+ );
+
+ sendMetaUpdatedMarkers();
}
- /** Send limited amount of data to Kafka. */
- private <T> void sendLimited(
+ /** Sends a marker record (signaling that metadata needs to be updated) to each partition of the events topic. */
+ private void sendMetaUpdatedMarkers() {
+ sendAll(
+ IntStream.range(0, kafkaParts).iterator(),
+ p -> new ProducerRecord<>(evtTopic, p, null, META_UPDATE_MARKER),
+ evtsCnt
+ );
+
+ if (log.isDebugEnabled())
+ log.debug("Meta update markers sent.");
+ }
+
+ /** Send all data to Kafka. */
+ private <T> void sendAll(
+ Iterator<T> data,
+ Function<T, ProducerRecord<Integer, byte[]>> toRec,
+ AtomicLongMetric cntr
+ ) {
+ while (data.hasNext())
+ sendOneBatch(data, toRec, cntr);
+ }
+
+ /** Send one batch. */
+ private <T> void sendOneBatch(
Iterator<T> data,
Function<T, ProducerRecord<Integer, byte[]>> toRec,
AtomicLongMetric cntr
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
index 63f1df0..b0a5e4b 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
@@ -133,9 +133,8 @@
this.kafkaProps = kafkaProps;
this.streamerCfg = streamerCfg;
- // Extra thread for metadata updater.
- appliers = new ArrayList<>(streamerCfg.getThreadCount() + 1);
- runners = new ArrayList<>(streamerCfg.getThreadCount() + 1);
+ appliers = new ArrayList<>(streamerCfg.getThreadCount());
+ runners = new ArrayList<>(streamerCfg.getThreadCount());
if (!kafkaProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG))
throw new IllegalArgumentException("Kafka properties don't contains " + ConsumerConfig.GROUP_ID_CONFIG);
@@ -179,12 +178,9 @@
ign,
log,
kafkaProps,
- streamerCfg,
- stopped
+ streamerCfg
);
- addAndStart("meta-update-thread", metaUpdr);
-
int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
int threadCnt = streamerCfg.getThreadCount();
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
index 5de8cc3..acb2aeb 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
@@ -22,6 +22,7 @@
import java.io.ObjectInputStream;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -49,6 +50,8 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
+import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.META_UPDATE_MARKER;
+
/**
* Thread that polls message from the Kafka topic partitions and applies those messages to the Ignite caches.
* It expected that messages was written to the Kafka by the {@link IgniteToKafkaCdcStreamer} Change Data Capture consumer.
@@ -212,18 +215,38 @@
private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
- if (log.isDebugEnabled()) {
- log.debug(
- "Polled from consumer [assignments=" + cnsmr.assignment() + ",rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']'
+ if (log.isInfoEnabled()) {
+ log.info(
+ "Polled from consumer [assignments=" + cnsmr.assignment() +
+ ", cnt=" + recs.count() +
+ ", rcvdEvts=" + rcvdEvts.addAndGet(recs.count()) + ']'
);
}
- apply(F.iterator(recs, this::deserialize, true, rec -> F.isEmpty(caches) || caches.contains(rec.key())));
+ apply(F.iterator(recs, this::deserialize, true, this::filterAndPossiblyUpdateMetadata));
cnsmr.commitSync(Duration.ofMillis(kafkaReqTimeout));
}
/**
+ * Filters records from the events topic, passing {@link CdcEvent} records down to the applier.
+ * Marker records are filtered out; encountering one triggers a metadata update.
+ * @param rec Record to filter.
+ * @return {@code True} if the record should be pushed down to the applier.
+ */
+ private boolean filterAndPossiblyUpdateMetadata(ConsumerRecord<Integer, byte[]> rec) {
+ byte[] val = rec.value();
+
+ if (rec.key() == null && Arrays.equals(val, META_UPDATE_MARKER)) {
+ metaUpdr.updateMetadata();
+
+ return false;
+ }
+
+ return F.isEmpty(caches) || caches.contains(rec.key());
+ }
+
+ /**
* @param rec Kafka record.
* @return CDC event.
*/
@@ -237,14 +260,11 @@
}
/** {@inheritDoc} */
- @Override protected void updateMetadata() {
- metaUpdr.updateMetadata();
- }
-
- /** {@inheritDoc} */
@Override public void close() {
log.warning("Close applier!");
+ metaUpdr.close();
+
cnsmrs.forEach(KafkaConsumer::wakeup);
}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
index 7811127..24675d4 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
@@ -33,9 +33,6 @@
/** Default maximum time to complete Kafka related requests, in milliseconds. */
public static final long DFLT_KAFKA_REQ_TIMEOUT = 3_000L;
- /** Default maximum time to complete Kafka related requests, in milliseconds. */
- public static final long DFLT_META_UPD_INTERVAL = 3_000L;
-
/** Default {@link #threadCnt} value. */
public static final int DFLT_THREAD_CNT = 16;
@@ -60,9 +57,6 @@
/** The maximum time to complete Kafka related requests, in milliseconds. */
private long kafkaReqTimeout = DFLT_KAFKA_REQ_TIMEOUT;
- /** Amount of time between two polling of {@link #metadataTopic}, in milliseconds. */
- private long metaUpdInterval = DFLT_META_UPD_INTERVAL;
-
/** Metadata consumer group. */
private String metadataCnsmrGrp;
@@ -194,22 +188,6 @@
}
/**
- * @return Amount of time between two polling of {@link #metadataTopic}.
- */
- public long getMetaUpdateInterval() {
- return metaUpdInterval;
- }
-
- /**
- * Sets amount of time between two polling of {@link #metadataTopic}.
- *
- * @param metaUpdateInterval Amount of time between two polling of {@link #metadataTopic}.
- */
- public void setMetaUpdateInterval(long metaUpdateInterval) {
- this.metaUpdInterval = metaUpdateInterval;
- }
-
- /**
* @return Consumer group to read metadata topic.
*/
public String getMetadataConsumerGroup() {
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
index 852c991..c14097b 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
@@ -20,7 +20,6 @@
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cdc.CdcEventsApplier;
@@ -29,7 +28,6 @@
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -41,22 +39,16 @@
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
/** */
-public class KafkaToIgniteMetadataUpdater implements AutoCloseable, Runnable {
+public class KafkaToIgniteMetadataUpdater implements AutoCloseable {
/** Ignite instance. */
private final IgniteEx ign;
/** Log. */
private final IgniteLogger log;
- /** Closed flag. Shared between all appliers. */
- private final AtomicBoolean stopped;
-
/** The maximum time to complete Kafka related requests, in milliseconds. */
private final long kafkaReqTimeout;
- /** The maximum time to complete Kafka related requests, in milliseconds. */
- private final long metaUpdInterval;
-
/** */
private final KafkaConsumer<Void, byte[]> cnsmr;
@@ -68,19 +60,15 @@
* @param log Logger.
* @param initProps Kafka properties.
* @param streamerCfg Streamer configuration.
- * @param stopped Stopped flag.
*/
public KafkaToIgniteMetadataUpdater(
IgniteEx ign,
IgniteLogger log,
Properties initProps,
- KafkaToIgniteCdcStreamerConfiguration streamerCfg,
- AtomicBoolean stopped
+ KafkaToIgniteCdcStreamerConfiguration streamerCfg
) {
this.ign = ign;
this.kafkaReqTimeout = streamerCfg.getKafkaRequestTimeout();
- this.metaUpdInterval = streamerCfg.getMetaUpdateInterval();
- this.stopped = stopped;
this.log = log.getLogger(KafkaToIgniteMetadataUpdater.class);
Properties kafkaProps = new Properties();
@@ -97,22 +85,6 @@
cnsmr.subscribe(Collections.singletonList(streamerCfg.getMetadataTopic()));
}
- /** {@inheritDoc} */
- @Override public void run() {
- U.setCurrentIgniteName(ign.name());
-
- while (!stopped.get()) {
- updateMetadata();
-
- try {
- Thread.sleep(metaUpdInterval);
- }
- catch (InterruptedException e) {
- // Ignore.
- }
- }
- }
-
/** Polls all available records from metadata topic and applies it to Ignite. */
public synchronized void updateMetadata() {
while (true) {
@@ -141,8 +113,6 @@
/** {@inheritDoc} */
@Override public void close() {
- log.warning("Close applier!");
-
cnsmr.wakeup();
}