IGNITE-15000 Implemented data replication thin client operations (#9915)
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
index e6cb495..4d04a10 100644
--- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
@@ -72,6 +72,9 @@
/** Version 2.13.0. */
protected static final IgniteProductVersion VER_2_13_0 = IgniteProductVersion.fromString("2.13.0");
+ /** Version 2.14.0. */
+ protected static final IgniteProductVersion VER_2_14_0 = IgniteProductVersion.fromString("2.14.0");
+
/** Parameters. */
@Parameterized.Parameters(name = "Version {0}")
public static Iterable<Object[]> versions() {
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
index 6f52705..7b72508 100644
--- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
@@ -51,8 +51,11 @@
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.ThinClientConfiguration;
+import org.apache.ignite.internal.client.thin.TcpClientCache;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.platform.PlatformType;
@@ -429,6 +432,9 @@
testServicesWithCallerContextThrows();
}
}
+
+ if (clientVer.compareTo(VER_2_14_0) >= 0)
+ testDataReplicationOperations(serverVer.compareTo(VER_2_14_0) >= 0);
}
/** */
@@ -473,6 +479,35 @@
}
}
+ /** @param supported {@code True} if feature supported. */
+ private void testDataReplicationOperations(boolean supported) {
+ X.println(">>>> Testing cache replication");
+
+ try (IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(ADDR))) {
+ TcpClientCache<Object, Object> cache = (TcpClientCache<Object, Object>)client
+ .getOrCreateCache("test-cache-replication");
+
+ Map<Object, T2<Object, GridCacheVersion>> puts = F.asMap(1, new T2<>(1, new GridCacheVersion(1, 1, 1, 2)));
+
+ Map<Object, GridCacheVersion> rmvs = F.asMap(1, new GridCacheVersion(1, 1, 1, 2));
+
+ if (supported) {
+ cache.putAllConflict(puts);
+
+ assertEquals(1, cache.get(1));
+
+ cache.removeAllConflict(rmvs);
+
+ assertFalse(cache.containsKey(1));
+ }
+ else {
+ assertThrowsWithCause(() -> cache.putAllConflict(puts), ClientFeatureNotSupportedByServerException.class);
+
+ assertThrowsWithCause(() -> cache.removeAllConflict(rmvs), ClientFeatureNotSupportedByServerException.class);
+ }
+ }
+ }
+
/** */
public static interface EchoServiceInterface {
/** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index e37403b..c3c6491 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -117,6 +117,12 @@
/** Cache clear keys. */
CACHE_CLEAR_KEYS(1015),
+ /** Cache put all conflict. */
+ CACHE_PUT_ALL_CONFLICT(1022),
+
+ /** Cache remove all conflict. */
+ CACHE_REMOVE_ALL_CONFLICT(1023),
+
/** Cache partitions. */
CACHE_PARTITIONS(1101),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
index bf035f4..0f43725 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
@@ -59,7 +59,10 @@
SERVICE_INVOKE_CALLCTX(10),
/** Handle OP_HEARTBEAT and OP_GET_IDLE_TIMEOUT. */
- HEARTBEAT(11);
+ HEARTBEAT(11),
+
+ /** Data replication operations: {@link TcpClientCache#putAllConflict}, {@link TcpClientCache#removeAllConflict}. */
+ DATA_REPLICATION_OPERATIONS(12);
/** */
private static final EnumSet<ProtocolBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index 99f9eea..4efa4e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -42,11 +42,14 @@
import org.apache.ignite.client.ClientCacheConfiguration;
import org.apache.ignite.client.ClientDisconnectListener;
import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
import org.apache.ignite.client.IgniteClientFuture;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
import org.apache.ignite.internal.client.thin.TcpClientTransactions.TcpClientTransaction;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -57,7 +60,7 @@
/**
* Implementation of {@link ClientCache} over TCP protocol.
*/
-class TcpClientCache<K, V> implements ClientCache<K, V> {
+public class TcpClientCache<K, V> implements ClientCache<K, V> {
/** "Keep binary" flag mask. */
private static final byte KEEP_BINARY_FLAG_MASK = 0x01;
@@ -863,6 +866,54 @@
U.closeQuiet(hnd);
}
+ /**
+ * Store DR data.
+ *
+ * @param drMap DR map.
+ */
+ public void putAllConflict(Map<? extends K, ? extends T2<? extends V, GridCacheVersion>> drMap) throws ClientException {
+ A.notNull(drMap, "drMap");
+
+ ch.request(ClientOperation.CACHE_PUT_ALL_CONFLICT, req -> writePutAllConflict(drMap, req));
+ }
+
+ /**
+ * Store DR data asynchronously.
+ *
+ * @param drMap DR map.
+ * @return Future.
+ */
+ public IgniteClientFuture<Void> putAllConflictAsync(Map<? extends K, T2<? extends V, GridCacheVersion>> drMap)
+ throws ClientException {
+ A.notNull(drMap, "drMap");
+
+ return ch.requestAsync(ClientOperation.CACHE_PUT_ALL_CONFLICT, req -> writePutAllConflict(drMap, req));
+ }
+
+ /**
+ * Removes DR data.
+ *
+ * @param drMap DR map.
+ */
+ public void removeAllConflict(Map<? extends K, GridCacheVersion> drMap) throws ClientException {
+ A.notNull(drMap, "drMap");
+
+ ch.request(ClientOperation.CACHE_REMOVE_ALL_CONFLICT, req -> writeRemoveAllConflict(drMap, req));
+ }
+
+ /**
+ * Removes DR data asynchronously.
+ *
+ * @param drMap DR map.
+ * @return Future.
+ */
+ public IgniteClientFuture<Void> removeAllConflictAsync(Map<? extends K, GridCacheVersion> drMap)
+ throws ClientException {
+ A.notNull(drMap, "drMap");
+
+ return ch.requestAsync(ClientOperation.CACHE_REMOVE_ALL_CONFLICT, req -> writeRemoveAllConflict(drMap, req));
+ }
+
/** Handle scan query. */
private QueryCursor<Cache.Entry<K, V>> scanQuery(ScanQuery<K, V> qry) {
Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
@@ -1065,4 +1116,49 @@
serDes.writeObject(out, e.getValue());
});
}
+
+ /** */
+ private void writePutAllConflict(
+ Map<? extends K, ? extends T2<? extends V, GridCacheVersion>> map,
+ PayloadOutputChannel req
+ ) {
+ checkDataReplicationSupported(req.clientChannel().protocolCtx());
+
+ writeCacheInfo(req);
+
+ ClientUtils.collection(
+ map.entrySet(),
+ req.out(),
+ (out, e) -> {
+ serDes.writeObject(out, e.getKey());
+ serDes.writeObject(out, e.getValue().get1());
+ serDes.writeObject(out, e.getValue().get2());
+ });
+ }
+
+ /** */
+ private void writeRemoveAllConflict(Map<? extends K, GridCacheVersion> map, PayloadOutputChannel req) {
+ checkDataReplicationSupported(req.clientChannel().protocolCtx());
+
+ writeCacheInfo(req);
+
+ ClientUtils.collection(
+ map.entrySet(),
+ req.out(),
+ (out, e) -> {
+ serDes.writeObject(out, e.getKey());
+ serDes.writeObject(out, e.getValue());
+ });
+ }
+
+ /**
+ * Check that data replication operations is supported by server.
+ *
+ * @param protocolCtx Protocol context.
+ */
+ private void checkDataReplicationSupported(ProtocolContext protocolCtx)
+ throws ClientFeatureNotSupportedByServerException {
+ if (!protocolCtx.isFeatureSupported(ProtocolBitmaskFeature.DATA_REPLICATION_OPERATIONS))
+ throw new ClientFeatureNotSupportedByServerException(ProtocolBitmaskFeature.DATA_REPLICATION_OPERATIONS);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
index ea243f3..88a25fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
@@ -20,6 +20,7 @@
import java.util.EnumSet;
import org.apache.ignite.client.ClientServices;
import org.apache.ignite.internal.ThinProtocolFeature;
+import org.apache.ignite.internal.client.thin.TcpClientCache;
/**
* Defines supported features for thin client.
@@ -59,7 +60,10 @@
SERVICE_INVOKE_CALLCTX(10),
/** Handle OP_HEARTBEAT and OP_GET_IDLE_TIMEOUT. */
- HEARTBEAT(11);
+ HEARTBEAT(11),
+
+ /** Data replication operations: {@link TcpClientCache#putAllConflict}, {@link TcpClientCache#removeAllConflict}. */
+ DATA_REPLICATION_OPERATIONS(12);
/** */
private static final EnumSet<ClientBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index 15312fd..0156e85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -56,11 +56,13 @@
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheLocalPeekRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheNodePartitionsRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePartitionsRequest;
+import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutAllConflictRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutAllRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutIfAbsentRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheQueryContinuousRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheQueryNextPageRequest;
+import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRemoveAllConflictRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRemoveAllRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRemoveIfEqualsRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRemoveKeyRequest;
@@ -189,6 +191,12 @@
/** */
private static final short OP_CACHE_LOCAL_PEEK = 1021;
+ /** */
+ private static final short OP_CACHE_PUT_ALL_CONFLICT = 1022;
+
+ /** */
+ private static final short OP_CACHE_REMOVE_ALL_CONFLICT = 1023;
+
/* Cache create / destroy, configuration. */
/** */
private static final short OP_CACHE_GET_NAMES = 1050;
@@ -522,6 +530,12 @@
case OP_CACHE_REMOVE_ALL:
return new ClientCacheRemoveAllRequest(reader);
+ case OP_CACHE_PUT_ALL_CONFLICT:
+ return new ClientCachePutAllConflictRequest(reader, ctx);
+
+ case OP_CACHE_REMOVE_ALL_CONFLICT:
+ return new ClientCacheRemoveAllConflictRequest(reader);
+
case OP_CACHE_CREATE_WITH_NAME:
return new ClientCacheCreateWithNameRequest(reader);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
new file mode 100644
index 0000000..288839c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.cache;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.client.thin.TcpClientCache;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.processors.platform.client.tx.ClientTxAwareRequest;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.EXPIRE_TIME_CALCULATE;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_NOT_CHANGED;
+import static org.apache.ignite.internal.processors.platform.utils.PlatformUtils.readCacheObject;
+
+/**
+ * Client {@link TcpClientCache#putAllConflict(Map)} request.
+ */
+public class ClientCachePutAllConflictRequest extends ClientCacheDataRequest implements ClientTxAwareRequest {
+ /** */
+ private final Map<KeyCacheObject, GridCacheDrInfo> map;
+
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ * @param ctx Connection context.
+ */
+ public ClientCachePutAllConflictRequest(BinaryReaderExImpl reader, ClientConnectionContext ctx) {
+ super(reader);
+
+ boolean expPlc = cachex(ctx).configuration().getExpiryPolicyFactory() != null;
+
+ int cnt = reader.readInt();
+
+ map = new LinkedHashMap<>(cnt);
+
+ for (int i = 0; i < cnt; i++) {
+ KeyCacheObject key = readCacheObject(reader, true);
+ CacheObject val = readCacheObject(reader, false);
+ GridCacheVersion ver = (GridCacheVersion)reader.readObjectDetached();
+
+ GridCacheDrInfo info = expPlc ?
+ new GridCacheDrExpirationInfo(val, ver, TTL_NOT_CHANGED, EXPIRE_TIME_CALCULATE) :
+ new GridCacheDrInfo(val, ver);
+
+ map.put(key, info);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientResponse process(ClientConnectionContext ctx) {
+ try {
+ cachex(ctx).putAllConflict(map);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+
+ return super.process(ctx);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java
new file mode 100644
index 0000000..c6e1a35
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.cache;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.client.thin.TcpClientCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.processors.platform.client.tx.ClientTxAwareRequest;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.platform.utils.PlatformUtils.readCacheObject;
+
+/**
+ * Client {@link TcpClientCache#removeAllConflict(Map)} request.
+ */
+public class ClientCacheRemoveAllConflictRequest extends ClientCacheDataRequest implements ClientTxAwareRequest {
+ /** */
+ private final Map<KeyCacheObject, GridCacheVersion> map;
+
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientCacheRemoveAllConflictRequest(BinaryReaderExImpl reader) {
+ super(reader);
+
+ int cnt = reader.readInt();
+
+ map = new LinkedHashMap<>(cnt);
+
+ for (int i = 0; i < cnt; i++) {
+ KeyCacheObject key = readCacheObject(reader, true);
+ GridCacheVersion ver = (GridCacheVersion)reader.readObjectDetached();
+
+ map.put(key, ver);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientResponse process(ClientConnectionContext ctx) {
+ try {
+ cachex(ctx).removeAllConflict(map);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+
+ return super.process(ctx);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
index cfb20d9..5881ab3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
@@ -21,6 +21,7 @@
import org.apache.ignite.IgniteCache;
import org.apache.ignite.binary.BinaryRawReader;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientRequest;
@@ -77,6 +78,18 @@
}
/**
+ * Gets the internal cache implementation, with binary mode enabled.
+ *
+ * @param ctx Kernal context.
+ * @return Cache.
+ */
+ protected IgniteInternalCache<?, ?> cachex(ClientConnectionContext ctx) {
+ String cacheName = cacheDescriptor(ctx).cacheName();
+
+ return ctx.kernalContext().grid().cachex(cacheName).keepBinary();
+ }
+
+ /**
* Gets a value indicating whether keepBinary flag is set in this request.
*
* @return keepBinary flag value.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java
index 25bdae2..39a973d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java
@@ -19,14 +19,11 @@
import java.util.ArrayList;
import java.util.Collection;
-
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
-import org.apache.ignite.internal.binary.streams.BinaryInputStream;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
-import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
+import static org.apache.ignite.internal.processors.platform.utils.PlatformUtils.readCacheObject;
+
/**
* Data streamer deserialization helpers.
*/
@@ -52,29 +49,4 @@
return entries;
}
-
- /**
- * Read cache object from the stream as raw bytes to avoid marshalling.
- */
- private static <T extends CacheObject> T readCacheObject(BinaryReaderExImpl reader, boolean isKey) {
- BinaryInputStream in = reader.in();
-
- int pos0 = in.position();
-
- Object obj = reader.readObjectDetached();
-
- if (obj == null)
- return null;
-
- if (obj instanceof CacheObject)
- return (T)obj;
-
- int pos1 = in.position();
-
- in.position(pos0);
-
- byte[] objBytes = in.readByteArray(pos1 - pos0);
-
- return isKey ? (T)new KeyCacheObjectImpl(obj, objBytes, -1) : (T)new CacheObjectImpl(obj, objBytes);
- }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
index 06d55f4..8462a5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
@@ -53,11 +53,16 @@
import org.apache.ignite.internal.binary.BinaryNoopMetadataHandler;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.BinarySchema;
import org.apache.ignite.internal.binary.BinarySchemaRegistry;
import org.apache.ignite.internal.binary.BinaryTypeImpl;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformExtendedException;
@@ -1364,6 +1369,34 @@
}
/**
+ * Read cache object from the stream as raw bytes to avoid marshalling.
+ *
+ * @param reader Reader.
+ * @param isKey {@code True} if object is a key.
+ */
+ public static <T extends CacheObject> T readCacheObject(BinaryReaderExImpl reader, boolean isKey) {
+ BinaryInputStream in = reader.in();
+
+ int pos0 = in.position();
+
+ Object obj = reader.readObjectDetached();
+
+ if (obj == null)
+ return null;
+
+ if (obj instanceof CacheObject)
+ return (T)obj;
+
+ int pos1 = in.position();
+
+ in.position(pos0);
+
+ byte[] objBytes = in.readByteArray(pos1 - pos0);
+
+ return isKey ? (T)new KeyCacheObjectImpl(obj, objBytes, -1) : (T)new CacheObjectImpl(obj, objBytes);
+ }
+
+ /**
* Private constructor.
*/
private PlatformUtils() {
diff --git a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 075fd14..7e25d7b 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -316,7 +316,7 @@
String nullOpsNames = nullOps.stream().map(Enum::name).collect(Collectors.joining(", "));
- long expectedNullCount = 16;
+ long expectedNullCount = 18;
String msg = nullOps.size()
+ " operation codes do not have public equivalent. When adding new codes, update ClientOperationType too. Missing ops: "
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/DataReplicationOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/DataReplicationOperationsTest.java
new file mode 100644
index 0000000..b68ca7e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/DataReplicationOperationsTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.client.ClientCacheConfiguration;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.Person;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Data replication operations test.
+ */
+@RunWith(Parameterized.class)
+public class DataReplicationOperationsTest extends AbstractThinClientTest {
+ /** Keys count. */
+ private static final int KEYS_CNT = 10;
+
+ /** */
+ private static IgniteClient client;
+
+ /** */
+ private static TcpClientCache<Object, Object> cache;
+
+ /** */
+ private final GridCacheVersion otherVer = new GridCacheVersion(1, 1, 1, 2);
+
+ /** {@code True} if operate with binary objects. */
+ @Parameterized.Parameter
+ public boolean binary;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "binary={0}")
+ public static Collection<Object[]> parameters() {
+ return cartesianProduct(F.asList(false, true));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrid();
+
+ client = startClient(grid());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ grid().destroyCaches(grid().cacheNames());
+
+ cache = (TcpClientCache<Object, Object>)client.createCache(DEFAULT_CACHE_NAME);
+
+ if (binary)
+ cache = (TcpClientCache<Object, Object>)cache.withKeepBinary();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ client.close();
+ }
+
+ /** */
+ @Test
+ public void testPutAllConflict() {
+ Map<Object, T2<Object, GridCacheVersion>> data = createPutAllData();
+
+ cache.putAllConflict(data);
+
+ data.forEach((key, val) -> assertEquals(val.get1(), cache.get(key)));
+ }
+
+ /** */
+ @Test
+ public void testRemoveAllConflict() {
+ for (int i = 0; i < KEYS_CNT; i++)
+ cache.put(new Person(i, "Person-" + i), new Person(i, "Person-" + i));
+
+ Map<Object, GridCacheVersion> map = new HashMap<>();
+
+ for (int i = 0; i < KEYS_CNT; i++) {
+ Person key = new Person(i, "Person-" + i);
+
+ map.put(binary ? client.binary().toBinary(key) : key, otherVer);
+ }
+
+ cache.removeAllConflict(map);
+
+ map.keySet().forEach(key -> assertFalse(cache.containsKey(key)));
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testWithExpiryPolicy() throws Exception {
+ PlatformExpiryPolicy expPlc = new PlatformExpiryPolicy(1000, 1000, 1000);
+
+ ClientCacheConfiguration ccfgWithExpPlc = new ClientCacheConfiguration()
+ .setName("cache-with-expiry-policy")
+ .setExpiryPolicy(expPlc);
+
+ TcpClientCache<Object, Object> cache = (TcpClientCache<Object, Object>)client.getOrCreateCache(ccfgWithExpPlc);
+
+ TcpClientCache<Object, Object> cacheWithExpPlc = binary ?
+ (TcpClientCache<Object, Object>)cache.withKeepBinary() : cache;
+
+ Map<Object, T2<Object, GridCacheVersion>> data = createPutAllData();
+
+ cacheWithExpPlc.putAllConflict(data);
+
+ assertTrue(cacheWithExpPlc.containsKeys(data.keySet()));
+
+ assertTrue(waitForCondition(
+ () -> data.keySet().stream().noneMatch(cacheWithExpPlc::containsKey),
+ 2 * expPlc.getExpiryForCreation().getDurationAmount()
+ ));
+ }
+
+ /** */
+ private Map<Object, T2<Object, GridCacheVersion>> createPutAllData() {
+ Map<Object, T2<Object, GridCacheVersion>> map = new HashMap<>();
+
+ for (int i = 0; i < KEYS_CNT; i++) {
+ Person key = new Person(i, "Person-" + i);
+ Person val = new Person(i, "Person-" + i);
+
+ map.put(binary ? client.binary().toBinary(key) : key,
+ new T2<>(binary ? client.binary().toBinary(val) : val, otherVer));
+ }
+
+ return map;
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index aeb9915..693c09e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -23,6 +23,7 @@
import org.apache.ignite.internal.client.thin.ClusterApiTest;
import org.apache.ignite.internal.client.thin.ClusterGroupTest;
import org.apache.ignite.internal.client.thin.ComputeTaskTest;
+import org.apache.ignite.internal.client.thin.DataReplicationOperationsTest;
import org.apache.ignite.internal.client.thin.IgniteSetTest;
import org.apache.ignite.internal.client.thin.OptimizedMarshallerClassesCachedTest;
import org.apache.ignite.internal.client.thin.ReliableChannelTest;
@@ -73,7 +74,8 @@
OptimizedMarshallerClassesCachedTest.class,
AtomicLongTest.class,
BinaryConfigurationTest.class,
- IgniteSetTest.class
+ IgniteSetTest.class,
+ DataReplicationOperationsTest.class
})
public class ClientTestSuite {
// No-op.