blob: 8c1df0ee6a5f747e8590e44a405ab4c3d668b4a9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.localtask;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointWorkflow;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
import org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.cluster.ClusterState.INACTIVE;
import static org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult.complete;
import static org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult.restart;
import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.COMPLETED;
import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.STARTED;
import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTasksProcessor.metaStorageKey;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
/**
* Class for testing the {@link DurableBackgroundTasksProcessor}.
*/
public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
// Make sure no nodes are running and the persistence directory is clean before the test body starts.
stopAllGrids();
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
// Stop every node started by the test and clean the persistence directory afterwards.
stopAllGrids();
cleanPersistenceDir();
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
    IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

    cfg.setFailureHandler(new StopNodeFailureHandler());

    // Persistence is enabled for the default data region; the tests of this class read task records
    // from the MetaStorage.
    DataRegionConfiguration dfltRegionCfg = new DataRegionConfiguration().setPersistenceEnabled(true);

    cfg.setDataStorageConfiguration(new DataStorageConfiguration().setDefaultDataRegionConfiguration(dfltRegionCfg));

    return cfg;
}
/**
 * Checks that a task which is not saved to the MetaStorage is processed correctly in normal mode.
 *
 * @throws Exception If failed.
 */
@Test
public void testSimpleTaskExecutionWithoutMetaStorage() throws Exception {
// false: the task is submitted without saving it to the MetaStorage.
checkSimpleTaskExecute(false);
}
/**
 * Checks that a task which is saved to the MetaStorage is processed correctly in normal mode.
 *
 * @throws Exception If failed.
 */
@Test
public void testSimpleTaskExecutionWithMetaStorage() throws Exception {
// true: the task is submitted with saving it to the MetaStorage.
checkSimpleTaskExecute(true);
}
/**
 * Checks that a task which is not saved to the MetaStorage is restarted correctly.
 *
 * @throws Exception If failed.
 */
@Test
public void testRestartTaskExecutionWithoutMetaStorage() throws Exception {
// false: the task is submitted without saving it to the MetaStorage.
checkRestartTaskExecute(false);
}
/**
 * Checks that a task which is saved to the MetaStorage is restarted correctly.
 *
 * @throws Exception If failed.
 */
@Test
public void testRestartTaskExecutionWithMetaStorage() throws Exception {
// true: the task is submitted with saving it to the MetaStorage.
checkRestartTaskExecute(true);
}
/**
 * Checks the cancellation scenario: the cluster is deactivated while a task is still running,
 * after which the task is completed and its future is awaited.
 *
 * @throws Exception If failed.
 */
@Test
public void testCancelTaskExecution() throws Exception {
    final long timeout = getTestTimeout();

    IgniteEx node = startGrid(0);

    node.cluster().state(ACTIVE);

    SimpleTask task = new SimpleTask("t");

    IgniteInternalFuture<Void> outFut = durableBackgroundTask(node).executeAsync(task, false);

    // NOTE(review): onExecFut is presumably completed by SimpleTask once its execution has begun - confirm.
    task.onExecFut.get(timeout);

    checkStateAndMetaStorage(node, task, STARTED, false, false);
    assertFalse(outFut.isDone());

    // Deactivate the cluster while the task has not produced a result yet.
    node.cluster().state(INACTIVE);

    task.onExecFut.get(timeout);
    task.taskFut.onDone(complete(null));

    outFut.get(timeout);
}
/**
 * Checks that a task can be started on a node that (re)joined the cluster after its activation,
 * and verifies the task state after subsequent restarts of that node.
 *
 * @throws Exception If failed.
 */
@Test
public void testTaskStartOnNodeJoinedAfterActivation() throws Exception {
    IgniteEx node0 = startGrid(0);

    startGrid(1);

    node0.cluster().state(ACTIVE);

    // Restart the second node so that it joins an already active cluster.
    stopGrid(1);

    IgniteEx node1 = startGrid(1);

    SimpleTask task = new SimpleTask("t");

    execAsync(node1, task, true);

    task.onExecFut.get(getTestTimeout());

    checkStateAndMetaStorage(node1, task, STARTED, true, false);

    // Restart while the cluster is still active: the task is expected to be STARTED again.
    stopGrid(1);

    node1 = startGrid(1);

    checkStateAndMetaStorage(node1, task, STARTED, true, false);

    // Restart after deactivation: the task is expected to stay in INIT.
    node1.cluster().state(INACTIVE);

    stopGrid(1);

    node1 = startGrid(1);

    checkStateAndMetaStorage(node1, task, INIT, true, false);
}
/**
 * Checks that a task which finished with a "restart" result is started again after the node is restarted.
 *
 * @throws Exception If failed.
 */
