IGNITE-21183 Thin client: Fix client-connector threads blocking by transactional operations - Fixes #11176.

Signed-off-by: Aleksey Plekhanov <plehanov.alex@gmail.com>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 876bc26..c3ab16e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -37,7 +37,6 @@
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
@@ -52,7 +51,6 @@
 import javax.cache.processor.EntryProcessorResult;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -74,7 +72,6 @@
 import org.apache.ignite.compute.ComputeJobResultPolicy;
 import org.apache.ignite.compute.ComputeTaskAdapter;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteFeatures;
@@ -126,7 +123,6 @@
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.C1;
-import org.apache.ignite.internal.util.typedef.C2;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CI2;
 import org.apache.ignite.internal.util.typedef.CIX2;
@@ -274,10 +270,6 @@
     @GridToStringExclude
     protected CacheConfiguration cacheCfg;
 
-    /** Grid configuration. */
-    @GridToStringExclude
-    private IgniteConfiguration gridCfg;
-
     /** Cache metrics. */
     protected CacheMetricsImpl metrics;
 
@@ -328,7 +320,6 @@
 
         this.ctx = ctx;
 
-        gridCfg = ctx.gridConfig();
         cacheCfg = ctx.config();
 
         locNodeId = ctx.gridConfig().getNodeId();
@@ -3594,63 +3585,12 @@
     }
 
     /**
-     * Asynchronously commits transaction after all previous asynchronous operations are completed.
-     *
-     * @param tx Transaction to commit.
-     * @return Transaction commit future.
+     * Last async operation future for implicit transactions. For explicit transactions future is
+     * bound to the transaction and stored inside the transaction.
+     * These futures are required to linearize async operations made by the same thread.
      */
-    @SuppressWarnings("unchecked")
-    IgniteInternalFuture<IgniteInternalTx> commitTxAsync(final GridNearTxLocal tx) {
-        FutureHolder holder = lastFut.get();
-
-        holder.lock();
-
-        try {
-            IgniteInternalFuture fut = holder.future();
-
-            if (fut != null && !fut.isDone()) {
-                IgniteInternalFuture<IgniteInternalTx> f = new GridEmbeddedFuture<>(fut,
-                    new C2<Object, Exception, IgniteInternalFuture<IgniteInternalTx>>() {
-                        @Override public IgniteInternalFuture<IgniteInternalTx> apply(Object o, Exception e) {
-                            return tx.commitNearTxLocalAsync();
-                        }
-                    });
-
-                saveFuture(holder, f, /*asyncOp*/false, /*retry*/false);
-
-                return f;
-            }
-
-            IgniteInternalFuture<IgniteInternalTx> f = tx.commitNearTxLocalAsync();
-
-            saveFuture(holder, f, /*asyncOp*/false, /*retry*/false);
-
-            ctx.tm().resetContext();
-
-            return f;
-        }
-        finally {
-            holder.unlock();
-        }
-    }
-
-    /**
-     * Awaits for previous async operation to be completed.
-     */
-    public void awaitLastFut() {
-        FutureHolder holder = lastFut.get();
-
-        IgniteInternalFuture fut = holder.future();
-
-        if (fut != null && !fut.isDone()) {
-            try {
-                // Ignore any exception from previous async operation as it should be handled by user.
-                fut.get();
-            }
-            catch (IgniteCheckedException ignored) {
-                // No-op.
-            }
-        }
+    public FutureHolder lastAsyncFuture() {
+        return lastFut.get();
     }
 
     /**
@@ -3663,11 +3603,11 @@
     @Nullable private <T> T syncOp(SyncOp<T> op) throws IgniteCheckedException {
         checkJta();
 
-        awaitLastFut();
-
         GridNearTxLocal tx = checkCurrentTx();
 
         if (tx == null || tx.implicit()) {
+            lastAsyncFuture().await();
+
             TransactionConfiguration tCfg = CU.transactionConfiguration(ctx, ctx.kernalContext().config());
 
             CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -3760,8 +3700,11 @@
             // Should not happen.
             throw new IgniteCheckedException("Failed to perform cache operation (maximum number of retries exceeded).");
         }
-        else
+        else {
+            tx.txState().awaitLastFuture();
+
             return op.op(tx);
+        }
     }
 
     /**
@@ -3836,12 +3779,12 @@
         if (fail != null)
             return fail;
 
-        FutureHolder holder = lastFut.get();
+        FutureHolder holder = tx.implicit() ? lastAsyncFuture() : tx.txState().lastAsyncFuture();
 
         holder.lock();
 
         try {
-            IgniteInternalFuture fut = holder.future();
+            IgniteInternalFuture<?> fut = holder.future();
 
             final GridNearTxLocal tx0 = tx;
 
@@ -3913,7 +3856,9 @@
                         return resFut;
                     });
 
-                saveFuture(holder, f, /*asyncOp*/true, retry);
+                holder.saveFuture(f);
+
+                f.listen(f0 -> asyncOpRelease(retry));
 
                 return f;
             }
@@ -3923,7 +3868,7 @@
              * See {@link GridDhtTxLocalAdapter#updateLockFuture(IgniteInternalFuture, IgniteInternalFuture)}
              */
             if (!tx0.txState().implicitSingle())
-                tx0.txState().awaitLastFuture(ctx.shared());
+                tx0.txState().awaitLastFuture();
 
             IgniteInternalFuture<T> f;
 
@@ -3935,7 +3880,9 @@
                 ctx.shared().txContextReset();
             }
 
-            saveFuture(holder, f, /*asyncOp*/true, retry);
+            holder.saveFuture(f);
+
+            f.listen(f0 -> asyncOpRelease(retry));
 
             if (tx.implicit())
                 ctx.tm().resetContext();
@@ -3948,49 +3895,6 @@
     }
 
     /**
-     * Saves future in thread local holder and adds listener
-     * that will clear holder when future is finished.
-     *
-     * @param holder Future holder.
-     * @param fut Future to save.
-     * @param asyncOp Whether operation is instance of AsyncOp.
-     * @param retry {@code true} for retry operations.
-     */
-    protected void saveFuture(final FutureHolder holder, IgniteInternalFuture<?> fut, final boolean asyncOp, final boolean retry) {
-        assert holder != null;
-        assert fut != null;
-        assert holder.holdsLock();
-
-        holder.future(fut);
-
-        if (fut.isDone()) {
-            holder.future(null);
-
-            if (asyncOp)
-                asyncOpRelease(retry);
-        }
-        else {
-            fut.listen(new CI1<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> f) {
-                    if (asyncOp)
-                        asyncOpRelease(retry);
-
-                    if (!holder.tryLock())
-                        return;
-
-                    try {
-                        if (holder.future() == f)
-                            holder.future(null);
-                    }
-                    finally {
-                        holder.unlock();
-                    }
-                }
-            });
-        }
-    }
-
-    /**
      * Tries to acquire asynchronous operations permit, if limited.
      *
      * @param retry Retry flag.
@@ -5543,12 +5447,12 @@
     /**
      * Holder for last async operation future.
      */
