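IGNITE-16871 Code review fixes: extract sendAll() batching loop in IgniteToKafkaCdcStreamer, require null record key for meta update marker in KafkaToIgniteCdcStreamerApplier.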
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
index fee991b..d821e2e 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
@@ -184,61 +184,66 @@
return true;
});
- while (filtered.hasNext()) {
- sendLimited(
- filtered,
- evt -> new ProducerRecord<>(
- evtTopic,
- evt.partition() % kafkaParts,
- evt.cacheId(),
- IgniteUtils.toBytes(evt)
- ),
- evtsCnt
- );
- }
+ sendAll(
+ filtered,
+ evt -> new ProducerRecord<>(
+ evtTopic,
+ evt.partition() % kafkaParts,
+ evt.cacheId(),
+ IgniteUtils.toBytes(evt)
+ ),
+ evtsCnt
+ );
return true;
}
/** {@inheritDoc} */
@Override public void onTypes(Iterator<BinaryType> types) {
- while (types.hasNext()) {
- sendLimited(
- types,
- t -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(((BinaryTypeImpl)t).metadata())),
- typesCnt
- );
- }
+ sendAll(
+ types,
+ t -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(((BinaryTypeImpl)t).metadata())),
+ typesCnt
+ );
sendMetaUpdatedMarkers();
}
/** {@inheritDoc} */
@Override public void onMappings(Iterator<TypeMapping> mappings) {
- while (mappings.hasNext()) {
- sendLimited(
- mappings,
- m -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(m)),
- mappingsCnt
- );
- }
+ sendAll(
+ mappings,
+ m -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(m)),
+ mappingsCnt
+ );
sendMetaUpdatedMarkers();
}
/** Send marker(meta need to be updated) record to each partition of events topic. */
private void sendMetaUpdatedMarkers() {
- Iterator<Integer> parts = IntStream.range(0, kafkaParts).iterator();
-
- while (parts.hasNext())
- sendLimited(parts, p -> new ProducerRecord<>(evtTopic, p, null, META_UPDATE_MARKER), evtsCnt);
+ sendAll(
+ IntStream.range(0, kafkaParts).iterator(),
+ p -> new ProducerRecord<>(evtTopic, p, null, META_UPDATE_MARKER),
+ evtsCnt
+ );
if (log.isDebugEnabled())
log.debug("Meta update markers sent.");
}
- /** Send limited amount of data to Kafka. */
- private <T> void sendLimited(
+ /** Send all data to Kafka. */
+ private <T> void sendAll(
+ Iterator<T> data,
+ Function<T, ProducerRecord<Integer, byte[]>> toRec,
+ AtomicLongMetric cntr
+ ) {
+ while (data.hasNext())
+ sendOneBatch(data, toRec, cntr);
+ }
+
+ /** Send one batch of data to Kafka. */
+ private <T> void sendOneBatch(
Iterator<T> data,
Function<T, ProducerRecord<Integer, byte[]>> toRec,
AtomicLongMetric cntr
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
index 3ed2650..acb2aeb 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
@@ -22,6 +22,7 @@
import java.io.ObjectInputStream;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -236,7 +237,7 @@
private boolean filterAndPossiblyUpdateMetadata(ConsumerRecord<Integer, byte[]> rec) {
byte[] val = rec.value();
- if (val.length == META_UPDATE_MARKER.length && U.bytesEqual(val, 0, META_UPDATE_MARKER, 0, val.length)) {
+ if (rec.key() == null && Arrays.equals(val, META_UPDATE_MARKER)) {
metaUpdr.updateMetadata();
return false;