@Test
public void testRestartTaskAfterRestartNode() throws Exception {
    final long timeout = getTestTimeout();

    IgniteEx node = startGrid(0);

    node.cluster().state(ACTIVE);

    SimpleTask submitted = new SimpleTask("t");

    execAsync(node, submitted, true);

    submitted.onExecFut.get(timeout);

    checkStateAndMetaStorage(node, submitted, STARTED, true, false);

    // Finish the current run with a "restart" result before stopping the node.
    submitted.taskFut.onDone(restart(null));

    stopAllGrids();

    node = startGrid(0);

    node.cluster().state(ACTIVE);

    // After the restart the processor holds its own task instance, look it up by name.
    SimpleTask restored = (SimpleTask)tasks(node).get(submitted.name()).task();

    restored.onExecFut.get(timeout);

    checkStateAndMetaStorage(node, restored, STARTED, true, false);

    restored.taskFut.onDone(complete(null));
}
/**
 * Checks that completing (and therefore deleting) a task while the node is stopping does not
 * trigger the failure handler.
 *
 * @throws Exception If failed.
 */
@Test
public void testNotFailNodeWhenNodeStoppindAndDeleteTasks() throws Exception {
    final long timeout = getTestTimeout();

    IgniteEx node = startGrid(0);

    ObservingCheckpointListener cpObserver = new ObservingCheckpointListener();

    dbMgr(node).addCheckpointListener(cpObserver);

    node.cluster().state(ACTIVE);

    DurableBackgroundTasksProcessor proc = durableBackgroundTask(node);

    List<CheckpointListener> lsnrs =
        checkpointWorkflow(node).getRelevantCheckpointListeners(dbMgr(node).checkpointedDataRegions());

    // The observing listener must be registered before the processor in the listener list.
    assertTrue(lsnrs.contains(cpObserver));
    assertTrue(lsnrs.contains(proc));

    int observerIdx = lsnrs.indexOf(cpObserver);
    int procIdx = lsnrs.indexOf(proc);

    assertTrue(observerIdx < procIdx);

    SimpleTask task = new SimpleTask("t");

    IgniteInternalFuture<Void> taskOutFut = proc.executeAsync(task, true);

    task.onExecFut.get(timeout);

    GridFutureAdapter<Void> stopBegunFut = new GridFutureAdapter<>();
    GridFutureAdapter<Void> releaseStopFut = new GridFutureAdapter<>();

    cpObserver.repeatOnMarkCheckpointBeginConsumer = true;

    cpObserver.onMarkCheckpointBeginConsumer = ctx -> {
        if (!node.context().isStopping())
            return;

        // Signal that the node stop has reached the checkpoint and hold it until the task is completed.
        stopBegunFut.onDone();

        releaseStopFut.get(timeout);

        cpObserver.repeatOnMarkCheckpointBeginConsumer = false;
    };

    IgniteInternalFuture<Void> stopFut = runAsync(() -> stopAllGrids(false));

    stopBegunFut.get(timeout);

    // Complete the task while the node is in the middle of stopping.
    task.taskFut.onDone(complete(null));

    taskOutFut.get(timeout);

    releaseStopFut.onDone();

    stopFut.get(timeout);

    assertNull(node.context().failure().failureContext());

    assertThrows(log, () -> durableBackgroundTask(node).executeAsync(task, true), IgniteException.class, null);
}
/**
 * Checks that a task record is not deleted from the MetaStorage if a task with the same name
 * was submitted again before the deletion took place.
 *
 * @throws Exception If failed.
 */
