IGNITE-13444 Durable tasks are cancelled on grid deactivation, starting new tasks is prohibited - Fixes #8244.
Signed-off-by: Sergey Chugunov <sergey.chugunov@gmail.com>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index a774d520..73afdf9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -729,6 +729,8 @@
nodeIds
);
+ ctx.durableBackgroundTasksProcessor().onStateChange(msg);
+
if (msg.forceChangeBaselineTopology())
newState.setTransitionResult(msg.requestId(), msg.state());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
index c703df4..dfbab72 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
@@ -22,6 +22,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
@@ -34,6 +35,7 @@
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
+import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.thread.IgniteThread;
@@ -64,6 +66,16 @@
/** Durable background tasks map. */
private final ConcurrentHashMap<String, DurableBackgroundTask> durableBackgroundTasks = new ConcurrentHashMap<>();
+ /** Set of started tasks' names. */
+ private final Set<String> startedTasks = new GridConcurrentHashSet<>();
+
+ /**
+ * Ban to start new tasks. The first time the cluster is activated, it will try again to run existing tasks.
+ *
+ * @see #onStateChangeFinish(ChangeGlobalStateFinishMessage)
+ */
+ private volatile boolean forbidStartingNewTasks;
+
/**
* @param ctx Kernal context.
*/
@@ -78,22 +90,31 @@
assert durableBackgroundTasks != null;
for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
- if (!task.isCompleted())
- asyncDurableBackgroundTaskExecute(task, false);
+ if (!task.isCompleted() && startedTasks.add(task.shortName()))
+ asyncDurableBackgroundTaskExecute(task);
}
}
/**
* Creates a worker to execute single durable background task.
+ *
* @param task Task.
- * @param dropTaskIfFailed Whether to delete task from metastorage, if it has failed.
*/
- private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task, boolean dropTaskIfFailed) {
+ private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task) {
String workerName = "async-durable-background-task-executor-" + asyncDurableBackgroundTasksWorkersCntr.getAndIncrement();
GridWorker worker = new GridWorker(ctx.igniteInstanceName(), workerName, log) {
+ @Override public void cancel() {
+ task.onCancel();
+
+ super.cancel();
+ }
+
@Override protected void body() {
try {
+ if (forbidStartingNewTasks)
+ return;
+
log.info("Executing durable background task: " + task.shortName());
task.execute(ctx);
@@ -104,11 +125,10 @@
}
catch (Throwable e) {
log.error("Could not execute durable background task: " + task.shortName(), e);
-
- if (dropTaskIfFailed)
- removeDurableBackgroundTask(task);
}
finally {
+ startedTasks.remove(task.shortName());
+
asyncDurableBackgroundTaskWorkers.remove(this);
}
}
@@ -128,8 +148,9 @@
/** {@inheritDoc} */
@Override public void onKernalStop(boolean cancel) {
- // Waiting for workers, but not cancelling them, trying to complete running tasks.
- awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, false, log);
+ forbidStartingNewTasks = true;
+
+ awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
}
/** {@inheritDoc} */
@@ -140,9 +161,23 @@
/**
* @param msg Message.
*/
- public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
- if (!msg.clusterActive())
+ public void onStateChange(ChangeGlobalStateMessage msg) {
+ if (msg.state() == ClusterState.INACTIVE) {
+ forbidStartingNewTasks = true;
+
awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
+ }
+ }
+
+ /**
+ * @param msg Message.
+ */
+ public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
+ if (msg.state() != ClusterState.INACTIVE) {
+ forbidStartingNewTasks = false;
+
+ asyncDurableBackgroundTasksExecution();
+ }
}
/** {@inheritDoc} */
@@ -267,7 +302,7 @@
if (CU.isPersistentCache(ccfg, ctx.config().getDataStorageConfiguration()))
addDurableBackgroundTask(task);
- asyncDurableBackgroundTaskExecute(task, false);
+ asyncDurableBackgroundTaskExecute(task);
}
/** {@inheritDoc} */
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
index 5c4cb04..5fdfa62 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
@@ -47,6 +47,7 @@
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyTypeSettings;
import org.apache.ignite.internal.cache.query.index.sorted.IndexRow;
@@ -163,19 +164,30 @@
/** */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
+ .setFailureHandler(new StopNodeFailureHandler())
.setDataStorageConfiguration(
- new DataStorageConfiguration().setDefaultDataRegionConfiguration(
- new DataRegionConfiguration()
- .setPersistenceEnabled(true)
- .setInitialSize(10 * 1024L * 1024L)
- .setMaxSize(50 * 1024L * 1024L)
- )
- .setCheckpointFrequency(Long.MAX_VALUE / 2)
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setPersistenceEnabled(true)
+ .setInitialSize(10 * 1024L * 1024L)
+ .setMaxSize(50 * 1024L * 1024L)
+ )
+ .setDataRegionConfigurations(
+ new DataRegionConfiguration()
+ .setName("dr1")
+ .setPersistenceEnabled(false)
+ )
+ .setCheckpointFrequency(Long.MAX_VALUE / 2)
)
.setCacheConfiguration(
new CacheConfiguration(DEFAULT_CACHE_NAME)
.setBackups(1)
+ .setSqlSchema("PUBLIC"),
+ new CacheConfiguration<Integer, Integer>("TEST")
.setSqlSchema("PUBLIC")
+ .setBackups(1)
+ .setDataRegionName("dr1")
)
.setGridLogger(testLog);
}
@@ -542,6 +554,50 @@
}
/**
+ * Test case when cluster deactivation happens with no-persistence cache. Index tree deletion task should not be
+ * started after stopping cache.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testClusterDeactivationShouldPassWithoutErrors() throws Exception {
+ IgniteEx ignite = startGrids(NODES_COUNT);
+
+ ignite.cluster().active(true);
+
+ IgniteCache<Integer, Integer> cache = ignite.cache("TEST");
+
+ query(cache, "create table TEST (id integer primary key, p integer, f integer) with " +
+ "\"DATA_REGION=dr1\"");
+
+ query(cache, "create index TEST_IDX on TEST (p)");
+
+ for (int i = 0; i < 5_000; i++)
+ query(cache, "insert into TEST (id, p, f) values (?, ?, ?)", i, i, i);
+
+ LogListener lsnr = LogListener.matches("Could not execute durable background task").build();
+ LogListener lsnr2 = LogListener.matches("Executing durable background task").build();
+ LogListener lsnr3 = LogListener.matches("Execution of durable background task completed").build();
+
+ testLog.registerAllListeners(lsnr, lsnr2, lsnr3);
+
+ ignite.cluster().active(false);
+
+ doSleep(1_000);
+
+ assertFalse(lsnr.check());
+ assertFalse(lsnr2.check());
+ assertFalse(lsnr3.check());
+
+ testLog.unregisterListener(lsnr);
+ testLog.unregisterListener(lsnr2);
+ testLog.unregisterListener(lsnr3);
+
+ for (int i = 0; i < NODES_COUNT; i++)
+ grid(i);
+ }
+
+ /**
* Tests case when long index deletion operation happens. Checkpoint should run in the middle of index deletion
* operation.
*