-    protected static class FutureHolder {
+    public static class FutureHolder {
         /** Lock. */
         private final ReentrantLock lock = new ReentrantLock();
 
         /** Future. */
-        private IgniteInternalFuture fut;
+        private IgniteInternalFuture<?> fut;
 
         /**
          * Tries to acquire lock.
@@ -5586,7 +5490,7 @@
          *
          * @return Future.
          */
-        public IgniteInternalFuture future() {
+        public IgniteInternalFuture<?> future() {
             return fut;
         }
 
@@ -5595,9 +5499,55 @@
          *
          * @param fut Future.
          */
-        public void future(@Nullable IgniteInternalFuture fut) {
+        public void future(@Nullable IgniteInternalFuture<?> fut) {
             this.fut = fut;
         }
+
+        /**
+         * Awaits for previous async operation to be completed.
+         */
+        public void await() {
+            IgniteInternalFuture<?> fut = this.fut;
+
+            if (fut != null && !fut.isDone()) {
+                try {
+                    // Ignore any exception from previous async operation as it should be handled by user.
+                    fut.get();
+                }
+                catch (IgniteCheckedException ignored) {
+                    // No-op.
+                }
+            }
+        }
+
+        /**
+         * Saves future in the holder and adds listener that will clear holder when future is finished.
+         *
+         * @param fut Future to save.
+         */
+        public void saveFuture(IgniteInternalFuture<?> fut) {
+            assert fut != null;
+            assert holdsLock();
+
+            if (fut.isDone())
+                future(null);
+            else {
+                future(fut);
+
+                fut.listen(f -> {
+                    if (!tryLock())
+                        return;
+
+                    try {
+                        if (future() == f)
+                            future(null);
+                    }
+                    finally {
+                        unlock();
+                    }
+                });
+            }
+        }
     }
 
     /**
@@ -5894,97 +5844,6 @@
     /**
      *
      */
-    private static class LoadCacheClosure<K, V> implements Callable<Void>, Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private String cacheName;
-
-        /** */
-        private IgniteBiPredicate<K, V> p;
-
-        /** */
-        private Object[] args;
-
-        /** */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
-        /** */
-        private ExpiryPolicy plc;
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public LoadCacheClosure() {
-            // No-op.
-        }
-
-        /**
-         * @param cacheName Cache name.
-         * @param p Predicate.
-         * @param args Arguments.
-         * @param plc Explicitly specified expiry policy.
-         */
-        private LoadCacheClosure(String cacheName,
-            IgniteBiPredicate<K, V> p,
-            Object[] args,
-            @Nullable ExpiryPolicy plc) {
-            this.cacheName = cacheName;
-            this.p = p;
-            this.args = args;
-            this.plc = plc;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Void call() throws Exception {
-            IgniteCache<K, V> cache = ignite.cache(cacheName);
-
-            assert cache != null : cacheName;
-
-            if (plc != null)
-                cache = cache.withExpiryPolicy(plc);
-
-            cache.localLoadCache(p, args);
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeObject(p);
-
-            out.writeObject(args);
-
-            U.writeString(out, cacheName);
-
-            if (plc != null)
-                out.writeObject(new IgniteExternalizableExpiryPolicy(plc));
-            else
-                out.writeObject(null);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            p = (IgniteBiPredicate<K, V>)in.readObject();
-
-            args = (Object[])in.readObject();
-
-            cacheName = U.readString(in);
-
-            plc = (ExpiryPolicy)in.readObject();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(LoadCacheClosure.class, this);
-        }
-    }
-
-    /**
-     *
-     */
     protected abstract static class UpdateTimeStatClosure<T> implements CI1<IgniteInternalFuture<T>> {
         /** */
         protected final CacheMetricsImpl metrics;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index abae403..242ecf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -62,6 +62,7 @@
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -1058,7 +1059,7 @@
         boolean clearThreadMap = txMgr.threadLocalTx(null) == tx;
 
         if (clearThreadMap)
-            tx.txState().awaitLastFuture(this);
+            tx.txState().awaitLastFuture();
         else
             tx.state(MARKED_ROLLBACK);
 
@@ -1069,17 +1070,38 @@
      * @param tx Transaction to commit.
      * @return Commit future.
      */
-    @SuppressWarnings("unchecked")
     public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(GridNearTxLocal tx) {
-        GridCacheContext ctx = tx.txState().singleCacheContext(this);
+        GridCacheAdapter.FutureHolder holder = tx.txState().lastAsyncFuture();
 
-        if (ctx == null) {
-            tx.txState().awaitLastFuture(this);
+        holder.lock();
 
-            return tx.commitNearTxLocalAsync();
+        try {
+            IgniteInternalFuture<?> fut = holder.future();
+
+            if (fut != null && !fut.isDone()) {
+                if (tx.optimistic())
+                    holder.await();
+                else {
+                    IgniteInternalFuture<IgniteInternalTx> f = new GridEmbeddedFuture<>(fut,
+                        (o, e) -> tx.commitNearTxLocalAsync());
+
+                    holder.saveFuture(f);
+
+                    return f;
+                }
+            }
+
+            IgniteInternalFuture<IgniteInternalTx> f = tx.commitNearTxLocalAsync();
+
+            holder.saveFuture(f);
+
+            txMgr.resetContext();
+
+            return f;
         }
-        else
-            return ctx.cache().commitTxAsync(tx);
+        finally {
+            holder.unlock();
+        }
     }
 
     /**
@@ -1090,7 +1112,7 @@
         boolean clearThreadMap = txMgr.threadLocalTx(null) == tx;
 
         if (clearThreadMap)
-            tx.txState().awaitLastFuture(this);
+            tx.txState().awaitLastFuture();
         else
             tx.state(MARKED_ROLLBACK);
 
@@ -1104,7 +1126,7 @@
      * @throws IgniteCheckedException If suspension failed.
      */
     public void suspendTx(GridNearTxLocal tx) throws IgniteCheckedException {
-        tx.txState().awaitLastFuture(this);
+        tx.txState().awaitLastFuture();
 
         tx.suspend();
     }
@@ -1116,7 +1138,7 @@
      * @throws IgniteCheckedException If resume failed.
      */
     public void resumeTx(GridNearTxLocal tx) throws IgniteCheckedException {
-        tx.txState().awaitLastFuture(this);
+        tx.txState().awaitLastFuture();
 
         tx.resume();
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 0f37686..6766a50 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3388,7 +3388,7 @@
      */
     public final void prepare(boolean awaitLastFut) throws IgniteCheckedException {
         if (awaitLastFut)
-            txState().awaitLastFuture(cctx);
+            txState().awaitLastFuture();
 
         prepareNearTxLocal().get();
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index c10f606..9bf0f4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -28,6 +28,7 @@
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.processors.cache.CacheStoppedException;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -76,11 +77,6 @@
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) {
-        return cacheCtx;
-    }
-
-    /** {@inheritDoc} */
     @Nullable @Override public Integer firstCacheId() {
         return cacheCtx != null ? cacheCtx.cacheId() : null;
     }
@@ -99,11 +95,16 @@
     }
 
     /** {@inheritDoc} */
-    @Override public void awaitLastFuture(GridCacheSharedContext ctx) {
+    @Override public void awaitLastFuture() {
         if (cacheCtx == null)
             return;
 
-        cacheCtx.cache().awaitLastFut();
+        cacheCtx.cache().lastAsyncFuture().await();
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridCacheAdapter.FutureHolder lastAsyncFuture() {
+        return cacheCtx != null ? cacheCtx.cache().lastAsyncFuture() : null;
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
index 99ec983..5916075 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+
 /**
  *
  */
@@ -51,4 +53,12 @@
      * @return Recovery mode flag.
      */
     public boolean recovery();
+
+    /**
+     * Awaits for previous async operations on active caches to be completed.
+     */
+    public void awaitLastFuture();
+
+    /** Previous async operations on caches. */
+    public GridCacheAdapter.FutureHolder lastAsyncFuture();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index cfd09aa..0c38267 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -917,8 +917,8 @@
      * @return {@code True} if transaction is not in completed set.
      */
     public boolean onStarted(IgniteInternalTx tx) {
-        assert tx.state() == ACTIVE || tx.isRollbackOnly() : "Invalid transaction state [locId=" + cctx.localNodeId() +
-            ", tx=" + tx + ']';
+        assert tx.state() == ACTIVE || tx.state() == SUSPENDED || tx.isRollbackOnly() :
+            "Invalid transaction state [locId=" + cctx.localNodeId() + ", tx=" + tx + ']';
 
         if (isCompleted(tx)) {
             ConcurrentMap<GridCacheVersion, IgniteInternalTx> txIdMap = transactionMap(tx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
index c1d973e..6024801 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
@@ -51,11 +51,6 @@
     }
 
     /** {@inheritDoc} */
-    @Override public void awaitLastFuture(GridCacheSharedContext cctx) {
-        assert false;
-    }
-
-    /** {@inheritDoc} */
     @Override public IgniteCheckedException validateTopology(
         GridCacheSharedContext cctx,
         boolean read,
@@ -108,11 +103,6 @@
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) {
-        return null;
-    }
-
-    /** {@inheritDoc} */
     @Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) {
         assert false;
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
index 73ec525..e844018 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -60,17 +60,6 @@
 
     /**
      * @param cctx Context.
-     * @return cctx Non-null cache context if tx has only one active cache.
-     */
-    @Nullable public GridCacheContext singleCacheContext(GridCacheSharedContext cctx);
-
-    /**
-     * @param cctx Awaits for previous async operations on active caches to be completed.
-     */
-    public void awaitLastFuture(GridCacheSharedContext cctx);
-
-    /**
-     * @param cctx Context.
      * @param read {@code True} if validating for a read operation, {@code false} for write.
      * @param topFut Topology future.
      * @return Error if validation failed.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index c049bd0..4b6ef7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -26,12 +26,12 @@
 import java.util.Set;
 import org.apache.ignite.IgniteCacheRestartingException;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheInterceptor;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
 import org.apache.ignite.internal.processors.cache.CacheStoppedException;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -74,6 +74,10 @@
     @GridToStringInclude
     private Boolean recovery;
 
+    /** Async future. */
+    @GridToStringExclude
+    private final GridCacheAdapter.FutureHolder lastAsyncFut = new GridCacheAdapter.FutureHolder();
+
     /** {@inheritDoc} */
     @Override public boolean implicitSingle() {
         return false;
@@ -102,26 +106,13 @@
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) {
-        if (activeCacheIds.size() == 1) {
-            int cacheId = activeCacheIds.get(0);
-
-            return cctx.cacheContext(cacheId);
-        }
-
-        return null;
+    @Override public void awaitLastFuture() {
+        lastAsyncFut.await();
     }
 
     /** {@inheritDoc} */
-    @Override public void awaitLastFuture(GridCacheSharedContext cctx) {
-        for (int i = 0; i < activeCacheIds.size(); i++) {
-            int cacheId = activeCacheIds.get(i);
-
-            if (cctx.cacheContext(cacheId) == null)
-                throw new IgniteException("Cache is stopped, id=" + cacheId);
-
-            cctx.cacheContext(cacheId).cache().awaitLastFut();
-        }
+    @Override public GridCacheAdapter.FutureHolder lastAsyncFuture() {
+        return lastAsyncFut;
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientAsyncResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientAsyncResponse.java
new file mode 100644
index 0000000..f791113
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientAsyncResponse.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+
+/**
+ * Client async response.
+ */
+public class ClientAsyncResponse extends ClientResponse implements ClientListenerAsyncResponse {
+    /** Future for response. */
+    private final IgniteInternalFuture<? extends ClientListenerResponse> fut;
+
+    /**
+     * Constructs async response.
+     */
+    public ClientAsyncResponse(long reqId, IgniteInternalFuture<? extends ClientListenerResponse> fut) {
+        super(reqId);
+
+        this.fut = fut;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<? extends ClientListenerResponse> future() {
+        return fut;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int status() {
+        assert fut.isDone();
+
+        try {
+            return fut.get().status();
+        }
+        catch (Exception e) {
+            return STATUS_FAILED;
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void status(int status) {
+        throw new IllegalStateException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String error() {
+        assert fut.isDone();
+
+        try {
+            return fut.get().error();
+        }
+        catch (Exception e) {
+            return e.getMessage();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void error(String err) {
+        throw new IllegalStateException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onSent() {
+        assert fut.isDone();
+
+        try {
+            fut.get().onSent();
+        }
+        catch (Exception ignore) {
+            // Ignore.
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAsyncResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAsyncResponse.java
new file mode 100644
index 0000000..873d334
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerAsyncResponse.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.apache.ignite.internal.IgniteInternalFuture;
+
+/**
+ * Client listener async response.
+ */
+public interface ClientListenerAsyncResponse {
+    /** Future for response. */
+    public IgniteInternalFuture<? extends ClientListenerResponse> future();
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
index eb8dcd9..2463c58 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java
@@ -196,7 +196,7 @@
         assert req != null;
 
         try {
-            long startTime = 0;
+            long startTime;
 
             if (log.isTraceEnabled()) {
                 startTime = System.nanoTime();
@@ -204,44 +204,79 @@
                 log.trace("Client request received [reqId=" + req.requestId() + ", addr=" +
                     ses.remoteAddress() + ", req=" + req + ']');
             }
+            else
+                startTime = 0;
 
             ClientListenerResponse resp;
 
-            try (OperationSecurityContext s = ctx.security().withContext(connCtx.securityContext())) {
+            try (OperationSecurityContext ignored = ctx.security().withContext(connCtx.securityContext())) {
                 resp = hnd.handle(req);
             }
 
             if (resp != null) {
-                if (log.isTraceEnabled()) {
-                    long dur = (System.nanoTime() - startTime) / 1000;
-
-                    log.trace("Client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur +
-                        ", resp=" + resp.status() + ']');
+                if (resp instanceof ClientListenerAsyncResponse) {
+                    ((ClientListenerAsyncResponse)resp).future().listen(fut -> {
+                        try {
+                            handleResponse(req, fut.get(), startTime, ses, parser);
+                        }
+                        catch (Throwable e) {
+                            handleError(req, e, ses, parser, hnd);
+                        }
+                    });
                 }
-
-                GridNioFuture<?> fut = ses.send(parser.encode(resp));
-
-                fut.listen(() -> {
-                    if (fut.error() == null)
-                        resp.onSent();
-                });
+                else
+                    handleResponse(req, resp, startTime, ses, parser);
             }
         }
         catch (Throwable e) {
-            hnd.unregisterRequest(req.requestId());
-
-            if (e instanceof Error)
-                U.error(log, "Failed to process client request [req=" + req + ", msg=" + e.getMessage() + "]", e);
-            else
-                U.warn(log, "Failed to process client request [req=" + req + ", msg=" + e.getMessage() + "]", e);
-
-            ses.send(parser.encode(hnd.handleException(e, req)));
-
-            if (e instanceof Error)
-                throw (Error)e;
+            handleError(req, e, ses, parser, hnd);
         }
     }
 
+    /** */
+    private void handleResponse(
+        ClientListenerRequest req,
+        ClientListenerResponse resp,
+        long startTime,
+        GridNioSession ses,
+        ClientListenerMessageParser parser
+    ) {
+        if (log.isTraceEnabled()) {
+            long dur = (System.nanoTime() - startTime) / 1000;
+
+            log.trace("Client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur +
+                ", resp=" + resp.status() + ']');
+        }
+
+        GridNioFuture<?> fut = ses.send(parser.encode(resp));
+
+        fut.listen(() -> {
+            if (fut.error() == null)
+                resp.onSent();
+        });
+    }
+
+    /** */
+    private void handleError(
+        ClientListenerRequest req,
+        Throwable e,
+        GridNioSession ses,
+        ClientListenerMessageParser parser,
+        ClientListenerRequestHandler hnd
+    ) {
+        hnd.unregisterRequest(req.requestId());
+
+        if (e instanceof Error)
+            U.error(log, "Failed to process client request [req=" + req + ", msg=" + e.getMessage() + "]", e);
+        else
+            U.warn(log, "Failed to process client request [req=" + req + ", msg=" + e.getMessage() + "]", e);
+
+        ses.send(parser.encode(hnd.handleException(e, req)));
+
+        if (e instanceof Error)
+            throw (Error)e;
+    }
+
     /** {@inheritDoc} */
     @Override public void onSessionIdleTimeout(GridNioSession ses) {
         ses.close();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java
index 4488c79..e8c4a7a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java
@@ -56,7 +56,7 @@
     /**
      * @param status Status.
      */
-    public void status(int status) {
+    protected void status(int status) {
         this.status = status;
     }
 
@@ -70,7 +70,7 @@
     /**
      * @param err Error message.
      */
-    public void error(String err) {
+    protected void error(String err) {
         this.err = err;
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequest.java
index 76823b5..2ac8ea0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.platform.client;
 
 import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
 
 /**
@@ -58,4 +59,21 @@
     public ClientResponse process(ClientConnectionContext ctx) {
         return new ClientResponse(reqId);
     }
+
+    /**
+     * Processes the request asynchronously.
+     *
+     * @return Future for response.
+     */
+    public IgniteInternalFuture<ClientResponse> processAsync(ClientConnectionContext ctx) {
+        throw new IllegalStateException("Async operation is not implemented for request " + getClass().getName());
+    }
+
+    /**
+     * @param ctx Client connection context.
+     * @return {@code True} if request should be processed asynchronously.
+     */
+    public boolean isAsync(ClientConnectionContext ctx) {
+        return false;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java
index ffbe717..ecbf956 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientRequestHandler.java
@@ -20,8 +20,10 @@
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteIllegalStateException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryWriterExImpl;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.odbc.ClientAsyncResponse;
 import org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion;
 import org.apache.ignite.internal.processors.odbc.ClientListenerRequest;
 import org.apache.ignite.internal.processors.odbc.ClientListenerRequestHandler;
@@ -47,7 +49,7 @@
     private final ClientConnectionContext ctx;
 
     /** Protocol context. */
-    private ClientProtocolContext protocolCtx;
+    private final ClientProtocolContext protocolCtx;
 
     /** Logger. */
     private final IgniteLogger log;
@@ -81,7 +83,7 @@
                         try {
                             txCtx.acquire(true);
 
-                            return ((ClientRequest)req).process(ctx);
+                            return handle0(req);
                         }
                         catch (IgniteCheckedException e) {
                             throw new IgniteClientException(ClientStatus.FAILED, e.getMessage(), e);
@@ -98,7 +100,7 @@
                 }
             }
 
-            return ((ClientRequest)req).process(ctx);
+            return handle0(req);
         }
         catch (SecurityException ex) {
             throw new IgniteClientException(
@@ -109,6 +111,29 @@
         }
     }
 
+    /** */
+    private ClientListenerResponse handle0(ClientListenerRequest req) {
+        ClientRequest req0 = (ClientRequest)req;
+
+        if (req0.isAsync(ctx)) {
+            IgniteInternalFuture<ClientResponse> fut = req0.processAsync(ctx);
+
+            if (fut.isDone()) {
+                try {
+                    // Some async operations can be already finished after processAsync. Shortcut for this case.
+                    return fut.get();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteClientException(ClientStatus.FAILED, e.getMessage(), e);
+                }
+            }
+
+            return new ClientAsyncResponse(req0.requestId(), fut);
+        }
+        else
+            return req0.process(ctx);
+    }
+
     /** {@inheritDoc} */
     @Override public ClientListenerResponse handleException(Throwable e, ClientListenerRequest req) {
         assert req != null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheClearKeyRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheClearKeyRequest.java
index d0aa19e..36002b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheClearKeyRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheClearKeyRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 import org.apache.ignite.internal.processors.platform.client.ClientResponse;
@@ -35,10 +36,14 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process0(ClientConnectionContext ctx) {
         cache(ctx).clear(key());
 
         return new ClientResponse(requestId());
     }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).clearAsync(key()), v -> new ClientResponse(requestId()));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheClearKeysRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheClearKeysRequest.java
index 04eb7f6..0f89df0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheClearKeysRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheClearKeysRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 import org.apache.ignite.internal.processors.platform.client.ClientResponse;
@@ -35,10 +36,14 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process(ClientConnectionContext ctx) {
         cache(ctx).clearAll(keys());
 
         return super.process(ctx);
     }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<ClientResponse> processAsync(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).clearAllAsync(keys()), v -> new ClientResponse(requestId()));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheContainsKeyRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheContainsKeyRequest.java
index 6cea754..f041f9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheContainsKeyRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheContainsKeyRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
@@ -36,10 +37,14 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process0(ClientConnectionContext ctx) {
         boolean val = cache(ctx).containsKey(key());
 
         return new ClientBooleanResponse(requestId(), val);
     }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).containsKeyAsync(key()), v -> new ClientBooleanResponse(requestId(), v));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheContainsKeysRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheContainsKeysRequest.java
index 41e1306..ced710d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheContainsKeysRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheContainsKeysRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
@@ -36,10 +37,14 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process(ClientConnectionContext ctx) {
         boolean val = cache(ctx).containsKeys(keys());
 
         return new ClientBooleanResponse(requestId(), val);
     }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<ClientResponse> processAsync(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).containsKeysAsync(keys()), v -> new ClientBooleanResponse(requestId(), v));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheDataRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheDataRequest.java
index 138f28b..dae59ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheDataRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheDataRequest.java
@@ -18,11 +18,17 @@
 package org.apache.ignite.internal.processors.platform.client.cache;
 
 import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
 
 /**
  * Cache data manipulation request.
  */
-class ClientCacheDataRequest extends ClientCacheRequest {
+abstract class ClientCacheDataRequest extends ClientCacheRequest {
     /** Transaction ID. Only available if request was made under a transaction. */
     private final int txId;
 
@@ -48,4 +54,23 @@
     @Override public boolean isTransactional() {
         return super.isTransactional();
     }
+
+    /** Chain cache operation future to return response when operation is completed. */
+    protected static <T> IgniteInternalFuture<ClientResponse> chainFuture(
+        IgniteFuture<T> fut,
+        IgniteClosure<T, ClientResponse> clo
+    ) {
+        // IgniteFuture for cache operations executes chaining/listening block via task to external executor,
+        // we don't need this additional step here, so use internal future.
+        IgniteInternalFuture<T> fut0 = ((IgniteFutureImpl<T>)fut).internalFuture();
+
+        return fut0.chain(f -> {
+            try {
+                return clo.apply(f.get());
+            }
+            catch (Exception e) {
+                throw new GridClosureException(e);
+            }
+        });
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAllRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAllRequest.java
index 6d7943b..17c3278 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAllRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAllRequest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.platform.client.cache;
 
 import java.util.Map;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 import org.apache.ignite.internal.processors.platform.client.ClientResponse;
@@ -36,10 +37,14 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process(ClientConnectionContext ctx) {
-        Map val = cache(ctx).getAll(keys());
+        Map<Object, Object> val = cache(ctx).getAll(keys());
 
         return new ClientCacheGetAllResponse(requestId(), val);
     }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<ClientResponse> processAsync(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).getAllAsync(keys()), v -> new ClientCacheGetAllResponse(requestId(), v));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndPutIfAbsentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndPutIfAbsentRequest.java
index 7562808..2e08254 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndPutIfAbsentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndPutIfAbsentRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 import org.apache.ignite.internal.processors.platform.client.ClientObjectResponse;
@@ -36,10 +37,15 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process0(ClientConnectionContext ctx) {
         Object res = cache(ctx).getAndPutIfAbsent(key(), val());
 
         return new ClientObjectResponse(requestId(), res);
     }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).getAndPutIfAbsentAsync(key(), val()),
+            v -> new ClientObjectResponse(requestId(), v));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndPutRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndPutRequest.java
index 15794c2..cc854b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndPutRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndPutRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 import org.apache.ignite.internal.processors.platform.client.ClientObjectResponse;
@@ -36,10 +37,14 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process0(ClientConnectionContext ctx) {
         Object res = cache(ctx).getAndPut(key(), val());
 
         return new ClientObjectResponse(requestId(), res);
     }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).getAndPutAsync(key(), val()), v -> new ClientObjectResponse(requestId(), v));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndRemoveRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndRemoveRequest.java
index ebbe1a8..7cbac81 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndRemoveRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndRemoveRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 import org.apache.ignite.internal.processors.platform.client.ClientObjectResponse;
@@ -36,10 +37,14 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process0(ClientConnectionContext ctx) {
         Object val = cache(ctx).getAndRemove(key());
 
         return new ClientObjectResponse(requestId(), val);
     }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).getAndRemoveAsync(key()), v -> new ClientObjectResponse(requestId(), v));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndReplaceRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndReplaceRequest.java
index d3af07d..8f45c22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndReplaceRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetAndReplaceRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 import org.apache.ignite.internal.processors.platform.client.ClientObjectResponse;
@@ -36,10 +37,14 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process0(ClientConnectionContext ctx) {
         Object res = cache(ctx).getAndReplace(key(), val());
 
         return new ClientObjectResponse(requestId(), res);
     }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).getAndReplaceAsync(key(), val()), v -> new ClientObjectResponse(requestId(), v));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetConfigurationRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetConfigurationRequest.java
index 8ce666a..699772d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetConfigurationRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetConfigurationRequest.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
-import org.apache.ignite.IgniteCache;
 import org.apache.ignite.binary.BinaryRawReader;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
@@ -46,7 +45,7 @@
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public ClientResponse process(ClientConnectionContext ctx) {
-        CacheConfiguration cfg = ((IgniteCache<Object, Object>)rawCache(ctx))
+        CacheConfiguration<Object, Object> cfg = rawCache(ctx)
                 .getConfiguration(CacheConfiguration.class);
 
         return new ClientCacheGetConfigurationResponse(requestId(), cfg, protocolCtx);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetRequest.java
index 6fc5256..e4aca71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheGetRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 import org.apache.ignite.internal.processors.platform.client.ClientObjectResponse;
@@ -36,10 +37,14 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process0(ClientConnectionContext ctx) {
         Object val = cache(ctx).get(key());
 
         return new ClientObjectResponse(requestId(), val);
     }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).getAsync(key()), v -> new ClientObjectResponse(requestId(), v));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java
index 8c28265..db86541 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheIndexQueryRequest.java
@@ -155,7 +155,7 @@
      * {@inheritDoc}
      */
     @Override public ClientResponse process(ClientConnectionContext ctx) {
-        IgniteCache cache = !isKeepBinary() ? rawCache(ctx) : cache(ctx);
+        IgniteCache<Object, Object> cache = !isKeepBinary() ? rawCache(ctx) : cache(ctx);
 
         if (qry.getPartition() != null)
             updateAffinityMetrics(ctx, qry.getPartition());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheKeyRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheKeyRequest.java
index 5601352..b13fe7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheKeyRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheKeyRequest.java
@@ -17,9 +17,11 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
@@ -47,6 +49,28 @@
 
     /** {@inheritDoc} */
     @Override public final ClientResponse process(ClientConnectionContext ctx) {
+        updateMetrics(ctx);
+
+        // Process request in overriden method.
+        return process0(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public final IgniteInternalFuture<ClientResponse> processAsync(ClientConnectionContext ctx) {
+        updateMetrics(ctx);
+
+        // Process request in overriden method.
+        return processAsync0(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isAsync(ClientConnectionContext ctx) {
+        // Every cache data request on the transactional cache can lock the thread, even with implicit transaction.
+        return cacheDescriptor(ctx).cacheConfiguration().getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** */
+    private void updateMetrics(ClientConnectionContext ctx) {
         if (!isTransactional()) {
             // Calculate affinity metrics.
             DynamicCacheDescriptor desc = cacheDescriptor(ctx);
@@ -68,14 +92,16 @@
                 }
             }
         }
-
-        // Process request in overriden method.
-        return process0(ctx);
     }
 
     /** */
     protected abstract ClientResponse process0(ClientConnectionContext ctx);
 
+    /** */
+    protected IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+        throw new IllegalStateException("Async operation is not implemented for request " + getClass().getName());
+    }
+
     /**
      * Gets the key.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheKeysRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheKeysRequest.java
index 4e0d103..0e4bc38 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheKeysRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheKeysRequest.java
@@ -19,7 +19,9 @@
 
 import java.util.LinkedHashSet;
 import java.util.Set;
+import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 import org.apache.ignite.internal.processors.platform.client.tx.ClientTxAwareRequest;
 
 /**
@@ -65,4 +67,10 @@
 
         return keys;
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean isAsync(ClientConnectionContext ctx) {
+        // Every cache data request on the transactional cache can lock the thread, even with implicit transaction.
+        return cacheDescriptor(ctx).cacheConfiguration().getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheLocalPeekRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheLocalPeekRequest.java
index 40e0108..42dab4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheLocalPeekRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheLocalPeekRequest.java
@@ -38,10 +38,14 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process0(ClientConnectionContext ctx) {
         Object val = cache(ctx).localPeek(key(), CachePeekMode.ALL);
 
         return new ClientObjectResponse(requestId(), val);
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean isAsync(ClientConnectionContext ctx) {
+        return false;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllRequest.java
index b7d8210..da53e0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllRequest.java
@@ -19,6 +19,8 @@
 
 import java.util.LinkedHashMap;
 import java.util.Map;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 import org.apache.ignite.internal.processors.platform.client.ClientResponse;
@@ -48,10 +50,20 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process(ClientConnectionContext ctx) {
         cache(ctx).putAll(map);
 
         return super.process(ctx);
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean isAsync(ClientConnectionContext ctx) {
+        // Every cache data request on the transactional cache can lock the thread, even with implicit transaction.
+        return cacheDescriptor(ctx).cacheConfiguration().getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<ClientResponse> processAsync(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).putAllAsync(map), v -> new ClientResponse(requestId()));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutIfAbsentRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutIfAbsentRequest.java
index b0bd0e6..1db83b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutIfAbsentRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutIfAbsentRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
@@ -36,10 +37,14 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process0(ClientConnectionContext ctx) {
         boolean res = cache(ctx).putIfAbsent(key(), val());
 
         return new ClientBooleanResponse(requestId(), res);
     }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).putIfAbsentAsync(key(), val()), v -> new ClientBooleanResponse(requestId(), v));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutRequest.java
index bc249fc..c9ff14c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 import org.apache.ignite.internal.processors.platform.client.ClientResponse;
@@ -35,11 +36,15 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process0(ClientConnectionContext ctx) {
         cache(ctx).put(key(), val());
 
         return new ClientResponse(requestId());
     }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).putAsync(key(), val()), v -> new ClientResponse(requestId()));
+    }
 }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllRequest.java
index ab1b8be..7304654 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllRequest.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.processors.platform.client.cache;
 
 import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 import org.apache.ignite.internal.processors.platform.client.ClientResponse;
 
@@ -40,4 +42,15 @@
 
         return super.process(ctx);
     }
+
+    /** {@inheritDoc} */
+    @Override public boolean isAsync(ClientConnectionContext ctx) {
+        // Every cache data request on the transactional cache can lock the thread, even with implicit transaction.
+        return cacheDescriptor(ctx).cacheConfiguration().getAtomicityMode() == CacheAtomicityMode.TRANSACTIONAL;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<ClientResponse> processAsync(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).removeAllAsync(), v -> new ClientResponse(requestId()));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveIfEqualsRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveIfEqualsRequest.java
index 14948f0..2756829 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveIfEqualsRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveIfEqualsRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
@@ -36,10 +37,14 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process0(ClientConnectionContext ctx) {
         boolean res = cache(ctx).remove(key(), val());
 
         return new ClientBooleanResponse(requestId(), res);
     }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).removeAsync(key(), val()), v -> new ClientBooleanResponse(requestId(), v));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveKeyRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveKeyRequest.java
index f1c5045..7b3941b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveKeyRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveKeyRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
@@ -36,10 +37,14 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process0(ClientConnectionContext ctx) {
         boolean val = cache(ctx).remove(key());
 
         return new ClientBooleanResponse(requestId(), val);
     }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).removeAsync(key()), v -> new ClientBooleanResponse(requestId(), v));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveKeysRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveKeysRequest.java
index 043b568..f2e2e24 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveKeysRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveKeysRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 import org.apache.ignite.internal.processors.platform.client.ClientResponse;
@@ -35,10 +36,14 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process(ClientConnectionContext ctx) {
         cache(ctx).removeAll(keys());
 
         return super.process(ctx);
     }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<ClientResponse> processAsync(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).removeAllAsync(keys()), v -> new ClientResponse(requestId()));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheReplaceIfEqualsRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheReplaceIfEqualsRequest.java
index 83d40b6..55da1d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheReplaceIfEqualsRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheReplaceIfEqualsRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
@@ -41,10 +42,14 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process0(ClientConnectionContext ctx) {
         boolean res = cache(ctx).replace(key(), val(), newVal);
 
         return new ClientBooleanResponse(requestId(), res);
     }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).replaceAsync(key(), val(), newVal), v -> new ClientBooleanResponse(requestId(), v));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheReplaceRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheReplaceRequest.java
index cbfa0fc..03f2af4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheReplaceRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheReplaceRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.platform.client.cache;
 
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.processors.platform.client.ClientBooleanResponse;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
@@ -36,10 +37,14 @@
     }
 
     /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
     @Override public ClientResponse process0(ClientConnectionContext ctx) {
         boolean res = cache(ctx).replace(key(), val());
 
         return new ClientBooleanResponse(requestId(), res);
     }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<ClientResponse> processAsync0(ClientConnectionContext ctx) {
+        return chainFuture(cache(ctx).replaceAsync(key(), val()), v -> new ClientBooleanResponse(requestId(), v));
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
index 5881ab3..efb9252 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
@@ -31,7 +31,7 @@
 /**
  * Cache request.
  */
-public class ClientCacheRequest extends ClientRequest {
+public abstract class ClientCacheRequest extends ClientRequest {
     /** "Keep binary" flag mask. */
     private static final byte KEEP_BINARY_FLAG_MASK = 0x01;
 
@@ -73,7 +73,7 @@
      * @param ctx Kernal context.
      * @return Cache.
      */
-    protected IgniteCache cache(ClientConnectionContext ctx) {
+    protected IgniteCache<Object, Object> cache(ClientConnectionContext ctx) {
         return rawCache(ctx).withKeepBinary();
     }
 
@@ -83,7 +83,7 @@
      * @param ctx Kernal context.
      * @return Cache.
      */
-    protected IgniteInternalCache<?, ?> cachex(ClientConnectionContext ctx) {
+    protected IgniteInternalCache<Object, Object> cachex(ClientConnectionContext ctx) {
         String cacheName = cacheDescriptor(ctx).cacheName();
 
         return ctx.kernalContext().grid().cachex(cacheName).keepBinary();
@@ -122,7 +122,7 @@
      * @param ctx Kernal context.
      * @return Cache.
      */
-    protected IgniteCache rawCache(ClientConnectionContext ctx) {
+    protected IgniteCache<Object, Object> rawCache(ClientConnectionContext ctx) {
         DynamicCacheDescriptor cacheDesc = cacheDescriptor(ctx);
 
         String cacheName = cacheDesc.cacheName();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheScanQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheScanQueryRequest.java
index 9321db3..3872f4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheScanQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheScanQueryRequest.java
@@ -74,7 +74,8 @@
 
     /** {@inheritDoc} */
     @Override public ClientResponse process(ClientConnectionContext ctx) {
-        IgniteCache cache = filterPlatform == ClientPlatform.JAVA && !isKeepBinary() ? rawCache(ctx) : cache(ctx);
+        IgniteCache<Object, Object> cache = filterPlatform == ClientPlatform.JAVA && !isKeepBinary() ?
+            rawCache(ctx) : cache(ctx);
 
         ScanQuery qry = new ScanQuery()
             .setLocal(loc)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheSqlQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheSqlQueryRequest.java
index 00c75c2..807ac1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheSqlQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheSqlQueryRequest.java
@@ -54,7 +54,7 @@
 
     /** {@inheritDoc} */
     @Override public ClientResponse process(ClientConnectionContext ctx) {
-        IgniteCache cache = cache(ctx);
+        IgniteCache<Object, Object> cache = cache(ctx);
 
         ctx.incrementCursors();
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxEndRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxEndRequest.java
index 38bdeeb..8dc0702 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxEndRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/tx/ClientTxEndRequest.java
@@ -19,12 +19,15 @@
 
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.binary.BinaryRawReader;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 import org.apache.ignite.internal.processors.platform.client.ClientRequest;
 import org.apache.ignite.internal.processors.platform.client.ClientResponse;
 import org.apache.ignite.internal.processors.platform.client.ClientStatus;
 import org.apache.ignite.internal.processors.platform.client.IgniteClientException;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.lang.GridClosureException;
 
 /**
  * End the transaction request.
@@ -49,11 +52,26 @@
     }
 
     /** {@inheritDoc} */
-    @Override public ClientResponse process(ClientConnectionContext ctx) {
+    @Override public boolean isAsync(ClientConnectionContext ctx) {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<ClientResponse> processAsync(ClientConnectionContext ctx) {
+        return endTxAsync(ctx).chain(f -> {
+            if (f.error() != null)
+                throw new GridClosureException(f.error());
+            else
+                return process(ctx);
+        });
+    }
+
+    /** End transaction asynchronously. */
+    private IgniteInternalFuture<IgniteInternalTx> endTxAsync(ClientConnectionContext ctx) {
         ClientTxContext txCtx = ctx.txContext(txId);
 
         if (txCtx == null && !committed)
-            return super.process(ctx);
+            return new GridFinishedFuture<>();
 
         if (txCtx == null)
             throw new IgniteClientException(ClientStatus.TX_NOT_FOUND, "Transaction with id " + txId + " not found.");
@@ -61,12 +79,10 @@
         try {
             txCtx.acquire(committed);
 
-            try (GridNearTxLocal tx = txCtx.tx()) {
-                if (committed)
-                    tx.commit();
-                else
-                    tx.rollback();
-            }
+            if (committed)
+                return txCtx.tx().context().commitTxAsync(txCtx.tx());
+            else
+                return txCtx.tx().rollbackAsync();
         }
         catch (IgniteCheckedException e) {
             throw new IgniteClientException(ClientStatus.FAILED, e.getMessage(), e);
@@ -81,7 +97,5 @@
                 // No-op.
             }
         }
-
-        return super.process(ctx);
     }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/BlockingTxOpsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/BlockingTxOpsTest.java
new file mode 100644
index 0000000..833767e
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/BlockingTxOpsTest.java
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.List;
+import java.util.TreeSet;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.client.ClientCache;
+import org.apache.ignite.client.ClientCacheConfiguration;
+import org.apache.ignite.client.ClientTransaction;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
+/**
+ * Thin client blocking transactional operations tests.
+ */
+@RunWith(Parameterized.class)
+public class BlockingTxOpsTest extends AbstractThinClientTest {
+    /** Default tx timeout value. */
+    private static final long TX_TIMEOUT = 5_000L;
+
+    /** */
+    private static final int THREADS_CNT = 5;
+
+    /** */
+    private int poolSize;
+
+    /** */
+    @Parameterized.Parameter(0)
+    public TransactionConcurrency txConcurrency;
+
+    /** */
+    @Parameterized.Parameter(1)
+    public TransactionIsolation txIsolation;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "concurrency={0}, isolation={1}")
+    public static List<Object[]> params() {
+        return F.asList(
+            new Object[]{ PESSIMISTIC, REPEATABLE_READ },
+            new Object[]{ OPTIMISTIC, SERIALIZABLE }
+        );
+    }
+
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.getClientConnectorConfiguration().setThreadPoolSize(poolSize);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+        stopAllGrids();
+    }
+
+    /**
+     * Tests different blocking operations in transaction.
+     */
+    @Test
+    public void testBlockingOps() throws Exception {
+        for (int i : F.asList(1, THREADS_CNT - 1)) {
+            poolSize = i;
+
+            try (Ignite ignore = startGrid(0)) {
+                try (IgniteClient client = startClient(0)) {
+                    ClientCache<Object, Object> cache = client.getOrCreateCache(new ClientCacheConfiguration()
+                        .setName("test")
+                        .setAtomicityMode(TRANSACTIONAL)
+                    );
+
+                    // Clear operation.
+                    checkOpMultithreaded(client,
+                        () -> cache.put(0, 0),
+                        () -> cache.clear(0),
+                        () -> assertFalse(cache.containsKey(0))
+                    );
+
+                    // Clear keys operation.
+                    checkOpMultithreaded(client,
+                        () -> cache.putAll(F.asMap(0, 0, 1, 1)),
+                        () -> cache.clearAll(new TreeSet<>(F.asList(0, 1))),
+                        () -> assertFalse(cache.containsKeys(new TreeSet<>(F.asList(0, 1))))
+                    );
+
+                    // Contains operation.
+                    checkOpMultithreaded(client,
+                        () -> cache.put(0, 0),
+                        () -> assertTrue(cache.containsKey(0)),
+                        null
+                    );
+
+                    // Contains keys operation.
+                    checkOpMultithreaded(client,
+                        () -> cache.putAll(F.asMap(0, 0, 1, 1)),
+                        () -> assertTrue(cache.containsKeys(new TreeSet<>(F.asList(0, 1)))),
+                        null
+                    );
+
+                    // Get keys operation.
+                    checkOpMultithreaded(client,
+                        () -> cache.putAll(F.asMap(0, 0, 1, 1)),
+                        () -> assertEquals(F.asMap(0, 0, 1, 1), cache.getAll(new TreeSet<>(F.asList(0, 1)))),
+                        null
+                    );
+
+                    // Get and put if absent operation.
+                    checkOpMultithreaded(client,
+                        () -> cache.put(0, 0),
+                        () -> assertEquals(0, cache.getAndPutIfAbsent(0, 0)),
+                        null
+                    );
+
+                    // Get and put absent operation.
+                    checkOpMultithreaded(client,
+                        () -> cache.put(0, 0),
+                        () -> assertEquals(0, cache.getAndPut(0, 0)),
+                        null
+                    );
+
+                    // Get and remove operation.
+                    checkOpMultithreaded(client,
+                        () -> cache.put(0, 0),
+                        () -> cache.getAndRemove(0),
+                        () -> assertFalse(cache.containsKey(0))
+                    );
+
+                    // Get and replace operation.
+                    checkOpMultithreaded(client,
+                        () -> cache.put(0, 0),
+                        () -> cache.getAndReplace(0, 0),
+                        () -> assertTrue(cache.containsKey(0))
+                    );
+
+                    // Get operation.
+                    checkOpMultithreaded(client,
+                        () -> cache.put(0, 0),
+                        () -> assertEquals(0, cache.get(0)),
+                        null
+                    );
+
+                    // Put keys operation.
+                    checkOpMultithreaded(client,
+                        null,
+                        () -> cache.putAll(F.asMap(0, 0, 1, 1)),
+                        () -> assertEquals(F.asMap(0, 0, 1, 1), cache.getAll(new TreeSet<>(F.asList(0, 1))))
+                    );
+
+                    // Put if absent operation
+                    checkOpMultithreaded(client,
+                        () -> cache.put(0, 0),
+                        () -> cache.putIfAbsent(0, 1),
+                        () -> assertEquals(0, cache.get(0))
+                    );
+
+                    // Put operation
+                    checkOpMultithreaded(client,
+                        null,
+                        () -> cache.put(0, 0),
+                        () -> assertEquals(0, cache.get(0))
+                    );
+
+                    // Remove all operation.
+                    checkOpMultithreaded(client,
+                        () -> cache.putAll(F.asMap(0, 0, 1, 1)),
+                        () -> cache.removeAll(),
+                        () -> assertEquals(0, cache.size())
+                    );
+
+                    // Remove if equals operation.
+                    checkOpMultithreaded(client,
+                        () -> cache.put(0, 0),
+                        () -> cache.remove(0, 1),
+                        () -> assertEquals(0, cache.get(0))
+                    );
+
+                    // Remove operation.
+                    checkOpMultithreaded(client,
+                        () -> cache.put(0, 0),
+                        () -> cache.remove(0),
+                        () -> assertFalse(cache.containsKey(0))
+                    );
+
+                    // Remove keys operation.
+                    checkOpMultithreaded(client,
+                        () -> cache.putAll(F.asMap(0, 0, 1, 1)),
+                        () -> cache.removeAll(new TreeSet<>(F.asList(0, 1))),
+                        () -> assertFalse(cache.containsKeys(new TreeSet<>(F.asList(0, 1))))
+                    );
+
+                    // Replace if equals operation.
+                    checkOpMultithreaded(client,
+                        () -> cache.put(0, 0),
+                        () -> cache.replace(0, 0, 1),
+                        () -> assertEquals(1, cache.get(0))
+                    );
+
+                    // Replace operation.
+                    checkOpMultithreaded(client,
+                        () -> cache.put(0, 0),
+                        () -> cache.replace(0, 1),
+                        () -> assertEquals(1, cache.get(0))
+                    );
+                }
+            }
+        }
+    }
+
+    /** */
+    private void checkOpMultithreaded(IgniteClient client, Runnable init, Runnable op, Runnable check) throws Exception {
+        if (init != null)
+            init.run();
+
+        GridTestUtils.runMultiThreaded(() -> {
+            for (int i = 0; i < 100; i++) {
+                // Mix implicit and explicit transactions.
+                if (ThreadLocalRandom.current().nextBoolean()) {
+                    while (true) {
+                        try (ClientTransaction tx = client.transactions().txStart(txConcurrency, txIsolation, TX_TIMEOUT)) {
+                            op.run();
+
+                            try {
+                                tx.commit();
+
+                                break;
+                            }
+                            catch (Exception e) {
+                                if (!e.getMessage().contains("Failed to prepare transaction"))
+                                    throw e;
+                            }
+                        }
+                    }
+                }
+                else
+                    op.run();
+            }
+        }, THREADS_CNT, "tx-thread");
+
+        if (check != null)
+            check.run();
+    }
+
+    /**
+     * Tests transactional consistency on concurrent operations executed using async methods on server side.
+     */
+    @Test
+    public void testTransactionalConsistency() throws Exception {
+        poolSize = THREADS_CNT;
+
+        startGrids(3);
+        IgniteClient client = startClient(0, 1, 2);
+
+        ClientCache<Integer, Integer> cache = client.getOrCreateCache(new ClientCacheConfiguration()
+            .setName("test")
+            .setAtomicityMode(TRANSACTIONAL)
+            .setBackups(1)
+        );
+
+        int iterations = 1_000;
+        int keys = 10;
+
+        GridTestUtils.runMultiThreaded(() -> {
+            for (int i = 0; i < iterations; i++) {
+                try (ClientTransaction tx = client.transactions().txStart(txConcurrency, txIsolation, TX_TIMEOUT)) {
+                    int key1 = ThreadLocalRandom.current().nextInt(keys);
+                    int key2 = ThreadLocalRandom.current().nextInt(keys);
+                    int sum = ThreadLocalRandom.current().nextInt(100);
+
+                    if (key1 < key2) { // Avoid deadlocks
+                        Integer val1 = cache.get(key1);
+                        cache.put(key1, (val1 == null ? 0 : val1) - sum);
+                        Integer val2 = cache.get(key2);
+                        cache.put(key2, (val2 == null ? 0 : val2) + sum);
+                    }
+                    else {
+                        Integer val2 = cache.get(key2);
+                        cache.put(key2, (val2 == null ? 0 : val2) + sum);
+                        Integer val1 = cache.get(key1);
+                        cache.put(key1, (val1 == null ? 0 : val1) - sum);
+                    }
+
+                    if (ThreadLocalRandom.current().nextBoolean())
+                        try {
+                            tx.commit();
+                        }
+                        catch (Exception e) {
+                            if (!e.getMessage().contains("Failed to prepare transaction"))
+                                throw e;
+                        }
+                    else
+                        tx.rollback();
+                }
+            }
+        }, THREADS_CNT, "tx-thread");
+
+        int sum = 0;
+
+        for (int i = 0; i < keys; i++) {
+            Integer val = cache.get(i);
+
+            if (val != null)
+                sum += val;
+        }
+
+        assertEquals(0, sum);
+    }
+
+    /**
+     * Tests async commit future chaining with incompleted last operation async future.
+     */
+    @Test
+    public void testCommitFutureChaining() throws Exception {
+        poolSize = 1;
+
+        try (Ignite ignore = startGrid(0)) {
+            try (IgniteClient client = startClient(0)) {
+                ClientCache<Integer, Integer> cache = client.getOrCreateCache(new ClientCacheConfiguration()
+                    .setName("test")
+                    .setAtomicityMode(TRANSACTIONAL)
+                    .setBackups(1)
+                );
+
+                int iterations = 100;
+
+                GridTestUtils.runMultiThreaded(() -> {
+                    for (int i = 0; i < iterations; i++) {
+                        if (ThreadLocalRandom.current().nextBoolean()) {
+                            try (ClientTransaction tx = client.transactions().txStart(txConcurrency, txIsolation, TX_TIMEOUT)) {
+                                cache.putAsync(0, 0);
+                                tx.commit();
+                            }
+                        }
+                        else
+                            cache.put(0, 0);
+                    }
+                }, THREADS_CNT, "tx-thread");
+            }
+        }
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index 45e5593..121bd99 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -19,6 +19,7 @@
 
 import org.apache.ignite.internal.client.thin.AffinityMetricsTest;
 import org.apache.ignite.internal.client.thin.AtomicLongTest;
+import org.apache.ignite.internal.client.thin.BlockingTxOpsTest;
 import org.apache.ignite.internal.client.thin.CacheAsyncTest;
 import org.apache.ignite.internal.client.thin.CacheEntryListenersTest;
 import org.apache.ignite.internal.client.thin.ClusterApiTest;
@@ -91,7 +92,8 @@
     ThinClientEnpointsDiscoveryTest.class,
     InactiveClusterCacheRequestTest.class,
     AffinityMetricsTest.class,
-    ClusterGroupClusterRestartTest.class
+    ClusterGroupClusterRestartTest.class,
+    BlockingTxOpsTest.class,
 })
 public class ClientTestSuite {
     // No-op.