IGNITE-10242 Pause ongoing rebalance on cache group stopping
Signed-off-by: Pavel Kovalenko <jokserfn@gmail.com>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 6dad367..0a0e709 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -437,9 +437,11 @@
return;
}
+ else
+ U.error(log, "Unsupported message type: " + m.getClass().getName());
}
- U.error(log, "Unsupported message type: " + m.getClass().getName());
+ U.warn(log, "Cache group with id=" + m.groupId() + " is stopped or absent");
}
finally {
leaveBusy();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index d629e94..6ac26c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -192,4 +192,14 @@
* Dumps debug information.
*/
public void dumpDebugInfo();
+
+ /**
+ * Pauses the preloader; implementations may hold back incoming rebalance messages until {@link #resume()}.
+ */
+ public void pause();
+
+ /**
+ * Resumes the preloader after a preceding {@link #pause()}.
+ */
+ public void resume();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index c5e4a81..f16305c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -181,4 +181,14 @@
@Override public void dumpDebugInfo() {
// No-op.
}
+
+ /** {@inheritDoc} */
+ @Override public void pause() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resume() {
+ // No-op.
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index b49c697..375dd12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -73,6 +73,7 @@
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMarshaller;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
@@ -2113,10 +2114,8 @@
else {
Map<StartCacheInfo, GridCacheContext> cacheContexts = new ConcurrentHashMap<>();
- int parallelismLvl = sharedCtx.kernalContext().config().getSystemThreadPoolSize();
-
// Reserve at least 2 threads for system operations.
- parallelismLvl = Math.max(1, parallelismLvl - 2);
+ int parallelismLvl = U.availableThreadCount(ctx, GridIoPolicy.SYSTEM_POOL, 2);
doInParallel(
parallelismLvl,
@@ -2908,7 +2907,9 @@
* @param exchActions Change requests.
*/
private void processCacheStopRequestOnExchangeDone(ExchangeActions exchActions) {
- // Force checkpoint if there is any cache stop request
+ // Reserve at least 2 threads for system operations.
+ int parallelismLvl = U.availableThreadCount(ctx, GridIoPolicy.SYSTEM_POOL, 2);
+
if (!exchActions.cacheStopRequests().isEmpty()) {
try {
sharedCtx.database().waitForCheckpoint("caches stop");
@@ -2918,63 +2919,88 @@
}
}
- for (ExchangeActions.CacheActionData action : exchActions.cacheStopRequests()) {
- CacheGroupContext gctx = cacheGrps.get(action.descriptor().groupId());
+ List<IgniteBiTuple<CacheGroupContext, Boolean>> grpToStop = exchActions.cacheGroupsToStop().stream()
+ .filter(a -> cacheGrps.containsKey(a.descriptor().groupId()))
+ .map(a -> F.t(cacheGrps.get(a.descriptor().groupId()), a.destroy()))
+ .collect(Collectors.toList());
- // Cancel all operations blocking gateway
- if (gctx != null) {
- final String msg = "Failed to wait for topology update, cache group is stopping.";
+ Map<Integer, List<ExchangeActions.CacheActionData>> cachesToStop = exchActions.cacheStopRequests().stream()
+ .collect(Collectors.groupingBy(action -> action.descriptor().groupId()));
- // If snapshot operation in progress we must throw CacheStoppedException
- // for correct cache proxy restart. For more details see
- // IgniteCacheProxy.cacheException()
- gctx.affinity().cancelFutures(new CacheStoppedException(msg));
- }
+ try {
+ doInParallel(
+ parallelismLvl,
+ sharedCtx.kernalContext().getSystemExecutorService(),
+ cachesToStop.entrySet(),
+ cachesToStopByGrp -> {
+ CacheGroupContext gctx = cacheGrps.get(cachesToStopByGrp.getKey());
- stopGateway(action.request());
+ if (gctx != null)
+ gctx.preloader().pause();
- sharedCtx.database().checkpointReadLock();
+ try {
- try {
- prepareCacheStop(action.request().cacheName(), action.request().destroy());
- }
- finally {
- sharedCtx.database().checkpointReadUnlock();
- }
+ if (gctx != null) {
+ final String msg = "Failed to wait for topology update, cache group is stopping.";
+
+ // If snapshot operation in progress we must throw CacheStoppedException
+ // for correct cache proxy restart. For more details see
+ // IgniteCacheProxy.cacheException()
+ gctx.affinity().cancelFutures(new CacheStoppedException(msg));
+ }
+
+ for (ExchangeActions.CacheActionData action: cachesToStopByGrp.getValue()) {
+ stopGateway(action.request());
+
+ sharedCtx.database().checkpointReadLock();
+
+ try {
+ prepareCacheStop(action.request().cacheName(), action.request().destroy());
+ }
+ finally {
+ sharedCtx.database().checkpointReadUnlock();
+ }
+ }
+ }
+ finally {
+ if (gctx != null)
+ gctx.preloader().resume();
+ }
+
+ return null;
+ }
+ );
+ }
+ catch (IgniteCheckedException e) {
+ String msg = "Failed to stop caches";
+
+ log.error(msg, e);
+
+ throw new IgniteException(msg, e);
}
sharedCtx.database().checkpointReadLock();
try {
// Do not invoke checkpoint listeners for groups are going to be destroyed to prevent metadata corruption.
- for (ExchangeActions.CacheGroupActionData action : exchActions.cacheGroupsToStop()) {
- Integer groupId = action.descriptor().groupId();
- CacheGroupContext grp = cacheGrps.get(groupId);
+ grpToStop.forEach(grp -> {
+ CacheGroupContext gctx = grp.getKey();
- if (grp != null && grp.persistenceEnabled() && sharedCtx.database() instanceof GridCacheDatabaseSharedManager) {
+ if (gctx != null && gctx.persistenceEnabled() && sharedCtx.database() instanceof GridCacheDatabaseSharedManager) {
GridCacheDatabaseSharedManager mngr = (GridCacheDatabaseSharedManager)sharedCtx.database();
- mngr.removeCheckpointListener((DbCheckpointListener)grp.offheap());
+ mngr.removeCheckpointListener((DbCheckpointListener)gctx.offheap());
}
- }
+ });
}
finally {
sharedCtx.database().checkpointReadUnlock();
}
- List<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGroups = new ArrayList<>();
-
- for (ExchangeActions.CacheGroupActionData action : exchActions.cacheGroupsToStop()) {
- Integer groupId = action.descriptor().groupId();
-
- if (cacheGrps.containsKey(groupId)) {
- stoppedGroups.add(F.t(cacheGrps.get(groupId), action.destroy()));
-
- stopCacheGroup(groupId);
- }
- }
+ for (IgniteBiTuple<CacheGroupContext, Boolean> grp : grpToStop)
+ stopCacheGroup(grp.get1().groupId());
if (!sharedCtx.kernalContext().clientNode())
- sharedCtx.database().onCacheGroupsStopped(stoppedGroups);
+ sharedCtx.database().onCacheGroupsStopped(grpToStop);
if (exchActions.deactivate())
sharedCtx.deactivate();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index a75fae7..ddbb3b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -906,6 +906,9 @@
try {
GridCacheContext cctx = grp.sharedGroup() ? ctx.cacheContext(entry.cacheId()) : grp.singleCacheContext();
+ if (cctx == null)
+ return true;
+
if (cctx.isNear())
cctx = cctx.dhtCache().context();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
index 3034fb9..7e281e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplyMessage.java
@@ -249,6 +249,9 @@
CacheGroupContext grp = ctx.cache().cacheGroup(grpId);
+ if (grp == null)
+ return;
+
for (CacheEntryInfoCollection col : infos().values()) {
List<GridCacheEntryInfo> entries = col.infos();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index ffc55a9..89e03a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -3533,10 +3533,8 @@
* failed to send update counter deltas to backup.
*/
private void finalizePartitionCounters() {
- int parallelismLvl = cctx.kernalContext().config().getSystemThreadPoolSize();
-
// Reserve at least 2 threads for system operations.
- parallelismLvl = Math.max(1, parallelismLvl - 2);
+ int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
long time = System.currentTimeMillis();
@@ -3965,10 +3963,8 @@
long time = System.currentTimeMillis();
- int parallelismLvl = cctx.kernalContext().config().getSystemThreadPoolSize();
-
// Reserve at least 2 threads for system operations.
- parallelismLvl = Math.max(1, parallelismLvl - 2);
+ int parallelismLvl = U.availableThreadCount(cctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
try {
doInParallel(
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index fbaa241..a2cecb8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -31,6 +31,7 @@
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -415,7 +416,7 @@
if (marshal) {
// Reserve at least 2 threads for system operations.
- int parallelismLvl = Math.max(1, ctx.kernalContext().config().getSystemThreadPoolSize() - 2);
+ int parallelismLvl = U.availableThreadCount(ctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
Collection<Object> objectsToMarshall = new ArrayList<>();
@@ -509,7 +510,7 @@
Collection<byte[]> objectsToUnmarshall = new ArrayList<>();
// Reserve at least 2 threads for system operations.
- int parallelismLvl = Math.max(1, ctx.kernalContext().config().getSystemThreadPoolSize() - 2);
+ int parallelismLvl = U.availableThreadCount(ctx.kernalContext(), GridIoPolicy.SYSTEM_POOL, 2);
if (partsBytes != null && parts == null)
objectsToUnmarshall.add(partsBytes);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index c8705d0..e92a240 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -20,13 +20,16 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Queue;
import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
@@ -42,7 +45,9 @@
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
+import org.apache.ignite.internal.util.lang.GridTuple3;
import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgnitePredicate;
import org.jetbrains.annotations.Nullable;
@@ -80,6 +85,12 @@
private final ReadWriteLock demandLock = new ReentrantReadWriteLock();
/** */
+ private boolean paused; // Guarded by demandLock: written under the write lock, read under the read lock.
+
+ /** Supply messages received while paused; drained and re-submitted by {@link #resume()}. */
+ private final Queue<GridTuple3<Integer, UUID, GridDhtPartitionSupplyMessage>> pausedDemanderQueue = new ConcurrentLinkedQueue<>();
+
+ /** */
private boolean stopped;
/**
@@ -357,7 +368,10 @@
demandLock.readLock().lock();
try {
- demander.handleSupplyMessage(idx, id, s);
+ if (paused)
+ pausedDemanderQueue.add(F.t(idx, id, s));
+ else
+ demander.handleSupplyMessage(idx, id, s);
}
finally {
demandLock.readLock().unlock();
@@ -562,6 +576,41 @@
}
/** {@inheritDoc} */
+ @Override public void pause() {
+ demandLock.writeLock().lock(); // Exclusive: the supply-message handler checks the flag under the read lock.
+
+ try {
+ paused = true; // From now on incoming supply messages are queued instead of being passed to the demander.
+ }
+ finally {
+ demandLock.writeLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resume() {
+ demandLock.writeLock().lock();
+
+ try {
+ // Clear the flag before re-submitting: if the closure ends up running in this very thread
+ // (or submission fails), replayed messages must not be queued again and left stranded.
+ paused = false;
+
+ final List<GridTuple3<Integer, UUID, GridDhtPartitionSupplyMessage>> msgToProc =
+ new ArrayList<>(pausedDemanderQueue);
+
+ pausedDemanderQueue.clear();
+
+ ctx.kernalContext().closure().runLocalSafe(() -> msgToProc.forEach(
+ m -> handleSupplyMessage(m.get1(), m.get2(), m.get3())
+ ), GridIoPolicy.SYSTEM_POOL);
+ }
+ finally {
+ demandLock.writeLock().unlock();
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public void dumpDebugInfo() {
// No-op
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 946378d..6da5c6e 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -55,6 +55,7 @@
import org.apache.ignite.internal.compute.ComputeTaskTimeoutCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
import org.apache.ignite.internal.mxbean.IgniteStandardMXBean;
import org.apache.ignite.internal.processors.cache.CacheClassLoaderMarker;
@@ -10604,6 +10605,71 @@
}
/**
+ * Computes a parallelism level from the thread pool size configured for {@code plc}.
+ * @param ctx Kernal context whose configuration provides the pool sizes.
+ * @param plc IO policy selecting the pool (one of the {@link GridIoPolicy} constants handled below).
+ * @param reserved Number of threads to subtract from the configured pool size.
+ * @return Configured size minus {@code reserved}, but at least {@code 1}; see the default branch for other policies.
+ */
+ public static int availableThreadCount(GridKernalContext ctx, byte plc, int reserved) {
+ IgniteConfiguration cfg = ctx.config();
+
+ int parallelismLvl;
+
+ switch (plc) {
+ case GridIoPolicy.P2P_POOL:
+ parallelismLvl = cfg.getPeerClassLoadingThreadPoolSize();
+
+ break;
+
+ case GridIoPolicy.SYSTEM_POOL:
+ parallelismLvl = cfg.getSystemThreadPoolSize();
+
+ break;
+
+ case GridIoPolicy.PUBLIC_POOL:
+ parallelismLvl = cfg.getPublicThreadPoolSize();
+
+ break;
+
+ case GridIoPolicy.MANAGEMENT_POOL:
+ parallelismLvl = cfg.getManagementThreadPoolSize();
+
+ break;
+
+ case GridIoPolicy.UTILITY_CACHE_POOL:
+ parallelismLvl = cfg.getUtilityCacheThreadPoolSize();
+
+ break;
+
+ case GridIoPolicy.IGFS_POOL:
+ parallelismLvl = cfg.getIgfsThreadPoolSize();
+
+ break;
+
+ case GridIoPolicy.SERVICE_POOL:
+ parallelismLvl = cfg.getServiceThreadPoolSize();
+
+ break;
+
+ case GridIoPolicy.DATA_STREAMER_POOL:
+ parallelismLvl = cfg.getDataStreamerThreadPoolSize();
+
+ break;
+
+ case GridIoPolicy.QUERY_POOL:
+ parallelismLvl = cfg.getQueryThreadPoolSize();
+
+ break;
+
+ default:
+ parallelismLvl = -1; // Policy not handled above: with a non-negative 'reserved' the result below is 1.
+ }
+
+ return Math.max(1, parallelismLvl - reserved); // Never report less than one thread.
+ }
+
+ /**
* Execute operation on data in parallel.
*
* @param executorSvc Service for parallel execution.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java
new file mode 100644
index 0000000..97f8d45
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/rebalancing/IgniteRebalanceOnCachesStoppingOrDestroyingTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.rebalancing;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Checks that stopping or destroying caches (including cluster deactivation) while supply messages
+ * for their cache groups are artificially delayed does not lead to a node failure: every scenario
+ * ends by asserting that the restarted node has no failure context. Nodes run with persistence
+ * enabled and a communication SPI that slows down {@link GridDhtPartitionSupplyMessage}s.
+ */
+public class IgniteRebalanceOnCachesStoppingOrDestroyingTest extends GridCommonAbstractTest {
+ /** First cache of {@link #GROUP_1}. */
+ private static final String CACHE_1 = "cache_1";
+
+ /** Second cache of {@link #GROUP_1}. */
+ private static final String CACHE_2 = "cache_2";
+
+ /** First cache of {@link #GROUP_2}. */
+ private static final String CACHE_3 = "cache_3";
+
+ /** Second cache of {@link #GROUP_2}. */
+ private static final String CACHE_4 = "cache_4";
+
+ /** Cache group holding {@link #CACHE_1} and {@link #CACHE_2}. */
+ private static final String GROUP_1 = "group_1";
+
+ /** Cache group holding {@link #CACHE_3} and {@link #CACHE_4}. */
+ private static final String GROUP_2 = "group_2";
+
+ /** Rebalance batch size applied to every test cache. */
+ private static final int REBALANCE_BATCH_SIZE = 50 * 1024;
+
+ /** IP finder shared by the discovery SPIs of all test nodes. */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ TcpDiscoverySpi spi = new TcpDiscoverySpi();
+
+ spi.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(spi);
+
+ cfg.setCommunicationSpi(new RebalanceBlockingSPI());
+
+ cfg.setFailureHandler(new StopNodeFailureHandler());
+
+ cfg.setRebalanceThreadPoolSize(4);
+
+ cfg.setTransactionConfiguration(new TransactionConfiguration()
+ .setDefaultTxTimeout(1000));
+
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setWalMode(WALMode.LOG_ONLY)
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ .setMaxSize(100L * 1024 * 1024)));
+
+ return cfg;
+ }
+
+ /**
+ * Uses cluster deactivation (followed by re-activation) as the action that stops the caches.
+ */
+ public void testStopCachesOnDeactivation() throws Exception {
+ performTest(ig -> {
+ ig.cluster().active(false);
+
+ // Re-activate to avoid a possibly long wait in awaitPartitionMapExchange caused by CacheAffinityChangeMessage.
+ ig.cluster().active(true);
+
+ return null;
+ });
+ }
+
+ /**
+ * Destroys one cache from each of the two cache groups, so both groups stay alive.
+ */
+ public void testDestroySpecificCachesInDifferentCacheGroups() throws Exception {
+ performTest(ig -> {
+ ig.destroyCaches(Arrays.asList(CACHE_1, CACHE_3));
+
+ return null;
+ });
+ }
+
+ /**
+ * Destroys one cache of the first group together with both caches of the second group.
+ */
+ public void testDestroySpecificCacheAndCacheGroup() throws Exception {
+ performTest(ig -> {
+ ig.destroyCaches(Arrays.asList(CACHE_1, CACHE_3, CACHE_4));
+
+ return null;
+ });
+ }
+
+ /**
+ * @param testAction Action that triggers stop or destroy of caches.
+ * @throws Exception If failed.
+ */
+ private void performTest(IgniteThrowableConsumer<Ignite, Void> testAction) throws Exception {
+ IgniteEx ig0 = (IgniteEx)startGrids(2);
+
+ ig0.cluster().active(true);
+
+ stopGrid(1);
+
+ loadData(ig0);
+
+ startGrid(1);
+
+ runLoad(ig0);
+
+ testAction.accept(ig0);
+
+ U.sleep(1000);
+
+ awaitPartitionMapExchange(true, true, null, true);
+
+ assertNull(grid(1).context().failure().failureContext());
+ }
+
+ /**
+ * @param ig Node on which the four test caches are created and filled with 3000 entries each.
+ */
+ private void loadData(Ignite ig) {
+ List<CacheConfiguration> configs = Stream.of(
+ F.t(CACHE_1, GROUP_1),
+ F.t(CACHE_2, GROUP_1),
+ F.t(CACHE_3, GROUP_2),
+ F.t(CACHE_4, GROUP_2)
+ ).map(names -> new CacheConfiguration<>(names.get1())
+ .setGroupName(names.get2())
+ .setRebalanceBatchSize(REBALANCE_BATCH_SIZE)
+ .setCacheMode(CacheMode.REPLICATED)
+ ).collect(Collectors.toList());
+
+ ig.getOrCreateCaches(configs);
+
+ configs.forEach(cfg -> {
+ try (IgniteDataStreamer<Object, Object> streamer = ig.dataStreamer(cfg.getName())) {
+ for (int i = 0; i < 3_000; i++)
+ streamer.addData(i, new byte[1024]);
+
+ streamer.flush();
+ }
+ });
+ }
+
+ /**
+ * @param ig Node through which 4 threads each put 3000 random entries into a randomly chosen test cache.
+ */
+ private void runLoad(Ignite ig) throws Exception {
+ GridTestUtils.runMultiThreaded(new Runnable() {
+ @Override public void run() {
+ String cacheName = F.rand(CACHE_1, CACHE_2, CACHE_3, CACHE_4);
+
+ IgniteCache<Object, Object> cache = ig.cache(cacheName);
+
+ for (int i = 0; i < 3_000; i++) {
+ int idx = ThreadLocalRandom.current().nextInt(3_000);
+
+ cache.put(idx, new byte[1024]);
+ }
+ }
+ }, 4, "load-thread");
+ }
+
+ /**
+ * Communication SPI that delays every supply message of the test cache groups by 50 ms before sending it.
+ */
+ private static class RebalanceBlockingSPI extends TcpCommunicationSpi {
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+ slowDownMessage(msg);
+
+ super.sendMessage(node, msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg,
+ IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+ slowDownMessage(msg);
+
+ super.sendMessage(node, msg, ackC);
+ }
+
+ /**
+ * @param msg Outgoing message; delayed by 50 ms if it is a supply message of one of the test groups.
+ */
+ private void slowDownMessage(Message msg) {
+ if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() instanceof GridDhtPartitionSupplyMessage) {
+ int grpId = ((GridCacheGroupIdMessage)((GridIoMessage)msg).message()).groupId();
+
+ if (grpId == CU.cacheId(GROUP_1) || grpId == CU.cacheId(GROUP_2)) {
+ try {
+ U.sleep(50);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ e.printStackTrace(); // Best-effort delay: on interruption the message is still sent by the caller.
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
index 11f0219..d01f1ed 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite4.java
@@ -21,6 +21,7 @@
import org.apache.ignite.cache.ResetLostPartitionTest;
import org.apache.ignite.internal.processors.cache.IgniteClusterActivateDeactivateTestWithPersistenceAndMemoryReuse;
import org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest;
+import org.apache.ignite.internal.processors.cache.distributed.rebalancing.IgniteRebalanceOnCachesStoppingOrDestroyingTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTestWithSharedGroupAndIndexes;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTaskCancelingTest;
@@ -52,6 +53,8 @@
suite.addTestSuite(ResetLostPartitionTest.class);
+ suite.addTestSuite(IgniteRebalanceOnCachesStoppingOrDestroyingTest.class);
+
suite.addTestSuite(CachePageWriteLockUnlockTest.class);
suite.addTestSuite(IgnitePdsCacheWalDisabledOnRebalancingTest.class);