IGNITE-10242 Pause ongoing rebalance on cache group stopping

Signed-off-by: Pavel Kovalenko <jokserfn@gmail.com>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 6dad367..0a0e709 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -437,9 +437,11 @@
 
                                     return;
                                 }
+                                else
+                                    U.error(log, "Unsupported message type: " + m.getClass().getName());
                             }
 
-                            U.error(log, "Unsupported message type: " + m.getClass().getName());
+                            U.warn(log, "Cache group with id=" + m.groupId() + " is stopped or absent");
                         }
                         finally {
                             leaveBusy();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index d629e94..6ac26c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -192,4 +192,14 @@
      * Dumps debug information.
      */
     public void dumpDebugInfo();
+
+    /**
+     *  Pause preloader.
+     */
+    public void pause();
+
+    /**
+     * Resume preloader.
+     */
+    public void resume();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index c5e4a81..f16305c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -181,4 +181,14 @@
     @Override public void dumpDebugInfo() {
         // No-op.
     }
+
+    /** {@inheritDoc} */
+    @Override public void pause() {
+        // No-op
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resume() {
+        // No-op
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index b49c697..375dd12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -73,6 +73,7 @@
 import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
@@ -2113,10 +2114,8 @@
         else {
             Map<StartCacheInfo, GridCacheContext> cacheContexts = new ConcurrentHashMap<>();
 
-            int parallelismLvl = sharedCtx.kernalContext().config().getSystemThreadPoolSize();
-
             // Reserve at least 2 threads for system operations.
-            parallelismLvl = Math.max(1, parallelismLvl - 2);
+            int parallelismLvl = U.availableThreadCount(ctx, GridIoPolicy.SYSTEM_POOL, 2);
 
             doInParallel(
                 parallelismLvl,
@@ -2908,7 +2907,9 @@
      * @param exchActions Change requests.
      */
     private void processCacheStopRequestOnExchangeDone(ExchangeActions exchActions) {
-        // Force checkpoint if there is any cache stop request
+        // Reserve at least 2 threads for system operations.
+        int parallelismLvl = U.availableThreadCount(ctx, GridIoPolicy.SYSTEM_POOL, 2);
+
         if (!exchActions.cacheStopRequests().isEmpty()) {
             try {
                 sharedCtx.database().waitForCheckpoint("caches stop");
@@ -2918,63 +2919,88 @@
             }
         }
 
-        for (ExchangeActions.CacheActionData action : exchActions.cacheStopRequests()) {
-            CacheGroupContext gctx = cacheGrps.get(action.descriptor().groupId());
+        List<IgniteBiTuple<CacheGroupContext, Boolean>> grpToStop = exchActions.cacheGroupsToStop().stream()
+                .filter(a -> cacheGrps.containsKey(a.descriptor().groupId()))
+                .map(a -> F.t(cacheGrps.get(a.descriptor().groupId()), a.destroy()))
+                .collect(Collectors.toList());
 
-            // Cancel all operations blocking gateway
-            if (gctx != null) {
-                final String msg = "Failed to wait for topology update, cache group is stopping.";
+        Map<Integer, List<ExchangeActions.CacheActionData>> cachesToStop = exchActions.cacheStopRequests().stream()
+                .collect(Collectors.groupingBy(action -> action.descriptor().groupId()));
 
-                // If snapshot operation in progress we must throw CacheStoppedException
-                // for correct cache proxy restart. For more details see
-                // IgniteCacheProxy.cacheException()
-                gctx.affinity().cancelFutures(new CacheStoppedException(msg));
-            }
+        try {
+            doInParallel(
+                    parallelismLvl,
+                    sharedCtx.kernalContext().getSystemExecutorService(),
+                    cachesToStop.entrySet(),
+                    cachesToStopByGrp -> {
+                        CacheGroupContext gctx = cacheGrps.get(cachesToStopByGrp.getKey());
 
-            stopGateway(action.request());
+                        if (gctx != null)
+                            gctx.preloader().pause();
 
-            sharedCtx.database().checkpointReadLock();
+                        try {
 
-            try {
-                prepareCacheStop(action.request().cacheName(), action.request().destroy());
-            }
-            finally {
-                sharedCtx.database().checkpointReadUnlock();
-            }
+                            if (gctx != null) {
+                                final String msg = "Failed to wait for topology update, cache group is stopping.";
+
+                                // If snapshot operation in progress we must throw CacheStoppedException
+                                // for correct cache proxy restart. For more details see
+                                // IgniteCacheProxy.cacheException()
+                                gctx.affinity().cancelFutures(new CacheStoppedException(msg));
+                            }
+
+                            for (ExchangeActions.CacheActionData action: cachesToStopByGrp.getValue()) {
+                                stopGateway(action.request());
+
+                                sharedCtx.database().checkpointReadLock();
+
+                                try {
+                                    prepareCacheStop(action.request().cacheName(), action.request().destroy());
+                                }
+                                finally {
+                                    sharedCtx.database().checkpointReadUnlock();
+                                }
+                            }
+                        }
+                        finally {
+                            if (gctx != null)
+                                gctx.preloader().resume();
+                        }
+
+                        return null;
+                    }
+            );
+        }
+        catch (IgniteCheckedException e) {
+            String msg = "Failed to stop caches";
+
+            log.error(msg, e);
+
+            throw new IgniteException(msg, e);
         }
 
         sharedCtx.database().checkpointReadLock();
 
         try {
             // Do not invoke checkpoint listeners for groups are going to be destroyed to prevent metadata corruption.
-            for (ExchangeActions.CacheGroupActionData action : exchActions.cacheGroupsToStop()) {
-                Integer groupId = action.descriptor().groupId();
-                CacheGroupContext grp = cacheGrps.get(groupId);
+            grpToStop.forEach(grp -> {
+                CacheGroupContext gctx = grp.getKey();
 
-                if (grp != null && grp.persistenceEnabled() && sharedCtx.database() instanceof GridCacheDatabaseSharedManager) {
+                if (gctx != null && gctx.persistenceEnabled() && sharedCtx.database() instanceof GridCacheDatabaseSharedManager) {
                     GridCacheDatabaseSharedManager mngr = (GridCacheDatabaseSharedManager)sharedCtx.database();
-                    mngr.removeCheckpointListener((DbCheckpointListener)grp.offheap());
+                    mngr.removeCheckpointListener((DbCheckpointListener)gctx.offheap());
                 }
-            }
+            });
         }
         finally {
             sharedCtx.database().checkpointReadUnlock();
         }
 
-        List<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGroups = new ArrayList<>();
-
-        for (ExchangeActions.CacheGroupActionData action : exchActions.cacheGroupsToStop()) {
-            Integer groupId = action.descriptor().groupId();
-
-            if (cacheGrps.containsKey(groupId)) {
-                stoppedGroups.add(F.t(cacheGrps.get(groupId), action.destroy()));
-
-                stopCacheGroup(groupId);
-            }
-        }
+        for (IgniteBiTuple<CacheGroupContext, Boolean> grp : grpToStop)
+            stopCacheGroup(grp.get1().groupId());
 
         if (!sharedCtx.kernalContext().clientNode())
-            sharedCtx.database().onCacheGroupsStopped(stoppedGroups);
+            sharedCtx.database().onCacheGroupsStopped(grpToStop);
 
         if (exchActions.deactivate())
             sharedCtx.deactivate();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index a75fae7..ddbb3b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -906,6 +906,9 @@
             try {
                 GridCacheContext cctx = grp.sharedGroup() ? ctx.cacheContext(entry.cacheId()) : grp.singleCacheContext();
 
+                if (cctx == null)
+                    return true;
+
                 if (cctx.isNear())
                     cctx = cctx.dhtCache().context();
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index 3034fb9..7e281e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -249,6 +249,9 @@
 
         CacheGroupContext grp = ctx.cache().cacheGroup(grpId);
 
+        if (grp == null)
+            return;
+
         for (CacheEntryInfoCollection col : infos().values()) {
             List<GridCacheEntryInfo> entries = col.infos();
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index ffc55a9..89e03a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -3533,10 +3533,8 @@
      * failed to send update counter deltas to backup.
      */
     private void finalizePartitionCounters() {
-        int parallelismLvl = cctx.kernalContext().config().getSystemThreadPoolSize();
-
         // Reserve at least 2 threads for system operations.
-        parallelismLvl = Math.max(1, parallelismLvl - 2);
+        int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
 
         long time = System.currentTimeMillis();
 
@@ -3965,10 +3963,8 @@
 
         long time = System.currentTimeMillis();
 
-        int parallelismLvl = cctx.kernalContext().config().getSystemThreadPoolSize();
-
         // Reserve at least 2 threads for system operations.
-        parallelismLvl = Math.max(1, parallelismLvl - 2);
+        int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
 
         try {
             doInParallel(
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index fbaa241..a2cecb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -31,6 +31,7 @@
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -415,7 +416,7 @@
 
         if (marshal) {
             // Reserve at least 2 threads for system operations.
-            int parallelismLvl = Math.max(1, ctx.kernalContext().config().getSystemThreadPoolSize() - 2);
+            int parallelismLvl = U.availableThreadCount(ctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
 
             Collection<Object> objectsToMarshall = new ArrayList<>();
 
@@ -509,7 +510,7 @@
         Collection<byte[]> objectsToUnmarshall = new ArrayList<>();
 
         // Reserve at least 2 threads for system operations.
-        int parallelismLvl = Math.max(1, ctx.kernalContext().config().getSystemThreadPoolSize() - 2);
+        int parallelismLvl = U.availableThreadCount(ctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
 
         if (partsBytes != null && parts == null)
             objectsToUnmarshall.add(partsBytes);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index c8705d0..e92a240 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -20,13 +20,16 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
@@ -42,7 +45,9 @@
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.lang.GridTuple3;
 import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.jetbrains.annotations.Nullable;
 
@@ -80,6 +85,12 @@
     private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
 
     /** */
+    private boolean paused;
+
+    /** */
+    private Queue<GridTuple3<Integer, UUID, GridDhtPartitionSupplyMessage>> pausedDemanderQueue = new ConcurrentLinkedQueue<>();
+
+    /** */
     private boolean stopped;
 
     /**
@@ -357,7 +368,10 @@
             demandLock.readLock().lock();
 
             try {
-                demander.handleSupplyMessage(idx, id, s);
+                if (paused)
+                    pausedDemanderQueue.add(F.t(idx, id, s));
+                else
+                    demander.handleSupplyMessage(idx, id, s);
             }
             finally {
                 demandLock.readLock().unlock();
@@ -562,6 +576,41 @@
     }
 
     /** {@inheritDoc} */
+    @Override public void pause() {
+        demandLock.writeLock().lock();
+
+        try {
+            paused = true;
+        }
+        finally {
+           demandLock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resume() {
+        demandLock.writeLock().lock();
+
+        try {
+            final List<GridTuple3<Integer, UUID, GridDhtPartitionSupplyMessage>> msgToProc =
+                    new ArrayList<>(pausedDemanderQueue);
+
+            pausedDemanderQueue.clear();
+
+            final GridDhtPreloader preloader = this;
+
+            ctx.kernalContext().closure().runLocalSafe(() -> msgToProc.forEach(
+                    m -> preloader.handleSupplyMessage(m.get1(), m.get2(), m.get3())
+            ), GridIoPolicy.SYSTEM_POOL);
+
+            paused = false;
+        }
+        finally {
+            demandLock.writeLock().unlock();
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public void dumpDebugInfo() {
         // No-op
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 946378d..6da5c6e 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -55,6 +55,7 @@
 import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.mxbean.IgniteStandardMXBean;
 import org.apache.ignite.internal.processors.cache.CacheClassLoaderMarker;
@@ -10604,6 +10605,71 @@
     }
 
     /**
+     * @param ctx Kernal context.
+     * @param plc IO Policy.
+     * @param reserved Thread to reserve.
+     * @return Number of available threads in executor service for {@code plc}. If {@code plc}
+     *         is invalid, return {@code 1}.
+     */
+    public static int availableThreadCount(GridKernalContext ctx, byte plc, int reserved) {
+        IgniteConfiguration cfg = ctx.config();
+
+        int parallelismLvl;
+
+        switch (plc) {
+            case GridIoPolicy.P2P_POOL:
+                parallelismLvl = cfg.getPeerClassLoadingThreadPoolSize();
+
+                break;
+
+            case GridIoPolicy.SYSTEM_POOL:
+                parallelismLvl = cfg.getSystemThreadPoolSize();
+
+                break;
+
+            case GridIoPolicy.PUBLIC_POOL:
+                parallelismLvl = cfg.getPublicThreadPoolSize();
+
+                break;
+
+            case GridIoPolicy.MANAGEMENT_POOL:
+                parallelismLvl = cfg.getManagementThreadPoolSize();
+
+                break;
+
+            case GridIoPolicy.UTILITY_CACHE_POOL:
+                parallelismLvl = cfg.getUtilityCacheThreadPoolSize();
+
+                break;
+
+            case GridIoPolicy.IGFS_POOL:
+                parallelismLvl = cfg.getIgfsThreadPoolSize();
+
+                break;
+
+            case GridIoPolicy.SERVICE_POOL:
+                parallelismLvl = cfg.getServiceThreadPoolSize();
+
+                break;
+
+            case GridIoPolicy.DATA_STREAMER_POOL:
+                parallelismLvl = cfg.getDataStreamerThreadPoolSize();
+
+                break;
+
+            case GridIoPolicy.QUERY_POOL:
+                parallelismLvl = cfg.getQueryThreadPoolSize();
+
+                break;
+
+            default:
+                parallelismLvl = -1;
+        }
+
+        return Math.max(1, parallelismLvl - reserved);
+    }
+
+    /**
      * Execute operation on data in parallel.
      *
      * @param executorSvc Service for parallel execution.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java
new file mode 100644
index 0000000..97f8d45
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonAbstractTest {
+    /** */
+    private static final String CACHE_1 = "cache_1";
+
+    /** */
+    private static final String CACHE_2 = "cache_2";
+
+    /** */
+    private static final String CACHE_3 = "cache_3";
+
+    /** */
+    private static final String CACHE_4 = "cache_4";
+
+    /** */
+    private static final String GROUP_1 = "group_1";
+
+    /** */
+    private static final String GROUP_2 = "group_2";
+
+    /** */
+    private static final int REBALANCE_BATCH_SIZE = 50 * 1024;
+
+    /** */
+    private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+        spi.setIpFinder(ipFinder);
+
+        cfg.setDiscoverySpi(spi);
+
+        cfg.setCommunicationSpi(new RebalanceBlockingSPI());
+
+        cfg.setFailureHandler(new StopNodeFailureHandler());
+
+        cfg.setRebalanceThreadPoolSize(4);
+
+        cfg.setTransactionConfiguration(new TransactionConfiguration()
+            .setDefaultTxTimeout(1000));
+
+        cfg.setDataStorageConfiguration(
+                new DataStorageConfiguration()
+                        .setWalMode(WALMode.LOG_ONLY)
+                        .setDefaultDataRegionConfiguration(
+                                new DataRegionConfiguration()
+                                        .setPersistenceEnabled(true)
+                                        .setMaxSize(100L * 1024 * 1024)));
+
+        return cfg;
+    }
+
+    /**
+     *
+     */
+    public void testStopCachesOnDeactivation() throws Exception {
+        performTest(ig -> {
+            ig.cluster().active(false);
+
+            // Add to escape possible long waiting in awaitPartitionMapExchange due to {@link CacheAffinityChangeMessage}.
+            ig.cluster().active(true);
+
+            return null;
+        });
+    }
+
+    /**
+     *
+     */
+    public void testDestroySpecificCachesInDifferentCacheGroups() throws Exception {
+        performTest(ig -> {
+            ig.destroyCaches(Arrays.asList(CACHE_1, CACHE_3));
+
+            return null;
+        });
+    }
+
+    /**
+     *
+     */
+    public void testDestroySpecificCacheAndCacheGroup() throws Exception {
+        performTest(ig -> {
+            ig.destroyCaches(Arrays.asList(CACHE_1, CACHE_3, CACHE_4));
+
+            return null;
+        });
+    }
+
+    /**
+     * @param testAction Action that trigger stop or destroy of caches.
+     */
+    private void performTest(IgniteThrowableConsumer<Ignite, Void> testAction) throws Exception {
+        IgniteEx ig0 = (IgniteEx)startGrids(2);
+
+        ig0.cluster().active(true);
+
+        stopGrid(1);
+
+        loadData(ig0);
+
+        startGrid(1);
+
+        runLoad(ig0);
+
+        testAction.accept(ig0);
+
+        U.sleep(1000);
+
+        awaitPartitionMapExchange(true, true, null, true);
+
+        assertNull(grid(1).context().failure().failureContext());
+    }
+
+    /**
+     * @param ig Ig.
+     */
+    private void loadData(Ignite ig) {
+        List<CacheConfiguration> configs = Stream.of(
+                F.t(CACHE_1, GROUP_1),
+                F.t(CACHE_2, GROUP_1),
+                F.t(CACHE_3, GROUP_2),
+                F.t(CACHE_4, GROUP_2)
+        ).map(names -> new CacheConfiguration<>(names.get1())
+                .setGroupName(names.get2())
+                .setRebalanceBatchSize(REBALANCE_BATCH_SIZE)
+                .setCacheMode(CacheMode.REPLICATED)
+        ).collect(Collectors.toList());
+
+        ig.getOrCreateCaches(configs);
+
+        configs.forEach(cfg -> {
+            try (IgniteDataStreamer<Object, Object> streamer = ig.dataStreamer(cfg.getName())) {
+                for (int i = 0; i < 3_000; i++)
+                    streamer.addData(i, new byte[1024]);
+
+                streamer.flush();
+            }
+        });
+    }
+
+    /**
+     * @param ig Ignite instance.
+     */
+    private void runLoad(Ignite ig) throws Exception{
+        GridTestUtils.runMultiThreaded(new Runnable() {
+            @Override public void run() {
+                String cacheName = F.rand(CACHE_1, CACHE_2, CACHE_3, CACHE_4);
+
+                IgniteCache cache = ig.cache(cacheName);
+
+                for (int i = 0; i < 3_000; i++) {
+                    int idx = ThreadLocalRandom.current().nextInt(3_000);
+
+                    cache.put(idx, new byte[1024]);
+                }
+            }
+        }, 4, "load-thread");
+    }
+
+    /**
+     *
+     */
+    private static class RebalanceBlockingSPI extends TcpCommunicationSpi {
+        /** */
+        public static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+            slowDownMessage(msg);
+
+            super.sendMessage(node, msg);
+
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg,
+                                          IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+            slowDownMessage(msg);
+
+            super.sendMessage(node, msg, ackC);
+        }
+
+        /**
+         * @param msg Message.
+         */
+        private void slowDownMessage(Message msg) {
+            if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() instanceof GridDhtPartitionSupplyMessage) {
+                int grpId = ((GridCacheGroupIdMessage)((GridIoMessage)msg).message()).groupId();
+
+                if (grpId == CU.cacheId(GROUP_1) || grpId == CU.cacheId(GROUP_2)) {
+                    try {
+                        U.sleep(50);
+                    }
+                    catch (IgniteInterruptedCheckedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 11f0219..d01f1ed 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -21,6 +21,7 @@
 import org.apache.ignite.cache.ResetLostPartitionTest;
 import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistenceAndMemoryReuse;
 import org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.IgniteRebalanceOnCachesStoppingOrDestroyingTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest;
 import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTaskCancelingTest;
@@ -52,6 +53,8 @@
 
         suite.addTestSuite(ResetLostPartitionTest.class);
 
+        suite.addTestSuite(IgniteRebalanceOnCachesStoppingOrDestroyingTest.class);
+
         suite.addTestSuite(CachePageWriteLockUnlockTest.class);
 
         suite.addTestSuite(IgnitePdsCacheWalDisabledOnRebalancingTest.class);