IGNITE-21183 Thin client: Fix client-connector threads blocking by transactional operations - Fixes #11176.
Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 876bc26..c3ab16e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -37,7 +37,6 @@
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.UUID;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@@ -52,7 +51,6 @@
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -74,7 +72,6 @@
import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFeatures;
@@ -126,7 +123,6 @@
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.CIX2;
@@ -274,10 +270,6 @@
@GridToStringExclude
protected CacheConfiguration cacheCfg;
- /** Grid configuration. */
- @GridToStringExclude
- private IgniteConfiguration gridCfg;
-
/** Cache metrics. */
protected CacheMetricsImpl metrics;
@@ -328,7 +320,6 @@
this.ctx = ctx;
- gridCfg = ctx.gridConfig();
cacheCfg = ctx.config();
locNodeId = ctx.gridConfig().getNodeId();
@@ -3594,63 +3585,12 @@
}
/**
- * Asynchronously commits transaction after all previous asynchronous operations are completed.
- *
- * @param tx Transaction to commit.
- * @return Transaction commit future.
+ * Last async operation future for implicit transactions. For explicit transactions future is
+ * bound to the transaction and stored inside the transaction.
+ * These futures are required to linearize async operations made by the same thread.
*/
- @SuppressWarnings("unchecked")
- IgniteInternalFuture<IgniteInternalTx> commitTxAsync(final GridNearTxLocal tx) {
- FutureHolder holder = lastFut.get();
-
- holder.lock();
-
- try {
- IgniteInternalFuture fut = holder.future();
-
- if (fut != null && !fut.isDone()) {
- IgniteInternalFuture<IgniteInternalTx> f = new GridEmbeddedFuture<>(fut,
- new C2<Object, Exception, IgniteInternalFuture<IgniteInternalTx>>() {
- @Override public IgniteInternalFuture<IgniteInternalTx> apply(Object o, Exception e) {
- return tx.commitNearTxLocalAsync();
- }
- });
-
- saveFuture(holder, f, /*asyncOp*/false, /*retry*/false);
-
- return f;
- }
-
- IgniteInternalFuture<IgniteInternalTx> f = tx.commitNearTxLocalAsync();
-
- saveFuture(holder, f, /*asyncOp*/false, /*retry*/false);
-
- ctx.tm().resetContext();
-
- return f;
- }
- finally {
- holder.unlock();
- }
- }
-
- /**
- * Awaits for previous async operation to be completed.
- */
- public void awaitLastFut() {
- FutureHolder holder = lastFut.get();
-
- IgniteInternalFuture fut = holder.future();
-
- if (fut != null && !fut.isDone()) {
- try {
- // Ignore any exception from previous async operation as it should be handled by user.
- fut.get();
- }
- catch (IgniteCheckedException ignored) {
- // No-op.
- }
- }
+ public FutureHolder lastAsyncFuture() {
+ return lastFut.get();
}
/**
@@ -3663,11 +3603,11 @@
@Nullable private <T> T syncOp(SyncOp<T> op) throws IgniteCheckedException {
checkJta();
- awaitLastFut();
-
GridNearTxLocal tx = checkCurrentTx();
if (tx == null || tx.implicit()) {
+ lastAsyncFuture().await();
+
TransactionConfiguration tCfg = CU.transactionConfiguration(ctx, ctx.kernalContext().config());
CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -3760,8 +3700,11 @@
// Should not happen.
throw new IgniteCheckedException("Failed to perform cache operation (maximum number of retries exceeded).");
}
- else
+ else {
+ tx.txState().awaitLastFuture();
+
return op.op(tx);
+ }
}
/**
@@ -3836,12 +3779,12 @@
if (fail != null)
return fail;
- FutureHolder holder = lastFut.get();
+ FutureHolder holder = tx.implicit() ? lastAsyncFuture() : tx.txState().lastAsyncFuture();
holder.lock();
try {
- IgniteInternalFuture fut = holder.future();
+ IgniteInternalFuture<?> fut = holder.future();
final GridNearTxLocal tx0 = tx;
@@ -3913,7 +3856,9 @@
return resFut;
});
- saveFuture(holder, f, /*asyncOp*/true, retry);
+ holder.saveFuture(f);
+
+ f.listen(f0 -> asyncOpRelease(retry));
return f;
}
@@ -3923,7 +3868,7 @@
* See {@link GridDhtTxLocalAdapter#updateLockFuture(IgniteInternalFuture, IgniteInternalFuture)}
*/
if (!tx0.txState().implicitSingle())
- tx0.txState().awaitLastFuture(ctx.shared());
+ tx0.txState().awaitLastFuture();
IgniteInternalFuture<T> f;
@@ -3935,7 +3880,9 @@
ctx.shared().txContextReset();
}
- saveFuture(holder, f, /*asyncOp*/true, retry);
+ holder.saveFuture(f);
+
+ f.listen(f0 -> asyncOpRelease(retry));
if (tx.implicit())
ctx.tm().resetContext();
@@ -3948,49 +3895,6 @@
}
/**
- * Saves future in thread local holder and adds listener
- * that will clear holder when future is finished.
- *
- * @param holder Future holder.
- * @param fut Future to save.
- * @param asyncOp Whether operation is instance of AsyncOp.
- * @param retry {@code true} for retry operations.
- */
- protected void saveFuture(final FutureHolder holder, IgniteInternalFuture<?> fut, final boolean asyncOp, final boolean retry) {
- assert holder != null;
- assert fut != null;
- assert holder.holdsLock();
-
- holder.future(fut);
-
- if (fut.isDone()) {
- holder.future(null);
-
- if (asyncOp)
- asyncOpRelease(retry);
- }
- else {
- fut.listen(new CI1<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> f) {
- if (asyncOp)
- asyncOpRelease(retry);
-
- if (!holder.tryLock())
- return;
-
- try {
- if (holder.future() == f)
- holder.future(null);
- }
- finally {
- holder.unlock();
- }
- }
- });
- }
- }
-
- /**
* Tries to acquire asynchronous operations permit, if limited.
*
* @param retry Retry flag.
@@ -5543,12 +5447,12 @@
/**
* Holder for last async operation future.
*/
- protected static class FutureHolder {
+ public static class FutureHolder {
/** Lock. */
private final ReentrantLock lock = new ReentrantLock();
/** Future. */
- private IgniteInternalFuture fut;
+ private IgniteInternalFuture<?> fut;
/**
* Tries to acquire lock.
@@ -5586,7 +5490,7 @@
*
* @return Future.
*/
- public IgniteInternalFuture future() {
+ public IgniteInternalFuture<?> future() {
return fut;
}
@@ -5595,9 +5499,55 @@
*
* @param fut Future.
*/
- public void future(@Nullable IgniteInternalFuture fut) {
+ public void future(@Nullable IgniteInternalFuture<?> fut) {
this.fut = fut;
}
+
+ /**
+ * Awaits for previous async operation to be completed.
+ */
+ public void await() {
+ IgniteInternalFuture<?> fut = this.fut;
+
+ if (fut != null && !fut.isDone()) {
+ try {
+ // Ignore any exception from previous async operation as it should be handled by user.
+ fut.get();
+ }
+ catch (IgniteCheckedException ignored) {
+ // No-op.
+ }
+ }
+ }
+
+ /**
+ * Saves future in the holder and adds listener that will clear holder when future is finished.
+ *
+ * @param fut Future to save.
+ */
+ public void saveFuture(IgniteInternalFuture<?> fut) {
+ assert fut != null;
+ assert holdsLock();
+
+ if (fut.isDone())
+ future(null);
+ else {
+ future(fut);
+
+ fut.listen(f -> {
+ if (!tryLock())
+ return;
+
+ try {
+ if (future() == f)
+ future(null);
+ }
+ finally {
+ unlock();
+ }
+ });
+ }
+ }
}
/**
@@ -5894,97 +5844,6 @@
/**
*
*/
- private static class LoadCacheClosure<K, V> implements Callable<Void>, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private String cacheName;
-
- /** */
- private IgniteBiPredicate<K, V> p;
-
- /** */
- private Object[] args;
-
- /** */
- @IgniteInstanceResource
- private Ignite ignite;
-
- /** */
- private ExpiryPolicy plc;
-
- /**
- * Required by {@link Externalizable}.
- */
- public LoadCacheClosure() {
- // No-op.
- }
-
- /**
- * @param cacheName Cache name.
- * @param p Predicate.
- * @param args Arguments.
- * @param plc Explicitly specified expiry policy.
- */
- private LoadCacheClosure(String cacheName,
- IgniteBiPredicate<K, V> p,
- Object[] args,
- @Nullable ExpiryPolicy plc) {
- this.cacheName = cacheName;
- this.p = p;
- this.args = args;
- this.plc = plc;
- }
-
- /** {@inheritDoc} */
- @Override public Void call() throws Exception {
- IgniteCache<K, V> cache = ignite.cache(cacheName);
-
- assert cache != null : cacheName;
-
- if (plc != null)
- cache = cache.withExpiryPolicy(plc);
-
- cache.localLoadCache(p, args);
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(p);
-
- out.writeObject(args);
-
- U.writeString(out, cacheName);
-
- if (plc != null)
- out.writeObject(new IgniteExternalizableExpiryPolicy(plc));
- else
- out.writeObject(null);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- p = (IgniteBiPredicate<K, V>)in.readObject();
-
- args = (Object[])in.readObject();
-
- cacheName = U.readString(in);
-
- plc = (ExpiryPolicy)in.readObject();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(LoadCacheClosure.class, this);
- }
- }
-
- /**
- *
- */
protected abstract static class UpdateTimeStatClosure<T> implements CI1<IgniteInternalFuture<T>> {
/** */
protected final CacheMetricsImpl metrics;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index abae403..242ecf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -62,6 +62,7 @@
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -1058,7 +1059,7 @@
boolean clearThreadMap = txMgr.threadLocalTx(null) == tx;
if (clearThreadMap)
- tx.txState().awaitLastFuture(this);
+ tx.txState().awaitLastFuture();
else
tx.state(MARKED_ROLLBACK);
@@ -1069,17 +1070,38 @@
* @param tx Transaction to commit.
* @return Commit future.
*/
- @SuppressWarnings("unchecked")
public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(GridNearTxLocal tx) {
- GridCacheContext ctx = tx.txState().singleCacheContext(this);
+ GridCacheAdapter.FutureHolder holder = tx.txState().lastAsyncFuture();
- if (ctx == null) {
- tx.txState().awaitLastFuture(this);
+ holder.lock();
- return tx.commitNearTxLocalAsync();
+ try {
+ IgniteInternalFuture<?> fut = holder.future();
+
+ if (fut != null && !fut.isDone()) {
+ if (tx.optimistic())
+ holder.await();
+ else {
+ IgniteInternalFuture<IgniteInternalTx> f = new GridEmbeddedFuture<>(fut,
+ (o, e) -> tx.commitNearTxLocalAsync());
+
+ holder.saveFuture(f);
+
+ return f;
+ }
+ }
+
+ IgniteInternalFuture<IgniteInternalTx> f = tx.commitNearTxLocalAsync();
+
+ holder.saveFuture(f);
+
+ txMgr.resetContext();
+
+ return f;
}
- else
- return ctx.cache().commitTxAsync(tx);
+ finally {
+ holder.unlock();
+ }
}
/**
@@ -1090,7 +1112,7 @@
boolean clearThreadMap = txMgr.threadLocalTx(null) == tx;
if (clearThreadMap)
- tx.txState().awaitLastFuture(this);
+ tx.txState().awaitLastFuture();
else
tx.state(MARKED_ROLLBACK);
@@ -1104,7 +1126,7 @@
* @throws IgniteCheckedException If suspension failed.
*/
public void suspendTx(GridNearTxLocal tx) throws IgniteCheckedException {
- tx.txState().awaitLastFuture(this);
+ tx.txState().awaitLastFuture();
tx.suspend();
}
@@ -1116,7 +1138,7 @@
* @throws IgniteCheckedException If resume failed.
*/
public void resumeTx(GridNearTxLocal tx) throws IgniteCheckedException {
- tx.txState().awaitLastFuture(this);
+ tx.txState().awaitLastFuture();
tx.resume();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 0f37686..6766a50 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3388,7 +3388,7 @@
*/
public final void prepare(boolean awaitLastFut) throws IgniteCheckedException {
if (awaitLastFut)
- txState().awaitLastFuture(cctx);
+ txState().awaitLastFuture();
prepareNearTxLocal().get();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index c10f606..9bf0f4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -28,6 +28,7 @@
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.cache.CacheStoppedException;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -76,11 +77,6 @@
}
/** {@inheritDoc} */
- @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) {
- return cacheCtx;
- }
-
- /** {@inheritDoc} */
@Nullable @Override public Integer firstCacheId() {
return cacheCtx != null ? cacheCtx.cacheId() : null;
}
@@ -99,11 +95,16 @@
}
/** {@inheritDoc} */
- @Override public void awaitLastFuture(GridCacheSharedContext ctx) {
+ @Override public void awaitLastFuture() {
if (cacheCtx == null)
return;
- cacheCtx.cache().awaitLastFut();
+ cacheCtx.cache().lastAsyncFuture().await();
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheAdapter.FutureHolder lastAsyncFuture() {
+ return cacheCtx != null ? cacheCtx.cache().lastAsyncFuture() : null;
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
index 99ec983..5916075 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.processors.cache.transactions;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+
/**
*
*/
@@ -51,4 +53,12 @@
* @return Recovery mode flag.
*/
public boolean recovery();
+
+ /**
+ * Awaits for previous async operations on active caches to be completed.
+ */
+ public void awaitLastFuture();
+
+ /** Previous async operations on caches. */
+ public GridCacheAdapter.FutureHolder lastAsyncFuture();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index cfd09aa..0c38267 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -917,8 +917,8 @@
* @return {@code True} if transaction is not in completed set.
*/
public boolean onStarted(IgniteInternalTx tx) {
- assert tx.state() == ACTIVE || tx.isRollbackOnly() : "Invalid transaction state [locId=" + cctx.localNodeId() +
- ", tx=" + tx + ']';
+ assert tx.state() == ACTIVE || tx.state() == SUSPENDED || tx.isRollbackOnly() :
+ "Invalid transaction state [locId=" + cctx.localNodeId() + ", tx=" + tx + ']';
if (isCompleted(tx)) {
ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
index c1d973e..6024801 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
@@ -51,11 +51,6 @@
}
/** {@inheritDoc} */
- @Override public void awaitLastFuture(GridCacheSharedContext cctx) {
- assert false;
- }
-
- /** {@inheritDoc} */
@Override public IgniteCheckedException validateTopology(
GridCacheSharedContext cctx,
boolean read,
@@ -108,11 +103,6 @@
}
/** {@inheritDoc} */
- @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) {
- return null;
- }
-
- /** {@inheritDoc} */
@Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) {
assert false;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index 73ec525..e844018 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -60,17 +60,6 @@
/**
* @param cctx Context.
- * @return cctx Non-null cache context if tx has only one active cache.
- */
- @Nullable public GridCacheContext singleCacheContext(GridCacheSharedContext cctx);
-
- /**
- * @param cctx Awaits for previous async operations on active caches to be completed.
- */
- public void awaitLastFuture(GridCacheSharedContext cctx);
-
- /**
- * @param cctx Context.
* @param read {@code True} if validating for a read operation, {@code false} for write.
* @param topFut Topology future.
* @return Error if validation failed.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index c049bd0..4b6ef7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -26,12 +26,12 @@
import java.util.Set;
import org.apache.ignite.IgniteCacheRestartingException;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheInterceptor;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
import org.apache.ignite.internal.processors.cache.CacheStoppedException;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -74,6 +74,10 @@
@GridToStringInclude
private Boolean recovery;
+ /** Async future. */
+ @GridToStringExclude
+ private final GridCacheAdapter.FutureHolder lastAsyncFut = new GridCacheAdapter.FutureHolder();
+
/** {@inheritDoc} */
@Override public boolean implicitSingle() {
return false;
@@ -102,26 +106,13 @@
}
/** {@inheritDoc} */
- @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) {
- if (activeCacheIds.size() == 1) {
- int cacheId = activeCacheIds.get(0);
-
- return cctx.cacheContext(cacheId);
- }
-
- return null;
+ @Override public void awaitLastFuture() {
+ lastAsyncFut.await();
}
/** {@inheritDoc} */
- @Override public void awaitLastFuture(GridCacheSharedContext cctx) {
- for (int i = 0; i < activeCacheIds.size(); i++) {
- int cacheId = activeCacheIds.get(i);
-
- if (cctx.cacheContext(cacheId) == null)
- throw new IgniteException("Cache is stopped, id=" + cacheId);
-
- cctx.cacheContext(cacheId).cache().awaitLastFut();
- }
+ @Override public GridCacheAdapter.FutureHolder lastAsyncFuture() {
+ return lastAsyncFut;
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientAsyncResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientAsyncResponse.java
new file mode 100644
index 0000000..f791113
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientAsyncResponse.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Client async response.
+ */
+public class ClientAsyncResponse extends ClientResponse implements ClientListenerAsyncResponse {
+ /** Future for response. */
+ private final IgniteInternalFuture<? extends ClientListenerResponse> fut;
+
+ /**
+ * Constructs async response.
+ */
+ public ClientAsyncResponse(long reqId, IgniteInternalFuture<? extends ClientListenerResponse> fut) {
+ super(reqId);
+
+ this.fut = fut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<? extends ClientListenerResponse> future() {
+ return fut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int status() {
+ assert fut.isDone();
+
+ try {
+ return fut.get().status();
+ }
+ catch (Exception e) {
+ return STATUS_FAILED;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void status(int status) {
+ throw new IllegalStateException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String error() {
+ assert fut.isDone();
+
+ try {
+ return fut.get().error();
+ }
+ catch (Exception e) {
+ return e.getMessage();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void error(String err) {
+ throw new IllegalStateException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onSent() {
+ assert fut.isDone();
+
+ try {
+ fut.get().onSent();
+ }
+ catch (Exception ignore) {
+ // Ignore.
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAsyncResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAsyncResponse.java
new file mode 100644
index 0000000..873d334
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAsyncResponse.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.apache.ignite.internal.IgniteInternalFuture;
+
+/**
+ * Client listener async response.
+ */
+public interface ClientListenerAsyncResponse {
+ /** Future for response. */
+ public IgniteInternalFuture<? extends ClientListenerResponse> future();
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
index eb8dcd9..2463c58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
@@ -196,7 +196,7 @@
assert req != null;
try {
- long startTime = 0;
+ long startTime;
if (log.isTraceEnabled()) {
startTime = System.nanoTime();
@@ -204,44 +204,79 @@
log.trace("Client request received [reqId=" + req.requestId() + ", addr=" +
ses.remoteAddress() + ", req=" + req + ']');
}
+ else
+ startTime = 0;
ClientListenerResponse resp;
- try (OperationSecurityContext s = ctx.security().withContext(connCtx.securityContext())) {
+ try (OperationSecurityContext ignored = ctx.security().withContext(connCtx.securityContext())) {
resp = hnd.handle(req);
}
if (resp != null) {
- if (log.isTraceEnabled()) {
- long dur = (System.nanoTime() - startTime) / 1000;
-
- log.trace("Client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur +
- ", resp=" + resp.status() + ']');
+ if (resp instanceof ClientListenerAsyncResponse) {
+ ((ClientListenerAsyncResponse)resp).future().listen(fut -> {
+ try {
+ handleResponse(req, fut.get(), startTime, ses, parser);
+ }
+ catch (Throwable e) {
+ handleError(req, e, ses, parser, hnd);
+ }
+ });
}
-
- GridNioFuture<?> fut = ses.send(parser.encode(resp));
-
- fut.listen(() -> {
- if (fut.error() == null)
- resp.onSent();
- });
+ else
+ handleResponse(req, resp, startTime, ses, parser);
}
}
catch (Throwable e) {
- hnd.unregisterRequest(req.requestId());
-
- if (e instanceof Error)
- U.error(log, "Failed to process client request [req=" + req + ", msg=" + e.getMessage() + "]", e);
- else
- U.warn(log, "Failed to process client request [req=" + req + ", msg=" + e.getMessage() + "]", e);
-
- ses.send(parser.encode(hnd.handleException(e, req)));
-
- if (e instanceof Error)
- throw (Error)e;
+ handleError(req, e, ses, parser, hnd);
}
}
+ /** */
+ private void handleResponse(
+ ClientListenerRequest req,
+ ClientListenerResponse resp,
+ long startTime,
+ GridNioSession ses,
+ ClientListenerMessageParser parser
+ ) {
+ if (log.isTraceEnabled()) {
+ long dur = (System.nanoTime() - startTime) / 1000;
+
+ log.trace("Client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur +
+ ", resp=" + resp.status() + ']');
+ }
+
+ GridNioFuture<?> fut = ses.send(parser.encode(resp));
+
+ fut.listen(() -> {
+ if (fut.error() == null)
+ resp.onSent();
+ });
+ }
+
+ /** */
+ private void handleError(
+ ClientListenerRequest req,
+ Throwable e,
+ GridNioSession ses,
+ ClientListenerMessageParser parser,
+ ClientListenerRequestHandler hnd
+ ) {
+ hnd.unregisterRequest(req.requestId());
+
+ if (e instanceof Error)
+ U.error(log, "Failed to process client request [req=" + req + ", msg=" + e.getMessage() + "]", e);
+ else
+ U.warn(log, "Failed to process client request [req=" + req + ", msg=" + e.getMessage() + "]", e);
+
+ ses.send(parser.encode(hnd.handleException(e, req)));
+
+ if (e instanceof Error)
+ throw (Error)e;
+ }
+
/** {@inheritDoc} */
@Override public void onSessionIdleTimeout(GridNioSession ses) {
ses.close();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java
index 4488c79..e8c4a7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java
@@ -56,7 +56,7 @@
/**
* @param status Status.
*/
- public void status(int status) {
+ protected void status(int status) {
this.status = status;
}
@@ -70,7 +70,7 @@
/**
* @param err Error message.
*/
- public void error(String err) {
+ protected void error(String err) {
this.err = err;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequest.java
index 76823b5..2ac8ea0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.platform.client;
import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
/**
@@ -58,4 +59,21 @@
public ClientResponse process(ClientConnectionContext ctx) {
return new ClientResponse(reqId);
}
+
+ /**
+ * Processes the request asynchronously.
+ *
+ * @return Future for response.
+ */
+ public IgniteInternalFuture<ClientResponse> processAsync(ClientConnectionContext ctx) {
+ throw new IllegalStateException("Async operation is not implemented for request " + getClass().getName());
+ }
+
+ /**
+ * @param ctx Client connection context.
+ * @return {@code True} if request should be processed asynchronously.
+ */
+ public boolean isAsync(ClientConnectionContext ctx) {
+ return false;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java
index ffbe717..ecbf956 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java
@@ -20,8 +20,10 @@
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteIllegalStateException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.odbc.ClientAsyncResponse;
import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler;
@@ -47,7 +49,7 @@
private final ClientConnectionContext ctx;
/** Protocol context. */
- private ClientProtocolContext protocolCtx;
+ private final ClientProtocolContext protocolCtx;
/** Logger. */
private final IgniteLogger log;
@@ -81,7 +83,7 @@
try {
txCtx.acquire(true);
- return ((ClientRequest)req).process(ctx);
+ return handle0(req);
}
catch (IgniteCheckedException e) {
throw new IgniteClientException(ClientStatus.FAILED, e.getMessage(), e);
@@ -98,7 +100,7 @@
}
}
- return ((ClientRequest)req).process(ctx);
+ return handle0(req);
}
catch (SecurityException ex) {
throw new IgniteClientException(
@@ -109,6 +111,29 @@
}
}
+ /** */
+ private ClientListenerResponse handle0(ClientListenerRequest req) {
+ ClientRequest req0 = (ClientRequest)req;
+
+ if (req0.isAsync(ctx)) {
+ IgniteInternalFuture<ClientResponse> fut = req0.processAsync(ctx);
+
+ if (fut.isDone()) {
+ try {
+ // Some async operations can be already finished after processAsync. Shortcut for this case.
+ return fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteClientException(ClientStatus.FAILED, e.getMessage(), e);
+ }
+ }
+
+ return new ClientAsyncResponse(req0.requestId(), fut);
+ }
+ else
+ return req0.process(ctx);
+ }
+
/** {@inheritDoc} */
@Override public ClientListenerResponse handleException(Throwable e, ClientListenerRequest req) {
assert req != null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheClearKeyRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheClearKeyRequest.java
index d0aa19e..36002b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheClearKeyRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheClearKeyRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.client.cache;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;
@@ -35,10 +36,14 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process0(ClientConnectionContext ctx) {
cache(ctx).clear(key());
return new ClientResponse(requestId());
}
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).clearAsync(key()), v -> new ClientResponse(requestId()));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheClearKeysRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheClearKeysRequest.java
index 04eb7f6..0f89df0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheClearKeysRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheClearKeysRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.client.cache;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;
@@ -35,10 +36,14 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process(ClientConnectionContext ctx) {
cache(ctx).clearAll(keys());
return super.process(ctx);
}
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse> processAsync(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).clearAllAsync(keys()), v -> new ClientResponse(requestId()));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheContainsKeyRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheContainsKeyRequest.java
index 6cea754..f041f9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheContainsKeyRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheContainsKeyRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.client.cache;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
@@ -36,10 +37,14 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process0(ClientConnectionContext ctx) {
boolean val = cache(ctx).containsKey(key());
return new ClientBooleanResponse(requestId(), val);
}
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).containsKeyAsync(key()), v -> new ClientBooleanResponse(requestId(), v));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheContainsKeysRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheContainsKeysRequest.java
index 41e1306..ced710d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheContainsKeysRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheContainsKeysRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.client.cache;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
@@ -36,10 +37,14 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process(ClientConnectionContext ctx) {
boolean val = cache(ctx).containsKeys(keys());
return new ClientBooleanResponse(requestId(), val);
}
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse> processAsync(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).containsKeysAsync(keys()), v -> new ClientBooleanResponse(requestId(), v));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheDataRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheDataRequest.java
index 138f28b..dae59ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheDataRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheDataRequest.java
@@ -18,11 +18,17 @@
package org.apache.ignite.internal.processors.platform.client.cache;
import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
/**
* Cache data manipulation request.
*/
-class ClientCacheDataRequest extends ClientCacheRequest {
+abstract class ClientCacheDataRequest extends ClientCacheRequest {
/** Transaction ID. Only available if request was made under a transaction. */
private final int txId;
@@ -48,4 +54,23 @@
@Override public boolean isTransactional() {
return super.isTransactional();
}
+
+ /** Chain cache operation future to return response when operation is completed. */
+ protected static <T> IgniteInternalFuture<ClientResponse> chainFuture(
+ IgniteFuture<T> fut,
+ IgniteClosure<T, ClientResponse> clo
+ ) {
+ // IgniteFuture for cache operations executes chaining/listening block via task to external executor,
+ // we don't need this additional step here, so use internal future.
+ IgniteInternalFuture<T> fut0 = ((IgniteFutureImpl<T>)fut).internalFuture();
+
+ return fut0.chain(f -> {
+ try {
+ return clo.apply(f.get());
+ }
+ catch (Exception e) {
+ throw new GridClosureException(e);
+ }
+ });
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAllRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAllRequest.java
index 6d7943b..17c3278 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAllRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAllRequest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.platform.client.cache;
import java.util.Map;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;
@@ -36,10 +37,14 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process(ClientConnectionContext ctx) {
- Map val = cache(ctx).getAll(keys());
+ Map<Object, Object> val = cache(ctx).getAll(keys());
return new ClientCacheGetAllResponse(requestId(), val);
}
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse> processAsync(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).getAllAsync(keys()), v -> new ClientCacheGetAllResponse(requestId(), v));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndPutIfAbsentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndPutIfAbsentRequest.java
index 7562808..2e08254 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndPutIfAbsentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndPutIfAbsentRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.client.cache;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientObjectResponse;
@@ -36,10 +37,15 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process0(ClientConnectionContext ctx) {
Object res = cache(ctx).getAndPutIfAbsent(key(), val());
return new ClientObjectResponse(requestId(), res);
}
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).getAndPutIfAbsentAsync(key(), val()),
+ v -> new ClientObjectResponse(requestId(), v));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndPutRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndPutRequest.java
index 15794c2..cc854b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndPutRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndPutRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.client.cache;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientObjectResponse;
@@ -36,10 +37,14 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process0(ClientConnectionContext ctx) {
Object res = cache(ctx).getAndPut(key(), val());
return new ClientObjectResponse(requestId(), res);
}
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).getAndPutAsync(key(), val()), v -> new ClientObjectResponse(requestId(), v));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndRemoveRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndRemoveRequest.java
index ebbe1a8..7cbac81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndRemoveRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndRemoveRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.client.cache;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientObjectResponse;
@@ -36,10 +37,14 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process0(ClientConnectionContext ctx) {
Object val = cache(ctx).getAndRemove(key());
return new ClientObjectResponse(requestId(), val);
}
+
+ /** {@inheritDoc} */
+ @Override protected IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).getAndRemoveAsync(key()), v -> new ClientObjectResponse(requestId(), v));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndReplaceRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndReplaceRequest.java
index d3af07d..8f45c22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndReplaceRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndReplaceRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.client.cache;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientObjectResponse;
@@ -36,10 +37,14 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process0(ClientConnectionContext ctx) {
Object res = cache(ctx).getAndReplace(key(), val());
return new ClientObjectResponse(requestId(), res);
}
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).getAndReplaceAsync(key(), val()), v -> new ClientObjectResponse(requestId(), v));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetConfigurationRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetConfigurationRequest.java
index 8ce666a..699772d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetConfigurationRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetConfigurationRequest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.platform.client.cache;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.binary.BinaryRawReader;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
@@ -46,7 +45,7 @@
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public ClientResponse process(ClientConnectionContext ctx) {
- CacheConfiguration cfg = ((IgniteCache<Object, Object>)rawCache(ctx))
+ CacheConfiguration<Object, Object> cfg = rawCache(ctx)
.getConfiguration(CacheConfiguration.class);
return new ClientCacheGetConfigurationResponse(requestId(), cfg, protocolCtx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetRequest.java
index 6fc5256..e4aca71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.client.cache;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientObjectResponse;
@@ -36,10 +37,14 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process0(ClientConnectionContext ctx) {
Object val = cache(ctx).get(key());
return new ClientObjectResponse(requestId(), val);
}
+
+ /** {@inheritDoc} */
+ @Override protected IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).getAsync(key()), v -> new ClientObjectResponse(requestId(), v));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java
index 8c28265..db86541 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java
@@ -155,7 +155,7 @@
* {@inheritDoc}
*/
@Override public ClientResponse process(ClientConnectionContext ctx) {
- IgniteCache cache = !isKeepBinary() ? rawCache(ctx) : cache(ctx);
+ IgniteCache<Object, Object> cache = !isKeepBinary() ? rawCache(ctx) : cache(ctx);
if (qry.getPartition() != null)
updateAffinityMetrics(ctx, qry.getPartition());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheKeyRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheKeyRequest.java
index 5601352..b13fe7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheKeyRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheKeyRequest.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.processors.platform.client.cache;
+import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
@@ -47,6 +49,28 @@
/** {@inheritDoc} */
@Override public final ClientResponse process(ClientConnectionContext ctx) {
+ updateMetrics(ctx);
+
+ // Process request in overriden method.
+ return process0(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public final IgniteInternalFuture<ClientResponse> processAsync(ClientConnectionContext ctx) {
+ updateMetrics(ctx);
+
+ // Process request in overriden method.
+ return processAsync0(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isAsync(ClientConnectionContext ctx) {
+ // Every cache data request on the transactional cache can lock the thread, even with implicit transaction.
+ return cacheDescriptor(ctx).cacheConfiguration().getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL;
+ }
+
+ /** */
+ private void updateMetrics(ClientConnectionContext ctx) {
if (!isTransactional()) {
// Calculate affinity metrics.
DynamicCacheDescriptor desc = cacheDescriptor(ctx);
@@ -68,14 +92,16 @@
}
}
}
-
- // Process request in overriden method.
- return process0(ctx);
}
/** */
protected abstract ClientResponse process0(ClientConnectionContext ctx);
+ /** */
+ protected IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+ throw new IllegalStateException("Async operation is not implemented for request " + getClass().getName());
+ }
+
/**
* Gets the key.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheKeysRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheKeysRequest.java
index 4e0d103..0e4bc38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheKeysRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheKeysRequest.java
@@ -19,7 +19,9 @@
import java.util.LinkedHashSet;
import java.util.Set;
+import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.tx.ClientTxAwareRequest;
/**
@@ -65,4 +67,10 @@
return keys;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean isAsync(ClientConnectionContext ctx) {
+ // Every cache data request on the transactional cache can lock the thread, even with implicit transaction.
+ return cacheDescriptor(ctx).cacheConfiguration().getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheLocalPeekRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheLocalPeekRequest.java
index 40e0108..42dab4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheLocalPeekRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheLocalPeekRequest.java
@@ -38,10 +38,14 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process0(ClientConnectionContext ctx) {
Object val = cache(ctx).localPeek(key(), CachePeekMode.ALL);
return new ClientObjectResponse(requestId(), val);
}
+
+ /** {@inheritDoc} */
+ @Override public boolean isAsync(ClientConnectionContext ctx) {
+ return false;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllRequest.java
index b7d8210..da53e0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllRequest.java
@@ -19,6 +19,8 @@
import java.util.LinkedHashMap;
import java.util.Map;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;
@@ -48,10 +50,20 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process(ClientConnectionContext ctx) {
cache(ctx).putAll(map);
return super.process(ctx);
}
+
+ /** {@inheritDoc} */
+ @Override public boolean isAsync(ClientConnectionContext ctx) {
+ // Every cache data request on the transactional cache can lock the thread, even with implicit transaction.
+ return cacheDescriptor(ctx).cacheConfiguration().getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse> processAsync(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).putAllAsync(map), v -> new ClientResponse(requestId()));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutIfAbsentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutIfAbsentRequest.java
index b0bd0e6..1db83b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutIfAbsentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutIfAbsentRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.client.cache;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
@@ -36,10 +37,14 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process0(ClientConnectionContext ctx) {
boolean res = cache(ctx).putIfAbsent(key(), val());
return new ClientBooleanResponse(requestId(), res);
}
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).putIfAbsentAsync(key(), val()), v -> new ClientBooleanResponse(requestId(), v));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutRequest.java
index bc249fc..c9ff14c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.client.cache;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;
@@ -35,11 +36,15 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process0(ClientConnectionContext ctx) {
cache(ctx).put(key(), val());
return new ClientResponse(requestId());
}
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).putAsync(key(), val()), v -> new ClientResponse(requestId()));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllRequest.java
index ab1b8be..7304654 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllRequest.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.processors.platform.client.cache;
import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;
@@ -40,4 +42,15 @@
return super.process(ctx);
}
+
+ /** {@inheritDoc} */
+ @Override public boolean isAsync(ClientConnectionContext ctx) {
+ // Every cache data request on the transactional cache can lock the thread, even with implicit transaction.
+ return cacheDescriptor(ctx).cacheConfiguration().getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse> processAsync(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).removeAllAsync(), v -> new ClientResponse(requestId()));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveIfEqualsRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveIfEqualsRequest.java
index 14948f0..2756829 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveIfEqualsRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveIfEqualsRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.client.cache;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
@@ -36,10 +37,14 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process0(ClientConnectionContext ctx) {
boolean res = cache(ctx).remove(key(), val());
return new ClientBooleanResponse(requestId(), res);
}
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).removeAsync(key(), val()), v -> new ClientBooleanResponse(requestId(), v));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveKeyRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveKeyRequest.java
index f1c5045..7b3941b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveKeyRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveKeyRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.client.cache;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
@@ -36,10 +37,14 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process0(ClientConnectionContext ctx) {
boolean val = cache(ctx).remove(key());
return new ClientBooleanResponse(requestId(), val);
}
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).removeAsync(key()), v -> new ClientBooleanResponse(requestId(), v));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveKeysRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveKeysRequest.java
index 043b568..f2e2e24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveKeysRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveKeysRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.client.cache;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;
@@ -35,10 +36,14 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process(ClientConnectionContext ctx) {
cache(ctx).removeAll(keys());
return super.process(ctx);
}
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse> processAsync(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).removeAllAsync(keys()), v -> new ClientResponse(requestId()));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheReplaceIfEqualsRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheReplaceIfEqualsRequest.java
index 83d40b6..55da1d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheReplaceIfEqualsRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheReplaceIfEqualsRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.client.cache;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
@@ -41,10 +42,14 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process0(ClientConnectionContext ctx) {
boolean res = cache(ctx).replace(key(), val(), newVal);
return new ClientBooleanResponse(requestId(), res);
}
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).replaceAsync(key(), val(), newVal), v -> new ClientBooleanResponse(requestId(), v));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheReplaceRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheReplaceRequest.java
index cbfa0fc..03f2af4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheReplaceRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheReplaceRequest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.client.cache;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
@@ -36,10 +37,14 @@
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
@Override public ClientResponse process0(ClientConnectionContext ctx) {
boolean res = cache(ctx).replace(key(), val());
return new ClientBooleanResponse(requestId(), res);
}
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+ return chainFuture(cache(ctx).replaceAsync(key(), val()), v -> new ClientBooleanResponse(requestId(), v));
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
index 5881ab3..efb9252 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
@@ -31,7 +31,7 @@
/**
* Cache request.
*/
-public class ClientCacheRequest extends ClientRequest {
+public abstract class ClientCacheRequest extends ClientRequest {
/** "Keep binary" flag mask. */
private static final byte KEEP_BINARY_FLAG_MASK = 0x01;
@@ -73,7 +73,7 @@
* @param ctx Kernal context.
* @return Cache.
*/
- protected IgniteCache cache(ClientConnectionContext ctx) {
+ protected IgniteCache<Object, Object> cache(ClientConnectionContext ctx) {
return rawCache(ctx).withKeepBinary();
}
@@ -83,7 +83,7 @@
* @param ctx Kernal context.
* @return Cache.
*/
- protected IgniteInternalCache<?, ?> cachex(ClientConnectionContext ctx) {
+ protected IgniteInternalCache<Object, Object> cachex(ClientConnectionContext ctx) {
String cacheName = cacheDescriptor(ctx).cacheName();
return ctx.kernalContext().grid().cachex(cacheName).keepBinary();
@@ -122,7 +122,7 @@
* @param ctx Kernal context.
* @return Cache.
*/
- protected IgniteCache rawCache(ClientConnectionContext ctx) {
+ protected IgniteCache<Object, Object> rawCache(ClientConnectionContext ctx) {
DynamicCacheDescriptor cacheDesc = cacheDescriptor(ctx);
String cacheName = cacheDesc.cacheName();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheScanQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheScanQueryRequest.java
index 9321db3..3872f4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheScanQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheScanQueryRequest.java
@@ -74,7 +74,8 @@
/** {@inheritDoc} */
@Override public ClientResponse process(ClientConnectionContext ctx) {
- IgniteCache cache = filterPlatform == ClientPlatform.JAVA && !isKeepBinary() ? rawCache(ctx) : cache(ctx);
+ IgniteCache<Object, Object> cache = filterPlatform == ClientPlatform.JAVA && !isKeepBinary() ?
+ rawCache(ctx) : cache(ctx);
ScanQuery qry = new ScanQuery()
.setLocal(loc)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheSqlQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheSqlQueryRequest.java
index 00c75c2..807ac1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheSqlQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheSqlQueryRequest.java
@@ -54,7 +54,7 @@
/** {@inheritDoc} */
@Override public ClientResponse process(ClientConnectionContext ctx) {
- IgniteCache cache = cache(ctx);
+ IgniteCache<Object, Object> cache = cache(ctx);
ctx.incrementCursors();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxEndRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxEndRequest.java
index 38bdeeb..8dc0702 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxEndRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxEndRequest.java
@@ -19,12 +19,15 @@
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.binary.BinaryRawReader;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientRequest;
import org.apache.ignite.internal.processors.platform.client.ClientResponse;
import org.apache.ignite.internal.processors.platform.client.ClientStatus;
import org.apache.ignite.internal.processors.platform.client.IgniteClientException;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.GridClosureException;
/**
* End the transaction request.
@@ -49,11 +52,26 @@
}
/** {@inheritDoc} */
- @Override public ClientResponse process(ClientConnectionContext ctx) {
+ @Override public boolean isAsync(ClientConnectionContext ctx) {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteInternalFuture<ClientResponse> processAsync(ClientConnectionContext ctx) {
+ return endTxAsync(ctx).chain(f -> {
+ if (f.error() != null)
+ throw new GridClosureException(f.error());
+ else
+ return process(ctx);
+ });
+ }
+
+ /** End transaction asynchronously. */
+ private IgniteInternalFuture<IgniteInternalTx> endTxAsync(ClientConnectionContext ctx) {
ClientTxContext txCtx = ctx.txContext(txId);
if (txCtx == null && !committed)
- return super.process(ctx);
+ return new GridFinishedFuture<>();
if (txCtx == null)
throw new IgniteClientException(ClientStatus.TX_NOT_FOUND, "Transaction with id " + txId + " not found.");
@@ -61,12 +79,10 @@
try {
txCtx.acquire(committed);
- try (GridNearTxLocal tx = txCtx.tx()) {
- if (committed)
- tx.commit();
- else
- tx.rollback();
- }
+ if (committed)
+ return txCtx.tx().context().commitTxAsync(txCtx.tx());
+ else
+ return txCtx.tx().rollbackAsync();
}
catch (IgniteCheckedException e) {
throw new IgniteClientException(ClientStatus.FAILED, e.getMessage(), e);
@@ -81,7 +97,5 @@
// No-op.
}
}
-
- return super.process(ctx);
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/BlockingTxOpsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/BlockingTxOpsTest.java
new file mode 100644
index 0000000..833767e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/BlockingTxOpsTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.List;
+import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientCacheConfiguration;
+import org.apache.ignite.client.ClientTransaction;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ * Thin client blocking transactional operations tests.
+ */
+@RunWith(Parameterized.class)
+public class BlockingTxOpsTest extends AbstractThinClientTest {
+ /** Default tx timeout value. */
+ private static final long TX_TIMEOUT = 5_000L;
+
+ /** */
+ private static final int THREADS_CNT = 5;
+
+ /** */
+ private int poolSize;
+
+ /** */
+ @Parameterized.Parameter(0)
+ public TransactionConcurrency txConcurrency;
+
+ /** */
+ @Parameterized.Parameter(1)
+ public TransactionIsolation txIsolation;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "concurrency={0}, isolation={1}")
+ public static List<Object[]> params() {
+ return F.asList(
+ new Object[]{ PESSIMISTIC, REPEATABLE_READ },
+ new Object[]{ OPTIMISTIC, SERIALIZABLE }
+ );
+ }
+
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.getClientConnectorConfiguration().setThreadPoolSize(poolSize);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+ stopAllGrids();
+ }
+
+ /**
+ * Tests different blocking operations in transaction.
+ */
+ @Test
+ public void testBlockingOps() throws Exception {
+ for (int i : F.asList(1, THREADS_CNT - 1)) {
+ poolSize = i;
+
+ try (Ignite ignore = startGrid(0)) {
+ try (IgniteClient client = startClient(0)) {
+ ClientCache<Object, Object> cache = client.getOrCreateCache(new ClientCacheConfiguration()
+ .setName("test")
+ .setAtomicityMode(TRANSACTIONAL)
+ );
+
+ // Clear operation.
+ checkOpMultithreaded(client,
+ () -> cache.put(0, 0),
+ () -> cache.clear(0),
+ () -> assertFalse(cache.containsKey(0))
+ );
+
+ // Clear keys operation.
+ checkOpMultithreaded(client,
+ () -> cache.putAll(F.asMap(0, 0, 1, 1)),
+ () -> cache.clearAll(new TreeSet<>(F.asList(0, 1))),
+ () -> assertFalse(cache.containsKeys(new TreeSet<>(F.asList(0, 1))))
+ );
+
+ // Contains operation.
+ checkOpMultithreaded(client,
+ () -> cache.put(0, 0),
+ () -> assertTrue(cache.containsKey(0)),
+ null
+ );
+
+ // Contains keys operation.
+ checkOpMultithreaded(client,
+ () -> cache.putAll(F.asMap(0, 0, 1, 1)),
+ () -> assertTrue(cache.containsKeys(new TreeSet<>(F.asList(0, 1)))),
+ null
+ );
+
+ // Get keys operation.
+ checkOpMultithreaded(client,
+ () -> cache.putAll(F.asMap(0, 0, 1, 1)),
+ () -> assertEquals(F.asMap(0, 0, 1, 1), cache.getAll(new TreeSet<>(F.asList(0, 1)))),
+ null
+ );
+
+ // Get and put if absent operation.
+ checkOpMultithreaded(client,
+ () -> cache.put(0, 0),
+ () -> assertEquals(0, cache.getAndPutIfAbsent(0, 0)),
+ null
+ );
+
+ // Get and put absent operation.
+ checkOpMultithreaded(client,
+ () -> cache.put(0, 0),
+ () -> assertEquals(0, cache.getAndPut(0, 0)),
+ null
+ );
+
+ // Get and remove operation.
+ checkOpMultithreaded(client,
+ () -> cache.put(0, 0),
+ () -> cache.getAndRemove(0),
+ () -> assertFalse(cache.containsKey(0))
+ );
+
+ // Get and replace operation.
+ checkOpMultithreaded(client,
+ () -> cache.put(0, 0),
+ () -> cache.getAndReplace(0, 0),
+ () -> assertTrue(cache.containsKey(0))
+ );
+
+ // Get operation.
+ checkOpMultithreaded(client,
+ () -> cache.put(0, 0),
+ () -> assertEquals(0, cache.get(0)),
+ null
+ );
+
+ // Put keys operation.
+ checkOpMultithreaded(client,
+ null,
+ () -> cache.putAll(F.asMap(0, 0, 1, 1)),
+ () -> assertEquals(F.asMap(0, 0, 1, 1), cache.getAll(new TreeSet<>(F.asList(0, 1))))
+ );
+
+ // Put if absent operation
+ checkOpMultithreaded(client,
+ () -> cache.put(0, 0),
+ () -> cache.putIfAbsent(0, 1),
+ () -> assertEquals(0, cache.get(0))
+ );
+
+ // Put operation
+ checkOpMultithreaded(client,
+ null,
+ () -> cache.put(0, 0),
+ () -> assertEquals(0, cache.get(0))
+ );
+
+ // Remove all operation.
+ checkOpMultithreaded(client,
+ () -> cache.putAll(F.asMap(0, 0, 1, 1)),
+ () -> cache.removeAll(),
+ () -> assertEquals(0, cache.size())
+ );
+
+ // Remove if equals operation.
+ checkOpMultithreaded(client,
+ () -> cache.put(0, 0),
+ () -> cache.remove(0, 1),
+ () -> assertEquals(0, cache.get(0))
+ );
+
+ // Remove operation.
+ checkOpMultithreaded(client,
+ () -> cache.put(0, 0),
+ () -> cache.remove(0),
+ () -> assertFalse(cache.containsKey(0))
+ );
+
+ // Remove keys operation.
+ checkOpMultithreaded(client,
+ () -> cache.putAll(F.asMap(0, 0, 1, 1)),
+ () -> cache.removeAll(new TreeSet<>(F.asList(0, 1))),
+ () -> assertFalse(cache.containsKeys(new TreeSet<>(F.asList(0, 1))))
+ );
+
+ // Replace if equals operation.
+ checkOpMultithreaded(client,
+ () -> cache.put(0, 0),
+ () -> cache.replace(0, 0, 1),
+ () -> assertEquals(1, cache.get(0))
+ );
+
+ // Replace operation.
+ checkOpMultithreaded(client,
+ () -> cache.put(0, 0),
+ () -> cache.replace(0, 1),
+ () -> assertEquals(1, cache.get(0))
+ );
+ }
+ }
+ }
+ }
+
+ /** */
+ private void checkOpMultithreaded(IgniteClient client, Runnable init, Runnable op, Runnable check) throws Exception {
+ if (init != null)
+ init.run();
+
+ GridTestUtils.runMultiThreaded(() -> {
+ for (int i = 0; i < 100; i++) {
+ // Mix implicit and explicit transactions.
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ while (true) {
+ try (ClientTransaction tx = client.transactions().txStart(txConcurrency, txIsolation, TX_TIMEOUT)) {
+ op.run();
+
+ try {
+ tx.commit();
+
+ break;
+ }
+ catch (Exception e) {
+ if (!e.getMessage().contains("Failed to prepare transaction"))
+ throw e;
+ }
+ }
+ }
+ }
+ else
+ op.run();
+ }
+ }, THREADS_CNT, "tx-thread");
+
+ if (check != null)
+ check.run();
+ }
+
+ /**
+ * Tests transactional consistency on concurrent operations executed using async methods on server side.
+ */
+ @Test
+ public void testTransactionalConsistency() throws Exception {
+ poolSize = THREADS_CNT;
+
+ startGrids(3);
+ IgniteClient client = startClient(0, 1, 2);
+
+ ClientCache<Integer, Integer> cache = client.getOrCreateCache(new ClientCacheConfiguration()
+ .setName("test")
+ .setAtomicityMode(TRANSACTIONAL)
+ .setBackups(1)
+ );
+
+ int iterations = 1_000;
+ int keys = 10;
+
+ GridTestUtils.runMultiThreaded(() -> {
+ for (int i = 0; i < iterations; i++) {
+ try (ClientTransaction tx = client.transactions().txStart(txConcurrency, txIsolation, TX_TIMEOUT)) {
+ int key1 = ThreadLocalRandom.current().nextInt(keys);
+ int key2 = ThreadLocalRandom.current().nextInt(keys);
+ int sum = ThreadLocalRandom.current().nextInt(100);
+
+ if (key1 < key2) { // Avoid deadlocks
+ Integer val1 = cache.get(key1);
+ cache.put(key1, (val1 == null ? 0 : val1) - sum);
+ Integer val2 = cache.get(key2);
+ cache.put(key2, (val2 == null ? 0 : val2) + sum);
+ }
+ else {
+ Integer val2 = cache.get(key2);
+ cache.put(key2, (val2 == null ? 0 : val2) + sum);
+ Integer val1 = cache.get(key1);
+ cache.put(key1, (val1 == null ? 0 : val1) - sum);
+ }
+
+ if (ThreadLocalRandom.current().nextBoolean())
+ try {
+ tx.commit();
+ }
+ catch (Exception e) {
+ if (!e.getMessage().contains("Failed to prepare transaction"))
+ throw e;
+ }
+ else
+ tx.rollback();
+ }
+ }
+ }, THREADS_CNT, "tx-thread");
+
+ int sum = 0;
+
+ for (int i = 0; i < keys; i++) {
+ Integer val = cache.get(i);
+
+ if (val != null)
+ sum += val;
+ }
+
+ assertEquals(0, sum);
+ }
+
+ /**
+ * Tests async commit future chaining with incompleted last operation async future.
+ */
+ @Test
+ public void testCommitFutureChaining() throws Exception {
+ poolSize = 1;
+
+ try (Ignite ignore = startGrid(0)) {
+ try (IgniteClient client = startClient(0)) {
+ ClientCache<Integer, Integer> cache = client.getOrCreateCache(new ClientCacheConfiguration()
+ .setName("test")
+ .setAtomicityMode(TRANSACTIONAL)
+ .setBackups(1)
+ );
+
+ int iterations = 100;
+
+ GridTestUtils.runMultiThreaded(() -> {
+ for (int i = 0; i < iterations; i++) {
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ try (ClientTransaction tx = client.transactions().txStart(txConcurrency, txIsolation, TX_TIMEOUT)) {
+ cache.putAsync(0, 0);
+ tx.commit();
+ }
+ }
+ else
+ cache.put(0, 0);
+ }
+ }, THREADS_CNT, "tx-thread");
+ }
+ }
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index 45e5593..121bd99 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -19,6 +19,7 @@
import org.apache.ignite.internal.client.thin.AffinityMetricsTest;
import org.apache.ignite.internal.client.thin.AtomicLongTest;
+import org.apache.ignite.internal.client.thin.BlockingTxOpsTest;
import org.apache.ignite.internal.client.thin.CacheAsyncTest;
import org.apache.ignite.internal.client.thin.CacheEntryListenersTest;
import org.apache.ignite.internal.client.thin.ClusterApiTest;
@@ -91,7 +92,8 @@
ThinClientEnpointsDiscoveryTest.class,
InactiveClusterCacheRequestTest.class,
AffinityMetricsTest.class,
- ClusterGroupClusterRestartTest.class
+ ClusterGroupClusterRestartTest.class,
+ BlockingTxOpsTest.class,
})
public class ClientTestSuite {
// No-op.