@Test
public void testDontDeleteTaskIfItsRestart() throws Exception {
    final long timeout = getTestTimeout();

    IgniteEx node = startGrid(0);

    ObservingCheckpointListener cpObserver = new ObservingCheckpointListener();

    dbMgr(node).addCheckpointListener(cpObserver);

    node.cluster().state(ACTIVE);

    DurableBackgroundTasksProcessor proc = durableBackgroundTask(node);

    List<CheckpointListener> lsnrs =
        checkpointWorkflow(node).getRelevantCheckpointListeners(dbMgr(node).checkpointedDataRegions());

    // The observing listener must be registered before the processor in the listener list.
    assertTrue(lsnrs.contains(cpObserver));
    assertTrue(lsnrs.contains(proc));
    assertTrue(lsnrs.indexOf(cpObserver) < lsnrs.indexOf(proc));

    SimpleTask first = new SimpleTask("t");

    IgniteInternalFuture<Void> firstOutFut = proc.executeAsync(first, true);

    first.onExecFut.get(timeout);

    forceCheckpoint();

    // Complete the first task while checkpoints are disabled.
    dbMgr(node).enableCheckpoints(false).get(timeout);

    first.taskFut.onDone(complete(null));

    firstOutFut.get(timeout);

    // A second task with the same name is submitted from the "after checkpoint end" callback.
    SimpleTask second = new SimpleTask("t");

    AtomicReference<IgniteInternalFuture<Void>> secondOutFutRef = new AtomicReference<>();

    cpObserver.afterCheckpointEndConsumer = ctx -> secondOutFutRef.set(proc.executeAsync(second, true));

    dbMgr(node).enableCheckpoints(true).get(timeout);

    forceCheckpoint();

    // The record must still be present in the MetaStorage.
    assertNotNull(metaStorageOperation(node, ms -> ms.read(metaStorageKey(first))));

    second.onExecFut.get(timeout);

    second.taskFut.onDone(complete(null));

    secondOutFutRef.get().get(timeout);

    forceCheckpoint();

    // Once the second task is completed and a checkpoint has passed, the record must be gone.
    assertNull(metaStorageOperation(node, ms -> ms.read(metaStorageKey(second))));
}
/**
 * Checks the handling of {@link DurableBackgroundTask#convertAfterRestoreIfNeeded} when tasks are
 * restored after a node restart.
 *
 * @throws Exception If failed.
 */
@Test
public void testConvertAfterRestoreIfNeeded() throws Exception {
    IgniteEx node = startGrid(0);

    node.cluster().state(ACTIVE);

    String convertibleName = "test-task0";
    String simpleName = "test-task1";

    ConvertibleTask convertible = new ConvertibleTask(convertibleName);
    SimpleTask simple = new SimpleTask(simpleName);

    SimpleTask converted = (SimpleTask)convertible.convertAfterRestoreIfNeeded();

    assertEquals("converted-task-" + convertibleName, converted.name());

    assertNotNull(durableBackgroundTask(node).executeAsync(convertible, true));
    assertNotNull(durableBackgroundTask(node).executeAsync(simple, true));

    assertEquals(2, tasks(node).size());

    checkStateAndMetaStorage(node, convertible, STARTED, true, false);
    checkStateAndMetaStorage(node, simple, STARTED, true, false);

    stopGrid(0);

    node = startGrid(0);

    // After the restart three tasks are expected: the two original ones plus the converted one.
    assertEquals(3, tasks(node).size());

    checkStateAndMetaStorage(node, convertible, COMPLETED, true, true, false);
    checkStateAndMetaStorage(node, simple, INIT, true, false, false);

    node.cluster().state(ACTIVE);

    checkStateAndMetaStorage(node, converted, STARTED, true, false, true);
}
/**
 * Checks that a task which finished with a "restart" result returns to {@code INIT} and is
 * executed again after the cluster is deactivated and activated.
 *
 * @param save Save to MetaStorage.
 * @throws Exception If failed.
 */
private void checkRestartTaskExecute(boolean save) throws Exception {
    IgniteEx node = startGrid(0);

    node.cluster().state(ACTIVE);

    SimpleTask task = new SimpleTask("t");

    IgniteInternalFuture<Void> outFut = execAsync(node, task, save);

    task.onExecFut.get(getTestTimeout());

    checkStateAndMetaStorage(node, task, STARTED, save, false);
    assertFalse(outFut.isDone());

    if (!save) {
        task.taskFut.onDone(restart(null));

        checkStateAndMetaStorage(node, task, INIT, false, false);
        assertFalse(outFut.isDone());
    }
    else {
        ObservingCheckpointListener cpObserver = new ObservingCheckpointListener();

        dbMgr(node).addCheckpointListener(cpObserver);

        forceCheckpoint();

        // Request the restart while checkpoints are disabled.
        dbMgr(node).enableCheckpoints(false).get(getTestTimeout());

        task.taskFut.onDone(restart(null));

        checkStateAndMetaStorage(node, task, INIT, true, false);
        assertFalse(outFut.isDone());

        // On the next checkpoint the task must still be stored and must not be scheduled for removal.
        GridFutureAdapter<Void> markBeginFut = cpObserver.onMarkCheckpointBeginAsync(ctx -> {
            checkStateAndMetaStorage(node, task, INIT, true, false);

            assertFalse(toRmv(node).containsKey(task.name()));
        });

        dbMgr(node).enableCheckpoints(true).get(getTestTimeout());

        markBeginFut.get(getTestTimeout());
    }

    // NOTE(review): reset() presumably re-creates the futures of SimpleTask for the next run - confirm.
    task.reset();

    node.cluster().state(INACTIVE);
    node.cluster().state(ACTIVE);

    task.onExecFut.get(getTestTimeout());

    checkStateAndMetaStorage(node, task, STARTED, save, false);
    assertFalse(outFut.isDone());

    task.taskFut.onDone(complete(null));

    outFut.get(getTestTimeout());
}
/**
 * Checks that a task goes through the expected states when it is submitted before cluster
 * activation, started on activation and then completed.
 *
 * @param save Save to MetaStorage.
 * @throws Exception If failed.
 */
