IGNITE-16743 Implement thin client CDC streamer (#169)
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
new file mode 100644
index 0000000..ffef911
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractCdcEventsApplier.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheEntryVersion;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.F;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UNDEFINED_CACHE_ID;
+
+/**
+ * Contains logic to process {@link CdcEvent} and apply them to the cluster.
+ */
+public abstract class AbstractCdcEventsApplier<K, V> {
+ /** Maximum batch size. */
+ private final int maxBatchSize;
+
+ /** Update batch. */
+ private final Map<K, V> updBatch = new HashMap<>();
+
+ /** Remove batch. */
+ private final Map<K, GridCacheVersion> rmvBatch = new HashMap<>();
+
+ /** */
+ private final BooleanSupplier hasUpdates = () -> !F.isEmpty(updBatch);
+
+ /** */
+ private final BooleanSupplier hasRemoves = () -> !F.isEmpty(rmvBatch);
+
+ /** */
+ private final IgniteLogger log;
+
+ /**
+ * @param maxBatchSize Maximum batch size.
+ * @param log Logger.
+ */
+ public AbstractCdcEventsApplier(int maxBatchSize, IgniteLogger log) {
+ this.maxBatchSize = maxBatchSize;
+ this.log = log.getLogger(getClass());
+ }
+
+ /**
+ * @param evts Events to process.
+ * @return Number of applied events.
+ * @throws IgniteCheckedException If failed.
+ */
+ public int apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
+ int currCacheId = UNDEFINED_CACHE_ID;
+ int evtsApplied = 0;
+
+ for (CdcEvent evt : evts) {
+ if (log.isDebugEnabled())
+ log.debug("Event received [evt=" + evt + ']');
+
+ int cacheId = evt.cacheId();
+
+ if (cacheId != currCacheId) {
+ evtsApplied += applyIf(currCacheId, hasUpdates, hasRemoves);
+
+ currCacheId = cacheId;
+ }
+
+ CacheEntryVersion order = evt.version();
+ K key = toKey(evt);
+ GridCacheVersion ver = new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId());
+
+ if (evt.value() != null) {
+ evtsApplied += applyIf(currCacheId, () -> isApplyBatch(updBatch, key), hasRemoves);
+
+ updBatch.put(key, toValue(currCacheId, evt.value(), ver));
+ }
+ else {
+ evtsApplied += applyIf(currCacheId, hasUpdates, () -> isApplyBatch(rmvBatch, key));
+
+ rmvBatch.put(key, ver);
+ }
+ }
+
+ if (currCacheId != UNDEFINED_CACHE_ID)
+ evtsApplied += applyIf(currCacheId, hasUpdates, hasRemoves);
+
+ return evtsApplied;
+ }
+
+ /**
+ * Applies data from {@link #updBatch} or {@link #rmvBatch} to Ignite if required.
+ *
+ * @param cacheId Current cache ID.
+ * @param applyUpd Apply update batch flag supplier.
+ * @param applyRmv Apply remove batch flag supplier.
+ * @return Number of applied events.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private int applyIf(
+ int cacheId,
+ BooleanSupplier applyUpd,
+ BooleanSupplier applyRmv
+ ) throws IgniteCheckedException {
+ int evtsApplied = 0;
+
+ if (applyUpd.getAsBoolean()) {
+ if (log.isDebugEnabled())
+ log.debug("Applying put batch [cacheId=" + cacheId + ']');
+
+ putAllConflict(cacheId, updBatch);
+
+ evtsApplied += updBatch.size();
+
+ updBatch.clear();
+ }
+
+ if (applyRmv.getAsBoolean()) {
+ if (log.isDebugEnabled())
+ log.debug("Applying remove batch [cacheId=" + cacheId + ']');
+
+ removeAllConflict(cacheId, rmvBatch);
+
+ evtsApplied += rmvBatch.size();
+
+ rmvBatch.clear();
+ }
+
+ return evtsApplied;
+ }
+
+ /** @return {@code True} if update batch should be applied. */
+ private boolean isApplyBatch(Map<K, ?> map, K key) {
+ return map.size() >= maxBatchSize || map.containsKey(key);
+ }
+
+ /** @return Key. */
+ protected abstract K toKey(CdcEvent evt);
+
+ /** @return Value. */
+ protected abstract V toValue(int cacheId, Object val, GridCacheVersion ver);
+
+ /** Stores DR data. */
+ protected abstract void putAllConflict(int cacheId, Map<K, V> drMap);
+
+ /** Removes DR data. */
+ protected abstract void removeAllConflict(int cacheId, Map<K, GridCacheVersion> drMap);
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java
new file mode 100644
index 0000000..bbe184e
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.binary.BinaryMetadata;
+import org.apache.ignite.internal.binary.BinaryTypeImpl;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.resources.LoggerResource;
+
+import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.DFLT_IS_ONLY_PRIMARY;
+
+/**
+ * Change Data Consumer that streams all data changes to destination cluster by the provided {@link #applier}.
+ *
+ * @see AbstractCdcEventsApplier
+ */
+public abstract class AbstractIgniteCdcStreamer implements CdcConsumer {
+ /** */
+ public static final String EVTS_CNT = "EventsCount";
+
+ /** */
+ public static final String TYPES_CNT = "TypesCount";
+
+ /** */
+ public static final String MAPPINGS_CNT = "MappingsCount";
+
+ /** */
+ public static final String EVTS_CNT_DESC = "Count of messages applied to destination cluster";
+
+ /** */
+ public static final String TYPES_CNT_DESC = "Count of received binary types events";
+
+ /** */
+ public static final String MAPPINGS_CNT_DESC = "Count of received mappings events";
+
+ /** */
+ public static final String LAST_EVT_TIME = "LastEventTime";
+
+ /** */
+ public static final String LAST_EVT_TIME_DESC = "Timestamp of last applied event";
+
+ /** Handle only primary entry flag. */
+ private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;
+
+ /** Cache names. */
+ private Set<String> caches;
+
+ /** Cache IDs. */
+ protected Set<Integer> cachesIds;
+
+ /** Maximum batch size. */
+ protected int maxBatchSize;
+
+ /** Events applier. */
+ protected AbstractCdcEventsApplier<?, ?> applier;
+
+ /** Timestamp of last sent message. */
+ protected AtomicLongMetric lastEvtTs;
+
+ /** Count of events applied to destination cluster. */
+ protected AtomicLongMetric evtsCnt;
+
+ /** Count of binary types applied to destination cluster. */
+ protected AtomicLongMetric typesCnt;
+
+ /** Count of mappings applied to destination cluster. */
+ protected AtomicLongMetric mappingsCnt;
+
+ /** Logger. */
+ @LoggerResource
+ protected IgniteLogger log;
+
+ /** {@inheritDoc} */
+ @Override public void start(MetricRegistry mreg) {
+ A.notEmpty(caches, "caches");
+
+ cachesIds = caches.stream()
+ .mapToInt(CU::cacheId)
+ .boxed()
+ .collect(Collectors.toSet());
+
+ this.evtsCnt = mreg.longMetric(EVTS_CNT, EVTS_CNT_DESC);
+ this.typesCnt = mreg.longMetric(TYPES_CNT, TYPES_CNT_DESC);
+ this.mappingsCnt = mreg.longMetric(MAPPINGS_CNT, MAPPINGS_CNT_DESC);
+ this.lastEvtTs = mreg.longMetric(LAST_EVT_TIME, LAST_EVT_TIME_DESC);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onEvents(Iterator<CdcEvent> events) {
+ try {
+ long msgsSnt = applier.apply(() -> F.iterator(
+ events,
+ F.identity(),
+ true,
+ evt -> !onlyPrimary || evt.primary(),
+ evt -> F.isEmpty(cachesIds) || cachesIds.contains(evt.cacheId()),
+ evt -> evt.version().otherClusterVersion() == null));
+
+ if (msgsSnt > 0) {
+ evtsCnt.add(msgsSnt);
+ lastEvtTs.value(System.currentTimeMillis());
+
+ if (log.isInfoEnabled())
+ log.info("Events applied [evtsApplied=" + evtsCnt.value() + ']');
+ }
+
+ return true;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
+ cacheEvents.forEachRemaining(e -> {
+ // Just skip. Handle of cache events not supported.
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCacheDestroy(Iterator<Integer> caches) {
+ caches.forEachRemaining(e -> {
+ // Just skip. Handle of cache events not supported.
+ });
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMappings(Iterator<TypeMapping> mappings) {
+ mappings.forEachRemaining(mapping -> {
+ registerMapping(binaryContext(), log, mapping);
+
+ mappingsCnt.increment();
+ });
+
+ lastEvtTs.value(System.currentTimeMillis());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTypes(Iterator<BinaryType> types) {
+ types.forEachRemaining(t -> {
+ BinaryMetadata meta = ((BinaryTypeImpl)t).metadata();
+
+ registerBinaryMeta(binaryContext(), log, meta);
+
+ typesCnt.increment();
+ });
+
+ lastEvtTs.value(System.currentTimeMillis());
+ }
+
+ /**
+ * Register {@code meta}.
+ *
+ * @param ctx Binary context.
+ * @param log Logger.
+ * @param meta Binary metadata to register.
+ */
+ public static void registerBinaryMeta(BinaryContext ctx, IgniteLogger log, BinaryMetadata meta) {
+ ctx.updateMetadata(meta.typeId(), meta, false);
+
+ if (log.isInfoEnabled())
+ log.info("BinaryMeta [meta=" + meta + ']');
+ }
+
+ /**
+ * Register {@code mapping}.
+ *
+ * @param ctx Binary context.
+ * @param log Logger.
+ * @param mapping Type mapping to register.
+ */
+ public static void registerMapping(BinaryContext ctx, IgniteLogger log, TypeMapping mapping) {
+ assert mapping.platformType().ordinal() <= Byte.MAX_VALUE;
+
+ byte platformType = (byte)mapping.platformType().ordinal();
+
+ ctx.registerUserClassName(mapping.typeId(), mapping.typeName(), false, false, platformType);
+
+ if (log.isInfoEnabled())
+ log.info("Mapping [mapping=" + mapping + ']');
+ }
+
+ /** @return Binary context. */
+ protected abstract BinaryContext binaryContext();
+
+ /**
+ * Sets whether entries only from primary nodes should be handled.
+ *
+ * @param onlyPrimary Whether entries only from primary nodes should be handled.
+ * @return {@code this} for chaining.
+ */
+ public AbstractIgniteCdcStreamer setOnlyPrimary(boolean onlyPrimary) {
+ this.onlyPrimary = onlyPrimary;
+
+ return this;
+ }
+
+ /**
+ * Sets cache names that participate in CDC.
+ *
+ * @param caches Cache names.
+ * @return {@code this} for chaining.
+ */
+ public AbstractIgniteCdcStreamer setCaches(Set<String> caches) {
+ this.caches = caches;
+
+ return this;
+ }
+
+ /**
+ * Sets maximum batch size that will be applied to destination cluster.
+ *
+ * @param maxBatchSize Maximum batch size.
+ * @return {@code this} for chaining.
+ */
+ public AbstractIgniteCdcStreamer setMaxBatchSize(int maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+
+ return this;
+ }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
deleted file mode 100644
index 41fab27..0000000
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cdc;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.BooleanSupplier;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.binary.BinaryObject;
-import org.apache.ignite.cache.CacheEntryVersion;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.binary.BinaryMetadata;
-import org.apache.ignite.internal.binary.BinaryTypeImpl;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
-import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
-import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
-import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
-import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
-import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-
-import static org.apache.ignite.internal.processors.cache.GridCacheUtils.EXPIRE_TIME_CALCULATE;
-import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_NOT_CHANGED;
-
-/**
- * Contains logic to process {@link CdcEvent} and apply them to the provided by {@link #ignite()} cluster.
- */
-public abstract class CdcEventsApplier {
- /** Maximum batch size. */
- protected int maxBatchSize;
-
- /** Caches. */
- private final Map<Integer, IgniteInternalCache<BinaryObject, BinaryObject>> ignCaches = new HashMap<>();
-
- /** Update batch. */
- private final Map<KeyCacheObject, GridCacheDrInfo> updBatch = new HashMap<>();
-
- /** Remove batch. */
- private final Map<KeyCacheObject, GridCacheVersion> rmvBatch = new HashMap<>();
-
- /** */
- private final BooleanSupplier hasUpdates = () -> !F.isEmpty(updBatch);
-
- /** */
- private final BooleanSupplier hasRemoves = () -> !F.isEmpty(rmvBatch);
-
- /**
- * @param maxBatchSize Maximum batch size.
- */
- public CdcEventsApplier(int maxBatchSize) {
- this.maxBatchSize = maxBatchSize;
- }
-
- /**
- * @param evts Events to process.
- * @return Number of applied events.
- * @throws IgniteCheckedException If failed.
- */
- protected int apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
- IgniteInternalCache<BinaryObject, BinaryObject> currCache = null;
-
- int evtsApplied = 0;
-
- for (CdcEvent evt : evts) {
- if (log().isDebugEnabled())
- log().debug("Event received [evt=" + evt + ']');
-
- IgniteInternalCache<BinaryObject, BinaryObject> cache = ignCaches.computeIfAbsent(evt.cacheId(), cacheId -> {
- for (String cacheName : ignite().cacheNames()) {
- if (CU.cacheId(cacheName) == cacheId) {
- // IgniteEx#cachex(String) will return null if cache not initialized with regular Ignite#cache(String) call.
- ignite().cache(cacheName);
-
- IgniteInternalCache<Object, Object> cache0 = ignite().cachex(cacheName);
-
- assert cache0 != null;
-
- return cache0.keepBinary();
- }
- }
-
- throw new IllegalStateException("Cache with id not found [cacheId=" + cacheId + ']');
- });
-
- if (cache != currCache) {
- evtsApplied += applyIf(currCache, hasUpdates, hasRemoves);
-
- currCache = cache;
- }
-
- CacheEntryVersion order = evt.version();
-
- KeyCacheObject key;
-
- if (evt.key() instanceof KeyCacheObject)
- key = (KeyCacheObject)evt.key();
- else
- key = new KeyCacheObjectImpl(evt.key(), null, evt.partition());
-
- if (evt.value() != null) {
- evtsApplied += applyIf(currCache, () -> isApplyBatch(updBatch, key), hasRemoves);
-
- CacheObject val;
-
- if (evt.value() instanceof CacheObject)
- val = (CacheObject)evt.value();
- else
- val = new CacheObjectImpl(evt.value(), null);
-
- GridCacheVersion ver = new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId());
-
- GridCacheDrInfo drVal = currCache.configuration().getExpiryPolicyFactory() != null ?
- new GridCacheDrExpirationInfo(val, ver, TTL_NOT_CHANGED, EXPIRE_TIME_CALCULATE)
- : new GridCacheDrInfo(val, ver);
-
- updBatch.put(key, drVal);
- }
- else {
- evtsApplied += applyIf(currCache, hasUpdates, () -> isApplyBatch(rmvBatch, key));
-
- rmvBatch.put(key,
- new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId()));
- }
- }
-
- if (currCache != null)
- evtsApplied += applyIf(currCache, hasUpdates, hasRemoves);
-
- return evtsApplied;
- }
-
- /**
- * Applies data from {@link #updBatch} or {@link #rmvBatch} to Ignite if required.
- *
- * @param cache Current cache.
- * @param applyUpd Apply update batch flag supplier.
- * @param applyRmv Apply remove batch flag supplier.
- * @return Number of applied events.
- * @throws IgniteCheckedException In case of error.
- */
- private int applyIf(
- IgniteInternalCache<BinaryObject, BinaryObject> cache,
- BooleanSupplier applyUpd,
- BooleanSupplier applyRmv
- ) throws IgniteCheckedException {
- int evtsApplied = 0;
-
- if (applyUpd.getAsBoolean()) {
- if (log().isDebugEnabled())
- log().debug("Applying put batch [cache=" + cache.name() + ']');
-
- cache.putAllConflict(updBatch);
-
- evtsApplied += updBatch.size();
-
- updBatch.clear();
- }
-
- if (applyRmv.getAsBoolean()) {
- if (log().isDebugEnabled())
- log().debug("Applying remove batch [cache=" + cache.name() + ']');
-
- cache.removeAllConflict(rmvBatch);
-
- evtsApplied += rmvBatch.size();
-
- rmvBatch.clear();
- }
-
- return evtsApplied;
- }
-
- /** @return {@code True} if update batch should be applied. */
- private boolean isApplyBatch(Map<KeyCacheObject, ?> map, KeyCacheObject key) {
- return map.size() >= maxBatchSize || map.containsKey(key);
- }
-
- /**
- * Register {@code meta} inside {@code ign} instance.
- *
- * @param ign Ignite instance.
- * @param log Logger.
- * @param meta Binary metadata to register.
- */
- public static void registerBinaryMeta(IgniteEx ign, IgniteLogger log, BinaryMetadata meta) {
- ign.context().cacheObjects().addMeta(
- meta.typeId(),
- new BinaryTypeImpl(
- ((CacheObjectBinaryProcessorImpl)ign.context().cacheObjects()).binaryContext(),
- meta
- ),
- false
- );
-
- if (log.isInfoEnabled())
- log.info("BinaryMeta[meta=" + meta + ']');
- }
-
- /**
- * Register {@code mapping} inside {@code ign} instance.
- *
- * @param ign Ignite instance.
- * @param log Logger.
- * @param mapping Type mapping to register.
- */
- public static void registerMapping(IgniteEx ign, IgniteLogger log, TypeMapping mapping) {
- assert mapping.platformType().ordinal() <= Byte.MAX_VALUE;
-
- try {
- ign.context().marshallerContext().registerClassName(
- (byte)mapping.platformType().ordinal(),
- mapping.typeId(),
- mapping.typeName(),
- false
- );
-
- if (log.isInfoEnabled())
- log.info("Mapping[mapping=" + mapping + ']');
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
-
- /** @return Ignite instance. */
- protected abstract IgniteEx ignite();
-
- /** @return Logger. */
- protected abstract IgniteLogger log();
-}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java
new file mode 100644
index 0000000..bce50d3
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsIgniteApplier.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.collection.IntHashMap;
+import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.EXPIRE_TIME_CALCULATE;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_NOT_CHANGED;
+
+/**
+ * Contains logic to process {@link CdcEvent} and apply them to the destination cluster.
+ *
+ * @see IgniteInternalCache#putAllConflict(Map)
+ * @see IgniteInternalCache#removeAllConflict(Map)
+ */
+public class CdcEventsIgniteApplier extends AbstractCdcEventsApplier<KeyCacheObject, GridCacheDrInfo> {
+ /** Destination cluster. */
+ private final IgniteEx ignite;
+
+ /** Caches. */
+ private final IntMap<IgniteInternalCache<BinaryObject, BinaryObject>> ignCaches = new IntHashMap<>();
+
+ /**
+ * @param ignite Destination cluster.
+ * @param maxBatchSize Maximum batch size.
+ * @param log Logger.
+ */
+ public CdcEventsIgniteApplier(IgniteEx ignite, int maxBatchSize, IgniteLogger log) {
+ super(maxBatchSize, log);
+
+ this.ignite = ignite;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void putAllConflict(int cacheId, Map<KeyCacheObject, GridCacheDrInfo> drMap) {
+ try {
+ cache(cacheId).putAllConflict(drMap);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void removeAllConflict(int cacheId, Map<KeyCacheObject, GridCacheVersion> drMap) {
+ try {
+ cache(cacheId).removeAllConflict(drMap);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected KeyCacheObject toKey(CdcEvent evt) {
+ Object key = evt.key();
+
+ if (key instanceof KeyCacheObject)
+ return (KeyCacheObject)key;
+ else
+ return new KeyCacheObjectImpl(key, null, evt.partition());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected GridCacheDrInfo toValue(int cacheId, Object val, GridCacheVersion ver) {
+ CacheObject cacheObj;
+
+ if (val instanceof CacheObject)
+ cacheObj = (CacheObject)val;
+ else
+ cacheObj = new CacheObjectImpl(val, null);
+
+ return cache(cacheId).configuration().getExpiryPolicyFactory() != null ?
+ new GridCacheDrExpirationInfo(cacheObj, ver, TTL_NOT_CHANGED, EXPIRE_TIME_CALCULATE) :
+ new GridCacheDrInfo(cacheObj, ver);
+ }
+
+ /** @return Cache. */
+ private IgniteInternalCache<BinaryObject, BinaryObject> cache(int cacheId) {
+ return ignCaches.computeIfAbsent(cacheId, id -> {
+ for (String cacheName : ignite.cacheNames()) {
+ if (CU.cacheId(cacheName) == id) {
+ // IgniteEx#cachex(String) will return null if cache not initialized with regular Ignite#cache(String) call.
+ ignite.cache(cacheName);
+
+ return ignite.cachex(cacheName).keepBinary();
+ }
+ }
+
+ throw new IllegalStateException("Cache with id not found [cacheId=" + id + ']');
+ });
+ }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
index 9db1695..60e1dfa 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
@@ -17,31 +17,17 @@
package org.apache.ignite.cdc;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.stream.Collectors;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.Ignition;
-import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamer;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.binary.BinaryMetadata;
-import org.apache.ignite.internal.binary.BinaryTypeImpl;
+import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
-import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteExperimental;
-import org.apache.ignite.resources.LoggerResource;
-
-import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.DFLT_IS_ONLY_PRIMARY;
-import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_MAX_BATCH_SIZE;
/**
* Change Data Consumer that streams all data changes to provided {@link #dest} Ignite cluster.
@@ -58,150 +44,30 @@
* @see CacheVersionConflictResolverImpl
*/
@IgniteExperimental
-public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcConsumer {
- /** */
- public static final String EVTS_CNT = "EventsCount";
-
- /** */
- public static final String TYPES_CNT = "TypesCount";
-
- /** */
- public static final String MAPPINGS_CNT = "MappingsCount";
-
- /** */
- public static final String EVTS_CNT_DESC = "Count of messages applied to destination cluster";
-
- /** */
- public static final String TYPES_CNT_DESC = "Count of received binary types events";
-
- /** */
- public static final String MAPPINGS_CNT_DESC = "Count of received mappings events";
-
- /** */
- public static final String LAST_EVT_TIME = "LastEventTime";
-
- /** */
- public static final String LAST_EVT_TIME_DESC = "Timestamp of last applied event";
-
+public class IgniteToIgniteCdcStreamer extends AbstractIgniteCdcStreamer {
/** Destination cluster client configuration. */
private IgniteConfiguration destIgniteCfg;
- /** Handle only primary entry flag. */
- private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;
-
/** Destination Ignite cluster client */
private IgniteEx dest;
- /** Cache names. */
- private Set<String> caches;
-
- /** Cache IDs. */
- private Set<Integer> cachesIds;
-
- /** Timestamp of last sent message. */
- private AtomicLongMetric lastEvtTs;
-
- /** Count of events applied to destination cluster. */
- protected AtomicLongMetric evtsCnt;
-
- /** Count of binary types applied to destination cluster. */
- protected AtomicLongMetric typesCnt;
-
- /** Count of mappings applied to destination cluster. */
- protected AtomicLongMetric mappingsCnt;
-
- /** Logger. */
- @LoggerResource
- private IgniteLogger log;
-
- /** */
- public IgniteToIgniteCdcStreamer() {
- super(DFLT_MAX_BATCH_SIZE);
- }
-
/** {@inheritDoc} */
@Override public void start(MetricRegistry mreg) {
- A.notNull(destIgniteCfg, "Destination ignite configuration");
- A.notEmpty(caches, "caches");
+ super.start(mreg);
if (log.isInfoEnabled())
log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');
- cachesIds = caches.stream()
- .mapToInt(CU::cacheId)
- .boxed()
- .collect(Collectors.toSet());
+ A.notNull(destIgniteCfg, "Destination ignite configuration");
dest = (IgniteEx)Ignition.start(destIgniteCfg);
- this.evtsCnt = mreg.longMetric(EVTS_CNT, EVTS_CNT_DESC);
- this.typesCnt = mreg.longMetric(TYPES_CNT, TYPES_CNT_DESC);
- this.mappingsCnt = mreg.longMetric(MAPPINGS_CNT, MAPPINGS_CNT_DESC);
- this.lastEvtTs = mreg.longMetric(LAST_EVT_TIME, LAST_EVT_TIME_DESC);
+ applier = new CdcEventsIgniteApplier(dest, maxBatchSize, log);
}
/** {@inheritDoc} */
- @Override public boolean onEvents(Iterator<CdcEvent> evts) {
- try {
- long msgsSnt = apply(() -> F.iterator(
- evts,
- F.identity(),
- true,
- evt -> !onlyPrimary || evt.primary(),
- evt -> F.isEmpty(cachesIds) || cachesIds.contains(evt.cacheId()),
- evt -> evt.version().otherClusterVersion() == null));
-
- if (msgsSnt > 0) {
- evtsCnt.add(msgsSnt);
- lastEvtTs.value(System.currentTimeMillis());
-
- if (log.isInfoEnabled())
- log.info("Events applied [evtsApplied=" + evtsCnt.value() + ']');
- }
-
- return true;
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onTypes(Iterator<BinaryType> types) {
- types.forEachRemaining(t -> {
- BinaryMetadata meta = ((BinaryTypeImpl)t).metadata();
-
- registerBinaryMeta(dest, log, meta);
-
- typesCnt.increment();
- });
-
- lastEvtTs.value(System.currentTimeMillis());
- }
-
- /** {@inheritDoc} */
- @Override public void onMappings(Iterator<TypeMapping> mappings) {
- mappings.forEachRemaining(m -> {
- registerMapping(dest, log, m);
-
- mappingsCnt.increment();
- });
-
- lastEvtTs.value(System.currentTimeMillis());
- }
-
- /** {@inheritDoc} */
- @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
- cacheEvents.forEachRemaining(e -> {
- // Just skip. Handle of cache events not supported.
- });
- }
-
- /** {@inheritDoc} */
- @Override public void onCacheDestroy(Iterator<Integer> caches) {
- caches.forEachRemaining(e -> {
- // Just skip. Handle of cache events not supported.
- });
+ @Override protected BinaryContext binaryContext() {
+ return ((CacheObjectBinaryProcessorImpl)dest.context().cacheObjects()).binaryContext();
}
/** {@inheritDoc} */
@@ -209,16 +75,6 @@
dest.close();
}
- /** {@inheritDoc} */
- @Override protected IgniteEx ignite() {
- return dest;
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteLogger log() {
- return log;
- }
-
/**
* Sets Ignite client node configuration that will connect to destination cluster.
* @param destIgniteCfg Ignite client node configuration that will connect to destination cluster.
@@ -229,40 +85,4 @@
return this;
}
-
- /**
- * Sets whether entries only from primary nodes should be handled.
- *
- * @param onlyPrimary Whether entries only from primary nodes should be handled.
- * @return {@code this} for chaining.
- */
- public IgniteToIgniteCdcStreamer setOnlyPrimary(boolean onlyPrimary) {
- this.onlyPrimary = onlyPrimary;
-
- return this;
- }
-
- /**
- * Sets cache names that participate in CDC.
- *
- * @param caches Cache names.
- * @return {@code this} for chaining.
- */
- public IgniteToIgniteCdcStreamer setCaches(Set<String> caches) {
- this.caches = caches;
-
- return this;
- }
-
- /**
- * Sets maximum batch size that will be applied to destination cluster.
- *
- * @param maxBatchSize Maximum batch size.
- * @return {@code this} for chaining.
- */
- public IgniteToIgniteCdcStreamer setMaxBatchSize(int maxBatchSize) {
- this.maxBatchSize = maxBatchSize;
-
- return this;
- }
}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
index acb2aeb..7b7f97a 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
@@ -35,8 +35,9 @@
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheEntryVersion;
+import org.apache.ignite.cdc.AbstractCdcEventsApplier;
import org.apache.ignite.cdc.CdcEvent;
-import org.apache.ignite.cdc.CdcEventsApplier;
+import org.apache.ignite.cdc.CdcEventsIgniteApplier;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
@@ -83,7 +84,7 @@
* @see CdcEvent
* @see CacheEntryVersion
*/
-class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnable, AutoCloseable {
+class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
/** Ignite instance. */
private final IgniteEx ign;
@@ -120,6 +121,9 @@
/** */
private final AtomicLong rcvdEvts = new AtomicLong();
+ /** */
+ private AbstractCdcEventsApplier applier;
+
/**
* @param ign Ignite instance.
* @param log Logger.
@@ -146,8 +150,6 @@
KafkaToIgniteMetadataUpdater metaUpdr,
AtomicBoolean stopped
) {
- super(maxBatchSize);
-
this.ign = ign;
this.kafkaProps = kafkaProps;
this.topic = topic;
@@ -158,6 +160,8 @@
this.metaUpdr = metaUpdr;
this.stopped = stopped;
this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class);
+
+ applier = new CdcEventsIgniteApplier(ign, maxBatchSize, log);
}
/** {@inheritDoc} */
@@ -223,7 +227,7 @@
);
}
- apply(F.iterator(recs, this::deserialize, true, this::filterAndPossiblyUpdateMetadata));
+ applier.apply(F.iterator(recs, this::deserialize, true, this::filterAndPossiblyUpdateMetadata));
cnsmr.commitSync(Duration.ofMillis(kafkaReqTimeout));
}
@@ -269,16 +273,6 @@
}
/** {@inheritDoc} */
- @Override protected IgniteEx ignite() {
- return ign;
- }
-
- /** {@inheritDoc} */
- @Override protected IgniteLogger log() {
- return log;
- }
-
- /** {@inheritDoc} */
@Override public String toString() {
return S.toString(KafkaToIgniteCdcStreamerApplier.class, this);
}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
index c14097b..f15e061 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
@@ -22,10 +22,11 @@
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cdc.CdcEventsApplier;
import org.apache.ignite.cdc.TypeMapping;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMetadata;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -34,6 +35,8 @@
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.VoidDeserializer;
+import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.registerBinaryMeta;
+import static org.apache.ignite.cdc.AbstractIgniteCdcStreamer.registerMapping;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
@@ -87,6 +90,8 @@
/** Polls all available records from metadata topic and applies it to Ignite. */
public synchronized void updateMetadata() {
+ BinaryContext ctx = ((CacheObjectBinaryProcessorImpl)ign.context().cacheObjects()).binaryContext();
+
while (true) {
ConsumerRecords<Void, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
@@ -100,9 +105,9 @@
Object data = IgniteUtils.fromBytes(rec.value());
if (data instanceof BinaryMetadata)
- CdcEventsApplier.registerBinaryMeta(ign, log, (BinaryMetadata)data);
+ registerBinaryMeta(ctx, log, (BinaryMetadata)data);
else if (data instanceof TypeMapping)
- CdcEventsApplier.registerMapping(ign, log, (TypeMapping)data);
+ registerMapping(ctx, log, (TypeMapping)data);
else
throw new IllegalArgumentException("Unknown meta type[type=" + data + ']');
}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java
new file mode 100644
index 0000000..5484277
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/CdcEventsIgniteClientApplier.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.thin;
+
+import java.util.Map;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cdc.AbstractCdcEventsApplier;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.internal.client.thin.TcpClientCache;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.collection.IntHashMap;
+import org.apache.ignite.internal.util.collection.IntMap;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+
+/**
+ * Contains logic to process {@link CdcEvent} and apply them to the destination cluster by thin client.
+ *
+ * @see TcpClientCache#putAllConflict(Map)
+ * @see TcpClientCache#removeAllConflict(Map)
+ */
+public class CdcEventsIgniteClientApplier extends AbstractCdcEventsApplier<Object, T2<Object, GridCacheVersion>> {
+ /** Client connected to the destination cluster. */
+ private final IgniteClient client;
+
+ /** Caches. */
+ private final IntMap<TcpClientCache<Object, Object>> ignCaches = new IntHashMap<>();
+
+ /**
+ * @param client Client connected to the destination cluster.
+ * @param maxBatchSize Maximum batch size.
+ * @param log Logger.
+ */
+ public CdcEventsIgniteClientApplier(IgniteClient client, int maxBatchSize, IgniteLogger log) {
+ super(maxBatchSize, log);
+
+ this.client = client;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Object toKey(CdcEvent evt) {
+ return evt.key();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected T2<Object, GridCacheVersion> toValue(int cacheId, Object val, GridCacheVersion ver) {
+ return new T2<>(val, ver);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void putAllConflict(int cacheId, Map<Object, T2<Object, GridCacheVersion>> drMap) {
+ cache(cacheId).putAllConflict(drMap);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void removeAllConflict(int cacheId, Map<Object, GridCacheVersion> drMap) {
+ cache(cacheId).removeAllConflict(drMap);
+ }
+
+ /** @return Cache. */
+ private TcpClientCache<Object, Object> cache(int cacheId) {
+ return ignCaches.computeIfAbsent(cacheId, id -> {
+ for (String cacheName : client.cacheNames()) {
+ if (CU.cacheId(cacheName) == id)
+ return (TcpClientCache<Object, Object>)client.cache(cacheName).withKeepBinary();
+ }
+
+ throw new IllegalStateException("Cache with id not found [cacheId=" + id + ']');
+ });
+ }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java
new file mode 100644
index 0000000..e62feba
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.thin;
+
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cdc.AbstractIgniteCdcStreamer;
+import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
+import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamer;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.client.thin.ClientBinary;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.util.typedef.internal.A;
+
+/**
+ * Change Data Consumer that streams all data changes to destination cluster through Ignite thin client.
+ * <p/>
+ * Consumer will just fail in case of any error during write. Fail of consumer will lead to the fail of {@code ignite-cdc} application.
+ * It expected that {@code ignite-cdc} will be configured for automatic restarts with the OS tool to failover temporary errors
+ * such as destination cluster unavailability or other issues.
+ * <p/>
+ * If you have plans to apply written messages to the other Ignite cluster in active-active manner,
+ * e.g. concurrent updates of the same entry in other cluster is possible,
+ * please, be aware of {@link CacheVersionConflictResolverImpl} conflict resolved.
+ * Configuration of {@link CacheVersionConflictResolverImpl} can be found in {@link KafkaToIgniteCdcStreamer} documentation.
+ *
+ * @see IgniteClient
+ * @see CdcMain
+ * @see CacheVersionConflictResolverImpl
+ */
+public class IgniteToIgniteClientCdcStreamer extends AbstractIgniteCdcStreamer {
+ /** Ignite thin client configuration. */
+ private ClientConfiguration destClientCfg;
+
+ /** Ignite thin client. */
+ private IgniteClient dest;
+
+ /** {@inheritDoc} */
+ @Override public void start(MetricRegistry mreg) {
+ super.start(mreg);
+
+ if (log.isInfoEnabled())
+ log.info("Ignite To Ignite Client Streamer [cacheIds=" + cachesIds + ']');
+
+ A.notNull(destClientCfg, "Destination thin client configuration");
+
+ dest = Ignition.startClient(destClientCfg);
+
+ applier = new CdcEventsIgniteClientApplier(dest, maxBatchSize, log);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected BinaryContext binaryContext() {
+ return ((ClientBinary)dest.binary()).binaryContext();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ dest.close();
+ }
+
+ /**
+ * Sets Ignite thin client configuration that will connect to destination cluster.
+ *
+ * @param destClientCfg Ignite thin client configuration that will connect to destination cluster.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToIgniteClientCdcStreamer setDestinationClientConfiguration(ClientConfiguration destClientCfg) {
+ this.destClientCfg = destClientCfg;
+
+ return this;
+ }
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
index 29f4253..5a0a221 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
@@ -70,8 +70,8 @@
CdcConfiguration cdcCfg = new CdcConfiguration();
cdcCfg.setConsumer(new IgniteToIgniteCdcStreamer()
- .setMaxBatchSize(KEYS_CNT)
.setDestinationIgniteConfiguration(destCfg)
+ .setMaxBatchSize(KEYS_CNT)
.setCaches(Collections.singleton(cache)));
cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi());
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java
index 43e7db6..f182e77 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java
@@ -20,6 +20,7 @@
import org.apache.ignite.cdc.kafka.CdcKafkaReplicationAppsTest;
import org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest;
import org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest;
+import org.apache.ignite.cdc.thin.CdcIgniteToIgniteClientReplicationTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -34,7 +35,8 @@
KafkaToIgniteLoaderTest.class,
CdcKafkaReplicationTest.class,
CdcKafkaReplicationAppsTest.class,
- ConflictResolverRestartTest.class
+ ConflictResolverRestartTest.class,
+ CdcIgniteToIgniteClientReplicationTest.class
})
public class IgniteCdcTestSuite {
}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/thin/CdcIgniteToIgniteClientReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/thin/CdcIgniteToIgniteClientReplicationTest.java
new file mode 100644
index 0000000..b810e18
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/thin/CdcIgniteToIgniteClientReplicationTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.thin;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.ignite.cdc.AbstractReplicationTest;
+import org.apache.ignite.cdc.CdcConfiguration;
+import org.apache.ignite.cdc.IgniteToIgniteCdcStreamer;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/**
+ * {@link IgniteToIgniteClientCdcStreamer} test.
+ */
+public class CdcIgniteToIgniteClientReplicationTest extends AbstractReplicationTest {
+ /** {@inheritDoc} */
+ @Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache) {
+ List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+ for (int i = 0; i < srcCluster.length; i++)
+ futs.add(igniteToIgniteClient(srcCluster[i].configuration(), destCluster, cache));
+
+ return futs;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected List<IgniteInternalFuture<?>> startActiveActiveCdc() {
+ List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+ for (int i = 0; i < srcCluster.length; i++)
+ futs.add(igniteToIgniteClient(srcCluster[i].configuration(), destCluster, ACTIVE_ACTIVE_CACHE));
+
+ for (int i = 0; i < destCluster.length; i++)
+ futs.add(igniteToIgniteClient(destCluster[i].configuration(), srcCluster, ACTIVE_ACTIVE_CACHE));
+
+ return futs;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void checkConsumerMetrics(Function<String, Long> longMetric) {
+ assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.LAST_EVT_TIME));
+ assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.EVTS_CNT));
+ }
+
+ /**
+ * @param srcCfg Ignite source node configuration.
+ * @param dest Destination cluster.
+ * @param cache Cache name to replicate.
+ * @return Future for Change Data Capture application.
+ */
+ private IgniteInternalFuture<?> igniteToIgniteClient(IgniteConfiguration srcCfg, IgniteEx[] dest, String cache) {
+ return runAsync(() -> {
+ ClientConfiguration clientCfg = new ClientConfiguration();
+
+ String[] addrs = new String[dest.length];
+
+ for (int i = 0; i < dest.length; i++) {
+ ClusterNode node = dest[i].localNode();
+
+ addrs[i] = F.first(node.addresses()) + ":" + node.attribute(ClientListenerProcessor.CLIENT_LISTENER_PORT);
+ }
+
+ clientCfg.setAddresses(addrs);
+
+ CdcConfiguration cdcCfg = new CdcConfiguration();
+
+ cdcCfg.setConsumer(new IgniteToIgniteClientCdcStreamer()
+ .setDestinationClientConfiguration(clientCfg)
+ .setCaches(Collections.singleton(cache))
+ .setMaxBatchSize(KEYS_CNT));
+
+ cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi());
+
+ CdcMain cdc = new CdcMain(srcCfg, null, cdcCfg);
+
+ cdcs.add(cdc);
+
+ cdc.run();
+ });
+ }
+}