private void checkSimpleTaskExecute(boolean save) throws Exception {
    IgniteEx node = startGrid(0);

    SimpleTask task = new SimpleTask("t");

    IgniteInternalFuture<Void> outFut = execAsync(node, task, save);

    // The cluster has not been activated yet, so the task is expected to stay in INIT.
    checkStateAndMetaStorage(node, task, INIT, save, false);

    checkExecuteSameTask(node, task);

    checkStateAndMetaStorage(node, task, INIT, save, false);

    assertFalse(task.onExecFut.isDone());
    assertFalse(outFut.isDone());

    node.cluster().state(ACTIVE);

    task.onExecFut.get(getTestTimeout());

    checkStateAndMetaStorage(node, task, STARTED, save, false);
    assertFalse(outFut.isDone());

    if (!save) {
        // Nothing is persisted: the task is expected to disappear right after completion.
        task.taskFut.onDone(complete(null));

        outFut.get(getTestTimeout());

        checkStateAndMetaStorage(node, task, null, false, false);

        return;
    }

    forceCheckpoint();

    // Complete the task while checkpoints are disabled.
    dbMgr(node).enableCheckpoints(false).get(getTestTimeout());

    task.taskFut.onDone(complete(null));

    outFut.get(getTestTimeout());

    checkStateAndMetaStorage(node, task, COMPLETED, true, true);

    ObservingCheckpointListener cpObserver = new ObservingCheckpointListener();

    // At checkpoint begin: no task state, the record is still stored and is scheduled for removal.
    GridFutureAdapter<Void> markBeginFut = cpObserver.onMarkCheckpointBeginAsync(ctx -> {
        checkStateAndMetaStorage(node, task, null, true, false);

        assertTrue(toRmv(node).containsKey(task.name()));
    });

    // After checkpoint end: the record is gone and nothing is left to remove.
    GridFutureAdapter<Void> cpEndFut = cpObserver.afterCheckpointEndAsync(ctx -> {
        checkStateAndMetaStorage(node, task, null, false, false);

        assertFalse(toRmv(node).containsKey(task.name()));
    });

    dbMgr(node).addCheckpointListener(cpObserver);

    dbMgr(node).enableCheckpoints(true).get(getTestTimeout());

    markBeginFut.get(getTestTimeout());
    cpEndFut.get(getTestTimeout());
}
/**
 * Checks that while a task is registered it is impossible to submit a task with the same
 * {@link DurableBackgroundTask#name name}: neither the same instance nor a new instance with
 * the same name, with or without saving to the MetaStorage.
 *
 * @param n Node.
 * @param t Task that has already been submitted to the node.
 */
private void checkExecuteSameTask(IgniteEx n, DurableBackgroundTask<?> t) {
    // Same task instance.
    assertThrows(log, () -> execAsync(n, t, false), IllegalArgumentException.class, null);
    assertThrows(log, () -> execAsync(n, t, true), IllegalArgumentException.class, null);

    // Different instance with the same name.
    assertThrows(log, () -> execAsync(n, new SimpleTask(t.name()), false), IllegalArgumentException.class, null);
    assertThrows(log, () -> execAsync(n, new SimpleTask(t.name()), true), IllegalArgumentException.class, null);
}
/**
 * Checks the internal {@link DurableBackgroundTaskState state} of the task and its presence in the MetaStorage.
 * Delegates to the overload with {@code expConverted}, passing {@code null} so that the "converted" flag
 * is not validated.
 *
 * @param n Node.
 * @param t Task.
 * @param expState Expected state of the task, {@code null} means that the task should not be registered.
 * @param expSaved Whether the task is expected to be stored in the MetaStorage.
 * @param expDone Whether the task's out future is expected to be completed.
 * @throws IgniteCheckedException If failed.
 */
private void checkStateAndMetaStorage(
IgniteEx n,
DurableBackgroundTask<?> t,
@Nullable State expState,
boolean expSaved,
boolean expDone
) throws IgniteCheckedException {
checkStateAndMetaStorage(n, t, expState, expSaved, expDone, null);
}
/**
 * Checks the internal {@link DurableBackgroundTaskState state} of the task and its presence in the MetaStorage.
 *
 * @param n Node.
 * @param t Task.
 * @param expState Expected state of the task, {@code null} means that the task should not be registered.
 * @param expSaved Whether the task is expected to be stored in the MetaStorage.
 * @param expDone Whether the task's out future is expected to be completed.
 * @param expConverted Expected value of the {@link DurableBackgroundTaskState#converted()},
 *      {@code null} if no validation is required.
 * @throws IgniteCheckedException If failed.
 */
private void checkStateAndMetaStorage(
    IgniteEx n,
    DurableBackgroundTask<?> t,
    @Nullable State expState,
    boolean expSaved,
    boolean expDone,
    @Nullable Boolean expConverted
) throws IgniteCheckedException {
    DurableBackgroundTaskState<?> taskState = tasks(n).get(t.name());

    if (expState == null)
        assertNull(taskState);
    else {
        // Fail with an assertion instead of a NullPointerException if the task is unexpectedly absent.
        assertNotNull(taskState);

        assertEquals(expState, taskState.state());
        assertEquals(expSaved, taskState.saved());
        assertEquals(expDone, taskState.outFuture().isDone());

        if (expConverted != null)
            assertEquals(expConverted.booleanValue(), taskState.converted());
    }

    DurableBackgroundTask<?> ser =
        (DurableBackgroundTask<?>)metaStorageOperation(n, ms -> ms.read(metaStorageKey(t)));

    if (expSaved) {
        // Same reasoning: a missing record must be reported as an assertion failure.
        assertNotNull(ser);

        assertEquals(t.name(), ser.name());
        assertTrue(t.getClass().isInstance(ser));
    }
    else
        assertNull(ser);
}
/**
 * Submits a durable background task to the node's processor for asynchronous execution.
 *
 * @param n Node.
 * @param t Task.
 * @param save Save task to MetaStorage.
 * @return Task future.
 */
private <R> IgniteInternalFuture<R> execAsync(IgniteEx n, DurableBackgroundTask<R> t, boolean save) {
    DurableBackgroundTasksProcessor proc = durableBackgroundTask(n);

    return proc.executeAsync(t, save);
}
/**
 * Reads the {@code toRmv} field of the node's {@link DurableBackgroundTasksProcessor}.
 *
 * @param n Node.
 * @return Map of tasks that will be removed from the MetaStorage.
 */
private Map<String, DurableBackgroundTask<?>> toRmv(IgniteEx n) {
    DurableBackgroundTasksProcessor proc = durableBackgroundTask(n);

    return getFieldValue(proc, "toRmv");
}
/**
 * Reads the {@code tasks} field of the node's {@link DurableBackgroundTasksProcessor}.
 *
 * @param n Node.
 * @return Task states map.
 */
private Map<String, DurableBackgroundTaskState<?>> tasks(IgniteEx n) {
    DurableBackgroundTasksProcessor proc = durableBackgroundTask(n);

    return getFieldValue(proc, "tasks");
}
/**
 * Returns the durable background tasks processor taken from the kernal context of the given node.
 *
 * @param n Node.
 * @return Durable background task processor.
 */
private DurableBackgroundTasksProcessor durableBackgroundTask(IgniteEx n) {
return n.context().durableBackgroundTask();
}
/**
 * Getting {@code CheckpointManager#checkpointWorkflow} of the node's database manager.
 *
 * @param n Node.
 * @return Checkpoint workflow.
 */
private CheckpointWorkflow checkpointWorkflow(IgniteEx n) {
// Read via the test utility by field names: "checkpointManager" first, then "checkpointWorkflow".
return getFieldValue(dbMgr(n), "checkpointManager", "checkpointWorkflow");
}
}