diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTask.java
index 4e23c11..00cb35e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/DurableBackgroundCleanupIndexTreeTask.java
@@ -22,7 +22,9 @@
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cache.query.index.IndexName;
 import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexKeyType;
 import org.apache.ignite.internal.cache.query.index.sorted.inline.InlineIndexTree;
@@ -36,11 +38,18 @@
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.RootPage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
 import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIoResolver;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.thread.IgniteThread;
+import org.jetbrains.annotations.Nullable;
 
+import static java.util.Collections.singleton;
 import static org.apache.ignite.internal.metric.IoStatisticsType.SORTED_INDEX;
 
 /**
@@ -54,10 +63,7 @@
     private List<Long> rootPages;
 
     /** */
-    private transient List<InlineIndexTree> trees;
-
-    /** */
-    private transient volatile boolean completed;
+    private transient volatile List<InlineIndexTree> trees;
 
     /** */
     private String cacheGrpName;
@@ -77,6 +83,12 @@
     /** */
     private final String id;
 
+    /** Logger. */
+    @Nullable private transient volatile IgniteLogger log;
+
+    /** Worker tasks. */
+    @Nullable private transient volatile GridWorker worker;
+
     /** */
     public DurableBackgroundCleanupIndexTreeTask(
         List<Long> rootPages,
@@ -88,7 +100,6 @@
     ) {
         this.rootPages = rootPages;
         this.trees = trees;
-        this.completed = false;
         this.cacheGrpName = cacheGrpName;
         this.cacheName = cacheName;
         this.id = UUID.randomUUID().toString();
@@ -98,12 +109,51 @@
     }
 
     /** {@inheritDoc} */
-    @Override public String shortName() {
+    @Override public String name() {
         return "DROP_SQL_INDEX-" + schemaName + "." + idxName + "-" + id;
     }
 
     /** {@inheritDoc} */
-    @Override public void execute(GridKernalContext ctx) {
+    @Override public IgniteInternalFuture<DurableBackgroundTaskResult> executeAsync(GridKernalContext ctx) {
+        log = ctx.log(this.getClass());
+
+        assert worker == null;
+
+        GridFutureAdapter<DurableBackgroundTaskResult> fut = new GridFutureAdapter<>();
+
+        worker = new GridWorker(
+            ctx.igniteInstanceName(),
+            "async-durable-background-task-executor-" + name(),
+            log
+        ) {
+            /** {@inheritDoc} */
+            @Override protected void body() {
+                try {
+                    execute(ctx);
+
+                    worker = null;
+
+                    fut.onDone(DurableBackgroundTaskResult.complete(null));
+                }
+                catch (Throwable t) {
+                    worker = null;
+
+                    fut.onDone(DurableBackgroundTaskResult.restart(t));
+                }
+            }
+        };
+
+        new IgniteThread(worker).start();
+
+        return fut;
+    }
+
+    /**
+     * Task execution.
+     *
+     * @param ctx Kernal context.
+     */
+    private void execute(GridKernalContext ctx) {
         List<InlineIndexTree> trees0 = trees;
 
         if (trees0 == null) {
@@ -173,7 +223,7 @@
                 // Below we create a fake index tree using it's root page, stubbing some parameters,
                 // because we just going to free memory pages that are occupied by tree structure.
                 try {
-                    String treeName = "deletedTree_" + i + "_" + shortName();
+                    String treeName = "deletedTree_" + i + "_" + name();
 
                     InlineIndexTree tree = new InlineIndexTree(
                         null, cctx, treeName, cctx.offheap(), cctx.offheap().reuseListForIndex(treeName),
@@ -226,7 +276,8 @@
                     return pageAddr == 0;
                 }
                 finally {
-                    pageMem.readUnlock(grpId, rootPageId, page);
+                    if (pageAddr != 0)
+                        pageMem.readUnlock(grpId, rootPageId, page);
                 }
             }
             finally {
@@ -239,18 +290,16 @@
     }
 
     /** {@inheritDoc} */
-    @Override public void complete() {
-        completed = true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isCompleted() {
-        return completed;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onCancel() {
+    @Override public void cancel() {
         trees = null;
+
+        GridWorker w = worker;
+
+        if (w != null) {
+            worker = null;
+
+            U.awaitForWorkersStop(singleton(w), true, log);
+        }
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
index 9dd6022..841ef0d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cache/query/index/sorted/inline/InlineIndexImpl.java
@@ -449,7 +449,7 @@
                     treeName
                 );
 
-                cctx.kernalContext().durableBackgroundTasksProcessor().startDurableBackgroundTask(task, cctx.config());
+                cctx.kernalContext().durableBackgroundTasksProcessor().executeAsync(task, cctx.config());
             }
         }
         catch (IgniteCheckedException e) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java
index 355f624..743f84c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java
@@ -18,41 +18,32 @@
 
 import java.io.Serializable;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
 
 /**
- * Durable task that should be used to do long operations (e.g. index deletion) in background
- * for cases when node with persistence can fail before operation is completed. After start, node reads it's
- * pending background tasks from metastorage and completes them.
+ * Durable task that should be used to do long operations (e.g. index deletion) in background.
  */
 public interface DurableBackgroundTask extends Serializable {
     /**
-     * Short unique name of the task is used to build metastorage key for saving this task and for logging.
+     * Getting the name of the task to identify it.
+     * Also used as part of a key for storage in a MetaStorage.
      *
-     * @return Short name of this task.
+     * @return Task name.
      */
-    public String shortName();
+    String name();
 
     /**
-     * Method that executes the task. It is called after node start.
+     * Canceling the task.
+     */
+    void cancel();
+
+    /**
+     * Asynchronous task execution.
      *
-     * @param ctx Grid kernal context.
-     */
-    public void execute(GridKernalContext ctx);
-
-    /**
-     * Method that marks task as complete.
-     */
-    public void complete();
-
-    /**
-     * Method that return completion flag.
+     * Completion of the task execution should be only with the {@link DurableBackgroundTaskResult result}.
      *
-     * @return flag that task completed.
+     * @param ctx Kernal context.
+     * @return Future of the tasks.
      */
-    public boolean isCompleted();
-
-    /**
-     * Callback for task cancellation.
-     */
-    public void onCancel();
+    IgniteInternalFuture<DurableBackgroundTaskResult> executeAsync(GridKernalContext ctx);
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTaskResult.java
new file mode 100644
index 0000000..5d82e60
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTaskResult.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of executing a durable background task.
+ * <p/>
+ * There may be the following states:
+ * <ul>
+ *   <li>{@link #completed Completed} - the task has completed its execution and should be deleted.</li>
+ *   <li>{@link #restart Restart} - the task has not yet completed its execution and must be restarted.</li>
+ * </ul>
+ */
+public class DurableBackgroundTaskResult {
+    /** Completed state. */
+    private static final Object COMPLETED = new Object();
+
+    /** Restarted state. */
+    private static final Object RESTART = new Object();
+
+    /** Execution state. */
+    private final Object state;
+
+    /** An error occurred while executing the task. */
+    @Nullable private final Throwable err;
+
+    /**
+     * Constructor.
+     *
+     * @param res Execution state.
+     * @param err An error occurred while executing the task.
+     */
+    private DurableBackgroundTaskResult(Object res, @Nullable Throwable err) {
+        this.state = res;
+        this.err = err;
+    }
+
+    /**
+     * Creation of a completed task execution result that does not require restarting it.
+     *
+     * @param err An error occurred while executing the task.
+     * @return Result of executing a durable background task.
+     */
+    public static DurableBackgroundTaskResult complete(@Nullable Throwable err) {
+        return new DurableBackgroundTaskResult(COMPLETED, err);
+    }
+
+    /**
+     * Creation of a task execution result that requires its restart.
+     *
+     * @param err An error occurred while executing the task.
+     * @return Result of executing a durable background task.
+     */
+    public static DurableBackgroundTaskResult restart(@Nullable Throwable err) {
+        return new DurableBackgroundTaskResult(RESTART, err);
+    }
+
+    /**
+     * Checking the completion of the task.
+     *
+     * @return {@code True} if completed.
+     */
+    public boolean completed() {
+        return state == COMPLETED;
+    }
+
+    /**
+     * Checking if the task needs to be restarted.
+     *
+     * @return {@code True} if the task needs to be restarted.
+     */
+    public boolean restart() {
+        return state == RESTART;
+    }
+
+    /**
+     * Getting a task execution error.
+     *
+     * @return An error occurred while executing the task.
+     */
+    @Nullable public Throwable error() {
+        return err;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DurableBackgroundTaskResult.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index 73afdf9..a3d1fdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -729,7 +729,7 @@
                     nodeIds
                 );
 
-                ctx.durableBackgroundTasksProcessor().onStateChange(msg);
+                ctx.durableBackgroundTasksProcessor().onStateChangeStarted(msg);
 
                 if (msg.forceChangeBaselineTopology())
                     newState.setTransitionResult(msg.requestId(), msg.state());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTaskState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTaskState.java
new file mode 100644
index 0000000..32ab7b9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTaskState.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.localtask;
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
+
+/**
+ * Class for storing the current state of a durable background task.
+ *
+ * Task execution state transitions:
+ * INIT -> PREPARE -> STARTED -> COMPLETED
+ *
+ * If the task needs to be restarted, it must have status INIT.
+ */
+public class DurableBackgroundTaskState {
+    /**
+     * Enumeration of the current state of the task.
+     */
+    public enum State {
+        /** Initial state. */
+        INIT,
+
+        /** Preparation for execution state. */
+        PREPARE,
+
+        /** Execution state. */
+        STARTED,
+
+        /** Completion state. */
+        COMPLETED
+    }
+
+    /** Current state atomic updater. */
+    private static final AtomicReferenceFieldUpdater<DurableBackgroundTaskState, State> STATE_UPDATER =
+        AtomicReferenceFieldUpdater.newUpdater(DurableBackgroundTaskState.class, State.class, "state");
+
+    /** Durable background task. */
+    private final DurableBackgroundTask task;
+
+    /** Outside task future. */
+    @Nullable private final GridFutureAdapter<Void> outFut;
+
+    /** Task has been saved to the MetaStorage. */
+    private final boolean saved;
+
+    /** Current state of the task. */
+    private volatile State state = INIT;
+
+    /**
+     * Constructor.
+     *
+     * @param task   Durable background task.
+     * @param outFut Outside task future.
+     * @param saved  Task has been saved to the MetaStorage.
+     */
+    public DurableBackgroundTaskState(
+        DurableBackgroundTask task,
+        @Nullable GridFutureAdapter<Void> outFut,
+        boolean saved
+    ) {
+        this.task = task;
+        this.outFut = outFut;
+        this.saved = saved;
+    }
+
+    /**
+     * Getting durable background task.
+     *
+     * @return Durable background task.
+     */
+    public DurableBackgroundTask task() {
+        return task;
+    }
+
+    /**
+     * Getting outside task future.
+     *
+     * @return Outside task future.
+     */
+    @Nullable public GridFutureAdapter<Void> outFuture() {
+        return outFut;
+    }
+
+    /**
+     * Check if the task has been saved to the MetaStorage.
+     *
+     * @return {@code True} if stored in the MetaStorage.
+     */
+    public boolean saved() {
+        return saved;
+    }
+
+    /**
+     * Getting current state of the task.
+     *
+     * @return Current state of the task.
+     */
+    public State state() {
+        return state;
+    }
+
+    /**
+     * Set the current state of the task.
+     *
+     * @param s New current state of the task.
+     */
+    public void state(State s) {
+        state = s;
+    }
+
+    /**
+     * Atomically sets of the current task state.
+     *
+     * @param exp Expected state.
+     * @param newState New state.
+     * @return {@code True} if successful.
+     */
+    public boolean state(State exp, State newState) {
+        return STATE_UPDATER.compareAndSet(this, exp, newState);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DurableBackgroundTaskState.class, this);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
index dfbab72..9fb998f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
@@ -16,303 +16,103 @@
  */
 package org.apache.ignite.internal.processors.localtask;
 
-import java.util.Map;
-import java.util.Set;
+import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
-import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageTree;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
 import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
 import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
 import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
 
-import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.COMPLETED;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.PREPARE;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.STARTED;
 
 /**
- * Processor that is responsible for durable background tasks that are executed on local node
- * and should be continued even after node restart.
+ * Processor that is responsible for durable background tasks that are executed on local node.
  */
 public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implements MetastorageLifecycleListener,
     CheckpointListener {
     /** Prefix for metastorage keys for durable background tasks. */
-    private static final String STORE_DURABLE_BACKGROUND_TASK_PREFIX = "durable-background-task-";
+    private static final String TASK_PREFIX = "durable-background-task-";
 
-    /** Metastorage. */
-    private volatile ReadWriteMetastorage metastorage;
-
-    /** Metastorage synchronization mutex. */
+    /** MetaStorage synchronization mutex. */
     private final Object metaStorageMux = new Object();
 
-    /** Set of workers that executing durable background tasks. */
-    private final Set<GridWorker> asyncDurableBackgroundTaskWorkers = new GridConcurrentHashSet<>();
+    /** Current tasks. Mapping: {@link DurableBackgroundTask#name task name} -> task state. */
+    private final ConcurrentMap<String, DurableBackgroundTaskState> tasks = new ConcurrentHashMap<>();
 
-    /** Count of workers that executing durable background tasks. */
-    private final AtomicInteger asyncDurableBackgroundTasksWorkersCntr = new AtomicInteger(0);
-
-    /** Durable background tasks map. */
-    private final ConcurrentHashMap<String, DurableBackgroundTask> durableBackgroundTasks = new ConcurrentHashMap<>();
-
-    /** Set of started tasks' names. */
-    private final Set<String> startedTasks = new GridConcurrentHashSet<>();
+    /** Lock for canceling tasks. */
+    private final ReadWriteLock cancelLock = new ReentrantReadWriteLock(true);
 
     /**
-     * Ban to start new tasks. The first time the cluster is activated, it will try again to run existing tasks.
-     *
-     *  @see #onStateChangeFinish(ChangeGlobalStateFinishMessage)
+     * Tasks to be removed from the MetaStorage after the end of a checkpoint.
+     * Mapping: {@link DurableBackgroundTask#name task name} -> task.
      */
-    private volatile boolean forbidStartingNewTasks;
+    private final ConcurrentMap<String, DurableBackgroundTask> toRmv = new ConcurrentHashMap<>();
+
+    /** Prohibiting the execution of tasks. */
+    private volatile boolean prohibitionExecTasks = true;
 
     /**
+     * Constructor.
+     *
      * @param ctx Kernal context.
      */
     public DurableBackgroundTasksProcessor(GridKernalContext ctx) {
         super(ctx);
     }
 
-    /**
-     * Starts the asynchronous operation of pending tasks execution. Is called on start.
-     */
-    private void asyncDurableBackgroundTasksExecution() {
-        assert durableBackgroundTasks != null;
-
-        for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
-            if (!task.isCompleted() && startedTasks.add(task.shortName()))
-                asyncDurableBackgroundTaskExecute(task);
-        }
-    }
-
-    /**
-     * Creates a worker to execute single durable background task.
-     *
-     * @param task Task.
-     */
-    private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task) {
-        String workerName = "async-durable-background-task-executor-" + asyncDurableBackgroundTasksWorkersCntr.getAndIncrement();
-
-        GridWorker worker = new GridWorker(ctx.igniteInstanceName(), workerName, log) {
-            @Override public void cancel() {
-                task.onCancel();
-
-                super.cancel();
-            }
-
-            @Override protected void body() {
-                try {
-                    if (forbidStartingNewTasks)
-                        return;
-
-                    log.info("Executing durable background task: " + task.shortName());
-
-                    task.execute(ctx);
-
-                    task.complete();
-
-                    log.info("Execution of durable background task completed: " + task.shortName());
-                }
-                catch (Throwable e) {
-                    log.error("Could not execute durable background task: " + task.shortName(), e);
-                }
-                finally {
-                    startedTasks.remove(task.shortName());
-
-                    asyncDurableBackgroundTaskWorkers.remove(this);
-                }
-            }
-        };
-
-        asyncDurableBackgroundTaskWorkers.add(worker);
-
-        Thread asyncTask = new IgniteThread(worker);
-
-        asyncTask.start();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStart(boolean active) {
-        asyncDurableBackgroundTasksExecution();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        forbidStartingNewTasks = true;
-
-        awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
-    }
-
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
         ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
     }
 
-    /**
-     * @param msg Message.
-     */
-    public void onStateChange(ChangeGlobalStateMessage msg) {
-        if (msg.state() == ClusterState.INACTIVE) {
-            forbidStartingNewTasks = true;
-
-            awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
-        }
-    }
-
-    /**
-     * @param msg Message.
-     */
-    public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
-        if (msg.state() != ClusterState.INACTIVE) {
-            forbidStartingNewTasks = false;
-
-            asyncDurableBackgroundTasksExecution();
-        }
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        cancelTasks();
     }
 
     /** {@inheritDoc} */
     @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
-        synchronized (metaStorageMux) {
-            if (durableBackgroundTasks.isEmpty()) {
-                try {
-                    metastorage.iterate(
-                        STORE_DURABLE_BACKGROUND_TASK_PREFIX,
-                        (key, val) -> durableBackgroundTasks.put(key, (DurableBackgroundTask)val),
-                        true
-                    );
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteException("Failed to iterate durable background tasks storage.", e);
-                }
-            }
-        }
+        metaStorageOperation(metaStorage -> {
+            assert metaStorage != null;
+
+            metaStorage.iterate(
+                TASK_PREFIX,
+                (k, v) -> {
+                    DurableBackgroundTask t = (DurableBackgroundTask)v;
+
+                    tasks.put(t.name(), new DurableBackgroundTaskState(t, null, true));
+                },
+                true
+            );
+        });
     }
 
     /** {@inheritDoc} */
     @Override public void onReadyForReadWrite(ReadWriteMetastorage metastorage) {
-        synchronized (metaStorageMux) {
-            try {
-                for (Map.Entry<String, DurableBackgroundTask> entry : durableBackgroundTasks.entrySet()) {
-                    if (metastorage.readRaw(entry.getKey()) == null)
-                        metastorage.write(entry.getKey(), entry.getValue());
-                }
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException("Failed to read key from durable background tasks storage.", e);
-            }
-        }
-
         ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
-
-        this.metastorage = metastorage;
-    }
-
-    /**
-     * Builds a metastorage key for durable background task object.
-     *
-     * @param obj Object.
-     * @return Metastorage key.
-     */
-    private String durableBackgroundTaskMetastorageKey(DurableBackgroundTask obj) {
-        String k = STORE_DURABLE_BACKGROUND_TASK_PREFIX + obj.shortName();
-
-        if (k.length() > MetastorageTree.MAX_KEY_LEN) {
-            int hashLenLimit = 5;
-
-            String hash = String.valueOf(k.hashCode());
-
-            k = k.substring(0, MetastorageTree.MAX_KEY_LEN - hashLenLimit) +
-                (hash.length() > hashLenLimit ? hash.substring(0, hashLenLimit) : hash);
-        }
-
-        return k;
-    }
-
-    /**
-     * Adds durable background task object.
-     *
-     * @param obj Object.
-     */
-    private void addDurableBackgroundTask(DurableBackgroundTask obj) {
-        String objName = durableBackgroundTaskMetastorageKey(obj);
-
-        synchronized (metaStorageMux) {
-            durableBackgroundTasks.put(objName, obj);
-
-            if (metastorage != null) {
-                ctx.cache().context().database().checkpointReadLock();
-
-                try {
-                    metastorage.write(objName, obj);
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteException(e);
-                }
-                finally {
-                    ctx.cache().context().database().checkpointReadUnlock();
-                }
-            }
-        }
-    }
-
-    /**
-     * Removes durable background task object.
-     *
-     * @param obj Object.
-     */
-    private void removeDurableBackgroundTask(DurableBackgroundTask obj) {
-        String objName = durableBackgroundTaskMetastorageKey(obj);
-
-        synchronized (metaStorageMux) {
-            durableBackgroundTasks.remove(objName);
-
-            if (metastorage != null) {
-                ctx.cache().context().database().checkpointReadLock();
-
-                try {
-                    metastorage.remove(objName);
-                }
-                catch (IgniteCheckedException e) {
-                    throw new IgniteException(e);
-                }
-                finally {
-                    ctx.cache().context().database().checkpointReadUnlock();
-                }
-            }
-        }
-    }
-
-    /**
-     * Starts durable background task. If task is applied to persistent cache, saves it to metastorage.
-     *
-     * @param task Continuous task.
-     * @param ccfg Cache configuration.
-     */
-    public void startDurableBackgroundTask(DurableBackgroundTask task, CacheConfiguration ccfg) {
-        if (CU.isPersistentCache(ccfg, ctx.config().getDataStorageConfiguration()))
-            addDurableBackgroundTask(task);
-
-        asyncDurableBackgroundTaskExecute(task);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onMarkCheckpointBegin(Context ctx) {
-        /* No op. */
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onCheckpointBegin(Context ctx) {
-        /* No op. */
     }
 
     /** {@inheritDoc} */
@@ -321,10 +121,236 @@
     }
 
     /** {@inheritDoc} */
-    @Override public void afterCheckpointEnd(Context ctx) {
-        for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
-            if (task.isCompleted())
-                removeDurableBackgroundTask(task);
+    @Override public void onMarkCheckpointBegin(Context ctx) {
+        for (Iterator<DurableBackgroundTaskState> it = tasks.values().iterator(); it.hasNext(); ) {
+            DurableBackgroundTaskState taskState = it.next();
+
+            if (taskState.state() == COMPLETED) {
+                assert taskState.saved();
+
+                DurableBackgroundTask t = taskState.task();
+
+                toRmv.put(t.name(), t);
+
+                it.remove();
+            }
         }
     }
+
+    /** {@inheritDoc} */
+    @Override public void onCheckpointBegin(Context ctx) {
+        /* No op. */
+    }
+
+    /** {@inheritDoc} */
+    @Override public void afterCheckpointEnd(Context ctx) {
+        for (Iterator<DurableBackgroundTask> it = toRmv.values().iterator(); it.hasNext(); ) {
+            DurableBackgroundTask t = it.next();
+
+            metaStorageOperation(metaStorage -> {
+                if (metaStorage != null && toRmv.containsKey(t.name()))
+                    metaStorage.remove(metaStorageKey(t));
+            });
+
+            it.remove();
+        }
+    }
+
+    /**
+     * Callback at the start of a global state change.
+     *
+     * @param msg Message for change cluster global state.
+     */
+    public void onStateChangeStarted(ChangeGlobalStateMessage msg) {
+        if (msg.state() == ClusterState.INACTIVE)
+            cancelTasks();
+    }
+
+    /**
+     * Callback on finish of a global state change.
+     *
+     * @param msg Finish message for change cluster global state.
+     */
+    public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
+        if (msg.state() != ClusterState.INACTIVE) {
+            prohibitionExecTasks = false;
+
+            for (DurableBackgroundTaskState taskState : tasks.values()) {
+                if (!prohibitionExecTasks)
+                    executeAsync0(taskState.task());
+            }
+        }
+    }
+
+    /**
+     * Asynchronous execution of a durable background task.
+     *
+     * A new task will be added for execution either if there is no task with
+     * the same {@link DurableBackgroundTask#name name} or it (previous) will be completed.
+     *
+     * If the task is required to be completed after restarting the node,
+     * then it must be saved to the MetaStorage.
+     *
+     * If the task is saved to the Metastorage, then it will be deleted from it
+     * only after its completion and at the end of the checkpoint. Otherwise, it
+     * will be removed as soon as it is completed.
+     *
+     * @param task Durable background task.
+     * @param save Save task to MetaStorage.
+     * @return Futures that will complete when the task is completed.
+     */
+    public IgniteInternalFuture<Void> executeAsync(DurableBackgroundTask task, boolean save) {
+        DurableBackgroundTaskState taskState = tasks.compute(task.name(), (taskName, prev) -> {
+            if (prev != null && prev.state() != COMPLETED)
+                throw new IllegalArgumentException("Task is already present and has not been completed: " + taskName);
+
+            if (save)
+                toRmv.remove(taskName);
+
+            return new DurableBackgroundTaskState(task, new GridFutureAdapter<>(), save);
+        });
+
+        if (save) {
+            metaStorageOperation(metaStorage -> {
+                if (metaStorage != null)
+                    metaStorage.write(metaStorageKey(task), task);
+            });
+        }
+
+        if (!prohibitionExecTasks)
+            executeAsync0(task);
+
+        return taskState.outFuture();
+    }
+
+    /**
+     * Overloading the {@link #executeAsync(DurableBackgroundTask, boolean)}.
+     * If task is applied to persistent cache, saves it to MetaStorage.
+     *
+     * @param t Durable background task.
+     * @param cacheCfg Cache configuration.
+     * @return Futures that will complete when the task is completed.
+     */
+    public IgniteInternalFuture<Void> executeAsync(DurableBackgroundTask t, CacheConfiguration cacheCfg) {
+        return executeAsync(t, CU.isPersistentCache(cacheCfg, ctx.config().getDataStorageConfiguration()));
+    }
+
+    /**
+     * Asynchronous execution of a durable background task.
+     *
+     * @param t Durable background task.
+     */
+    private void executeAsync0(DurableBackgroundTask t) {
+        cancelLock.readLock().lock();
+
+        try {
+            DurableBackgroundTaskState taskState = tasks.get(t.name());
+
+            if (taskState != null && taskState.state(INIT, PREPARE)) {
+                if (log.isInfoEnabled())
+                    log.info("Executing durable background task: " + t.name());
+
+                t.executeAsync(ctx).listen(f -> {
+                    DurableBackgroundTaskResult res = null;
+
+                    try {
+                        res = f.get();
+                    }
+                    catch (Throwable e) {
+                        log.error("Task completed with an error: " + t.name(), e);
+                    }
+
+                    assert res != null;
+
+                    if (res.error() != null)
+                        log.error("Could not execute durable background task: " + t.name(), res.error());
+
+                    if (res.completed()) {
+                        if (res.error() == null && log.isInfoEnabled())
+                            log.info("Execution of durable background task completed: " + t.name());
+
+                        if (taskState.saved())
+                            taskState.state(COMPLETED);
+                        else
+                            tasks.remove(t.name());
+
+                        GridFutureAdapter<Void> outFut = taskState.outFuture();
+
+                        if (outFut != null)
+                            outFut.onDone(res.error());
+                    }
+                    else {
+                        assert res.restart();
+
+                        if (log.isInfoEnabled())
+                            log.info("Execution of durable background task will be restarted: " + t.name());
+
+                        taskState.state(INIT);
+                    }
+                });
+
+                taskState.state(PREPARE, STARTED);
+            }
+        }
+        finally {
+            cancelLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Canceling tasks that are currently being executed.
+     * Prohibiting the execution of tasks.
+     */
+    private void cancelTasks() {
+        prohibitionExecTasks = true;
+
+        cancelLock.writeLock().lock();
+
+        try {
+            for (DurableBackgroundTaskState taskState : tasks.values()) {
+                if (taskState.state() == STARTED)
+                    taskState.task().cancel();
+            }
+        }
+        finally {
+            cancelLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Performing an operation on a {@link MetaStorage}.
+     * Guarded by {@link #metaStorageMux}.
+     *
+     * @param consumer MetaStorage operation, argument can be {@code null}.
+     * @throws IgniteException If an exception is thrown from the {@code consumer}.
+     */
+    private void metaStorageOperation(IgniteThrowableConsumer<MetaStorage> consumer) throws IgniteException {
+        synchronized (metaStorageMux) {
+            IgniteCacheDatabaseSharedManager dbMgr = ctx.cache().context().database();
+
+            dbMgr.checkpointReadLock();
+
+            try {
+                MetaStorage metaStorage = dbMgr.metaStorage();
+
+                consumer.accept(metaStorage);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+            finally {
+                dbMgr.checkpointReadUnlock();
+            }
+        }
+    }
+
+    /**
+     * Getting the task key for the MetaStorage.
+     *
+     * @param t Durable background task.
+     * @return MetaStorage {@code t} key.
+     */
+    static String metaStorageKey(DurableBackgroundTask t) {
+        return TASK_PREFIX + t.name();
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index d72dc8f..de79bf8 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -11669,7 +11669,11 @@
      * @param cancel Wheter should cancel workers.
      * @param log Logger.
      */
-    public static void awaitForWorkersStop(Collection<GridWorker> workers, boolean cancel, IgniteLogger log) {
+    public static void awaitForWorkersStop(
+        Collection<GridWorker> workers,
+        boolean cancel,
+        @Nullable IgniteLogger log
+    ) {
         for (GridWorker worker : workers) {
             try {
                 if (cancel)
@@ -11678,7 +11682,8 @@
                 worker.join();
             }
             catch (Exception e) {
-                log.warning(String.format("Failed to cancel grid runnable [%s]: %s", worker.toString(), e.getMessage()));
+                if (log != null)
+                    log.warning("Failed to cancel grid runnable [" + worker.toString() + "]: " + e.getMessage());
             }
         }
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
index eea82cf..9a42ff1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java
@@ -440,16 +440,6 @@
     }
 
     /**
-     * Getting db manager of node.
-     *
-     * @param n Node.
-     * @return Db manager.
-     */
-    private GridCacheDatabaseSharedManager dbMgr(IgniteEx n) {
-        return (GridCacheDatabaseSharedManager)n.context().cache().context().database();
-    }
-
-    /**
      * Getting DATASTORAGE_METRIC_PREFIX metric registry.
      *
      * @param n Node.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalArchiveConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalArchiveConsistencyTest.java
index a060eed..c6dcf90 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalArchiveConsistencyTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalArchiveConsistencyTest.java
@@ -28,7 +28,6 @@
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -271,14 +270,4 @@
                 ", segNum=" + segments + ", currSeg=" + walMgr(n).currentSegment() + ']');
         }
     }
-
-    /**
-     * Getting db manager of node.
-     *
-     * @param n Node.
-     * @return Db manager.
-     */
-    private GridCacheDatabaseSharedManager dbMgr(IgniteEx n) {
-        return (GridCacheDatabaseSharedManager)n.context().cache().context().database();
-    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java
new file mode 100644
index 0000000..ba0da72
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessorSelfTest.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.localtask;
+
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableFunction;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.cluster.ClusterState.INACTIVE;
+import static org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult.complete;
+import static org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult.restart;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.COMPLETED;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.STARTED;
+import static org.apache.ignite.internal.processors.localtask.DurableBackgroundTasksProcessor.metaStorageKey;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+
+/**
+ * Class for testing the {@link DurableBackgroundTasksProcessor}.
+ */
+public class DurableBackgroundTasksProcessorSelfTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setFailureHandler(new StopNodeFailureHandler())
+            .setDataStorageConfiguration(
+                new DataStorageConfiguration()
+                    .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
+            );
+    }
+
+    /**
+     * Checking the correctness of work (without saving to the MetaStorage) of the task in normal mode.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSimpleTaskExecutionWithoutMetaStorage() throws Exception {
+        checkSimpleTaskExecute(false);
+    }
+
+    /**
+     * Checking the correctness of work (with saving to the MetaStorage) of the task in normal mode.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSimpleTaskExecutionWithMetaStorage() throws Exception {
+        checkSimpleTaskExecute(true);
+    }
+
+    /**
+     * Checking the correctness of restarting the task without MetaStorage.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRestartTaskExecutionWithoutMetaStorage() throws Exception {
+        checkRestartTaskExecute(false);
+    }
+
+    /**
+     * Checking the correctness of restarting the task with MetaStorage.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRestartTaskExecutionWithMetaStorage() throws Exception {
+        checkRestartTaskExecute(true);
+    }
+
+    /**
+     * Checking the correctness of cancelling the task.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testCancelTaskExecution() throws Exception {
+        IgniteEx n = startGrid(0);
+        n.cluster().state(ACTIVE);
+
+        SimpleTask t = new SimpleTask("t");
+        IgniteInternalFuture<Void> execAsyncFut = execAsync(n, t, false);
+
+        t.onExecFut.get(getTestTimeout());
+        checkStateAndMetaStorage(n, t, STARTED, false);
+        assertFalse(execAsyncFut.isDone());
+
+        n.cluster().state(INACTIVE);
+
+        t.onExecFut.get(getTestTimeout());
+        t.taskFut.onDone(complete(null));
+
+        execAsyncFut.get(getTestTimeout());
+    }
+
+    /**
+     * Check that the task will be restarted after restarting the node.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testRestartTaskAfterRestartNode() throws Exception {
+        IgniteEx n = startGrid(0);
+        n.cluster().state(ACTIVE);
+
+        SimpleTask t = new SimpleTask("t");
+        execAsync(n, t, true);
+
+        t.onExecFut.get(getTestTimeout());
+        checkStateAndMetaStorage(n, t, STARTED, true);
+        t.taskFut.onDone(restart(null));
+
+        stopAllGrids();
+
+        n = startGrid(0);
+        n.cluster().state(ACTIVE);
+
+        t = ((SimpleTask)tasks(n).get(t.name()).task());
+
+        t.onExecFut.get(getTestTimeout());
+        checkStateAndMetaStorage(n, t, STARTED, true);
+        t.taskFut.onDone(complete(null));
+    }
+
+    /**
+     * Check that the task will be restarted correctly.
+     *
+     * @param save Save to MetaStorage.
+     * @throws Exception If failed.
+     */
+    private void checkRestartTaskExecute(boolean save) throws Exception {
+        IgniteEx n = startGrid(0);
+        n.cluster().state(ACTIVE);
+
+        SimpleTask t = new SimpleTask("t");
+        IgniteInternalFuture<Void> execAsyncFut = execAsync(n, t, save);
+
+        t.onExecFut.get(getTestTimeout());
+        checkStateAndMetaStorage(n, t, STARTED, save);
+        assertFalse(execAsyncFut.isDone());
+
+        if (save) {
+            ObservingCheckpointListener checkpointLsnr = new ObservingCheckpointListener();
+            dbMgr(n).addCheckpointListener(checkpointLsnr);
+
+            dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
+
+            t.taskFut.onDone(restart(null));
+            checkStateAndMetaStorage(n, t, INIT, true);
+            assertFalse(execAsyncFut.isDone());
+
+            GridFutureAdapter<Void> onMarkCheckpointBeginFut = checkpointLsnr.onMarkCheckpointBeginAsync(ctx -> {
+                checkStateAndMetaStorage(n, t, INIT, true);
+                assertFalse(toRmv(n).containsKey(t.name()));
+            });
+
+            dbMgr(n).enableCheckpoints(true).get(getTestTimeout());
+            onMarkCheckpointBeginFut.get(getTestTimeout());
+        }
+        else {
+            t.taskFut.onDone(restart(null));
+            checkStateAndMetaStorage(n, t, INIT, false);
+            assertFalse(execAsyncFut.isDone());
+        }
+
+        t.reset();
+
+        n.cluster().state(INACTIVE);
+        n.cluster().state(ACTIVE);
+
+        t.onExecFut.get(getTestTimeout());
+        checkStateAndMetaStorage(n, t, STARTED, save);
+        assertFalse(execAsyncFut.isDone());
+
+        t.taskFut.onDone(complete(null));
+        execAsyncFut.get(getTestTimeout());
+    }
+
+    /**
+     * Checking that the task will be processed correctly in the normal mode.
+     *
+     * @param save Save to MetaStorage.
+     * @throws Exception If failed.
+     */
+    private void checkSimpleTaskExecute(boolean save) throws Exception {
+        IgniteEx n = startGrid(0);
+
+        SimpleTask t = new SimpleTask("t");
+        IgniteInternalFuture<Void> execAsyncFut = execAsync(n, t, save);
+
+        checkStateAndMetaStorage(n, t, INIT, save);
+
+        checkExecuteSameTask(n, t);
+        checkStateAndMetaStorage(n, t, INIT, save);
+
+        assertFalse(t.onExecFut.isDone());
+        assertFalse(execAsyncFut.isDone());
+
+        n.cluster().state(ACTIVE);
+
+        t.onExecFut.get(getTestTimeout());
+
+        checkStateAndMetaStorage(n, t, STARTED, save);
+        assertFalse(execAsyncFut.isDone());
+
+        if (save) {
+            dbMgr(n).enableCheckpoints(false).get(getTestTimeout());
+
+            t.taskFut.onDone(complete(null));
+            execAsyncFut.get(getTestTimeout());
+
+            checkStateAndMetaStorage(n, t, COMPLETED, true);
+
+            ObservingCheckpointListener checkpointLsnr = new ObservingCheckpointListener();
+
+            GridFutureAdapter<Void> onMarkCheckpointBeginFut = checkpointLsnr.onMarkCheckpointBeginAsync(
+                ctx -> {
+                    checkStateAndMetaStorage(n, t, null, true);
+                    assertTrue(toRmv(n).containsKey(t.name()));
+                }
+            );
+
+            GridFutureAdapter<Void> afterCheckpointEndFut = checkpointLsnr.afterCheckpointEndAsync(
+                ctx -> {
+                    checkStateAndMetaStorage(n, t, null, false);
+                    assertFalse(toRmv(n).containsKey(t.name()));
+                }
+            );
+
+            dbMgr(n).addCheckpointListener(checkpointLsnr);
+            dbMgr(n).enableCheckpoints(true).get(getTestTimeout());
+
+            onMarkCheckpointBeginFut.get(getTestTimeout());
+            afterCheckpointEndFut.get(getTestTimeout());
+        }
+        else {
+            t.taskFut.onDone(complete(null));
+            execAsyncFut.get(getTestTimeout());
+
+            checkStateAndMetaStorage(n, t, null, false);
+        }
+    }
+
+    /**
+     * Checking that until the task is completed it is impossible to add a
+     * task with the same {@link DurableBackgroundTask#name name}.
+     *
+     * @param n Node.
+     * @param t Task.
+     */
+    private void checkExecuteSameTask(IgniteEx n, DurableBackgroundTask t) {
+        assertThrows(log, () -> execAsync(n, t, false), IllegalArgumentException.class, null);
+        assertThrows(log, () -> execAsync(n, t, true), IllegalArgumentException.class, null);
+        assertThrows(log, () -> execAsync(n, new SimpleTask(t.name()), false), IllegalArgumentException.class, null);
+        assertThrows(log, () -> execAsync(n, new SimpleTask(t.name()), true), IllegalArgumentException.class, null);
+    }
+
+    /**
+     * Checking the internal {@link DurableBackgroundTaskState state} of the task and storage in the MetaStorage.
+     *
+     * @param n Node.
+     * @param t Task.
+     * @param expState Expected state of the task, {@code null} means that the task should not be.
+     * @param expSaved Task is expected to be stored in MetaStorage.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void checkStateAndMetaStorage(
+        IgniteEx n,
+        DurableBackgroundTask t,
+        @Nullable State expState,
+        boolean expSaved
+    ) throws IgniteCheckedException {
+        DurableBackgroundTaskState taskState = tasks(n).get(t.name());
+
+        if (expState == null)
+            assertNull(taskState);
+        else
+            assertEquals(expState, taskState.state());
+
+        DurableBackgroundTask ser = (DurableBackgroundTask)metaStorageOperation(n, ms -> ms.read(metaStorageKey(t)));
+
+        if (expSaved)
+            assertEquals(t.name(), ser.name());
+        else
+            assertNull(ser);
+    }
+
+    /**
+     * Asynchronous execution of a durable background task.
+     *
+     * @param n Node.
+     * @param t Task.
+     * @param save Save task to MetaStorage.
+     * @return Task future.
+     */
+    private IgniteInternalFuture<Void> execAsync(IgniteEx n, DurableBackgroundTask t, boolean save) {
+        return durableBackgroundTasksProcessor(n).executeAsync(t, save);
+    }
+
+    /**
+     * Getting {@code DurableBackgroundTasksProcessor#toRmv}.
+     *
+     * @param n Node.
+     * @return Map of tasks that will be removed from the MetaStorage.
+     */
+    private Map<String, DurableBackgroundTask> toRmv(IgniteEx n) {
+        return getFieldValue(durableBackgroundTasksProcessor(n), "toRmv");
+    }
+
+    /**
+     * Getting {@code DurableBackgroundTasksProcessor#tasks}.
+     *
+     * @param n Node.
+     * @return Task states map.
+     */
+    private Map<String, DurableBackgroundTaskState> tasks(IgniteEx n) {
+        return getFieldValue(durableBackgroundTasksProcessor(n), "tasks");
+    }
+
+    /**
+     * Getting durable background task processor.
+     *
+     * @param n Node.
+     * @return Durable background task processor.
+     */
+    private DurableBackgroundTasksProcessor durableBackgroundTasksProcessor(IgniteEx n) {
+        return n.context().durableBackgroundTasksProcessor();
+    }
+
+    /**
+     * Performing an operation on a MetaStorage.
+     *
+     * @param n Node.
+     * @param fun Function for working with MetaStorage, the argument can be {@code null}.
+     * @return The function result.
+     * @throws IgniteCheckedException If failed.
+     */
+    private <R> R metaStorageOperation(
+        IgniteEx n,
+        IgniteThrowableFunction<MetaStorage, R> fun
+    ) throws IgniteCheckedException {
+        GridCacheDatabaseSharedManager dbMgr = dbMgr(n);
+
+        dbMgr.checkpointReadLock();
+
+        try {
+            return fun.apply(dbMgr.metaStorage());
+        }
+        finally {
+            dbMgr.checkpointReadUnlock();
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/ObservingCheckpointListener.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/ObservingCheckpointListener.java
new file mode 100644
index 0000000..b86ed09
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/ObservingCheckpointListener.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.localtask;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
+
+/**
+ * Observer is a checkpoint listener.
+ * Consumers will be reset after a single use.
+ */
+public class ObservingCheckpointListener implements CheckpointListener {
+    /** On mark checkpoint begin consumer. */
+    volatile IgniteThrowableConsumer<Context> onMarkCheckpointBeginConsumer;
+
+    /** After checkpoint end consumer. */
+    volatile IgniteThrowableConsumer<Context> afterCheckpointEndConsumer;
+
+    /** {@inheritDoc} */
+    @Override public void onMarkCheckpointBegin(Context ctx) throws IgniteCheckedException {
+        IgniteThrowableConsumer<Context> consumer = onMarkCheckpointBeginConsumer;
+
+        if (consumer != null) {
+            onMarkCheckpointBeginConsumer = null;
+
+            consumer.accept(ctx);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onCheckpointBegin(Context ctx) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void beforeCheckpointBegin(Context ctx) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void afterCheckpointEnd(Context ctx) throws IgniteCheckedException {
+        IgniteThrowableConsumer<Context> consumer = afterCheckpointEndConsumer;
+
+        if (consumer != null) {
+            afterCheckpointEndConsumer = null;
+
+            consumer.accept(ctx);
+        }
+    }
+
+    /**
+     * Asynchronous execution {@link #onMarkCheckpointBeginConsumer}.
+     *
+     * @param consumer On mark checkpoint begin consumer.
+     * @return Future that complete when the consumer complete.
+     */
+    GridFutureAdapter<Void> onMarkCheckpointBeginAsync(IgniteThrowableConsumer<Context> consumer) {
+        GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
+
+        onMarkCheckpointBeginConsumer = asyncConsumer(consumer, fut);
+
+        return fut;
+    }
+
+    /**
+     * Asynchronous execution {@link #afterCheckpointEndConsumer}.
+     *
+     * @param consumer After checkpoint end consumer.
+     * @return Future that complete when the consumer complete.
+     */
+    GridFutureAdapter<Void> afterCheckpointEndAsync(IgniteThrowableConsumer<Context> consumer) {
+        GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
+
+        afterCheckpointEndConsumer = asyncConsumer(consumer, fut);
+
+        return fut;
+    }
+
+    /**
+     * Make the consumer asynchronous.
+     *
+     * @param consumer Consumer.
+     * @param fut Future that complete when the consumer complete.
+     * @return New consumer.
+     */
+    private static IgniteThrowableConsumer<Context> asyncConsumer(
+        IgniteThrowableConsumer<Context> consumer,
+        GridFutureAdapter<Void> fut
+    ) {
+        return ctx -> {
+            Throwable err = null;
+
+            try {
+                consumer.accept(ctx);
+            }
+            catch (Throwable t) {
+                err = t;
+            }
+
+            fut.onDone(err);
+        };
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/SimpleTask.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/SimpleTask.java
new file mode 100644
index 0000000..8d14ea4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/localtask/SimpleTask.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.localtask;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.dto.IgniteDataTransferObject;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Simple {@link DurableBackgroundTask} implementation for tests.
+ */
+class SimpleTask extends IgniteDataTransferObject implements DurableBackgroundTask {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Task name. */
+    private String name;
+
+    /** Future that will be completed at the beginning of the {@link #executeAsync}. */
+    final GridFutureAdapter<Void> onExecFut = new GridFutureAdapter<>();
+
+    /** Future that will be returned from the {@link #executeAsync}. */
+    final GridFutureAdapter<DurableBackgroundTaskResult> taskFut = new GridFutureAdapter<>();
+
+    /** Future that will be completed at the beginning of the {@link #cancel}. */
+    final GridFutureAdapter<Void> onCancelFut = new GridFutureAdapter<>();
+
+    /**
+     * Default constructor.
+     */
+    public SimpleTask() {
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param name Task name.
+     */
+    public SimpleTask(String name) {
+        this.name = name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return name;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void cancel() {
+        onCancelFut.onDone();
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteInternalFuture<DurableBackgroundTaskResult> executeAsync(GridKernalContext ctx) {
+        onExecFut.onDone();
+
+        return taskFut;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void writeExternalData(ObjectOutput out) throws IOException {
+        U.writeLongString(out, name);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void readExternalData(
+        byte protoVer,
+        ObjectInput in
+    ) throws IOException, ClassNotFoundException {
+        name = U.readLongString(in);
+    }
+
+    /**
+     * Resetting internal futures.
+     */
+    void reset() {
+        onExecFut.reset();
+        taskFut.reset();
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index a7f4c74..0f02435 100755
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -2714,4 +2714,14 @@
         return Optional.ofNullable(n.cachex(cacheName)).map(IgniteInternalCache::context)
             .map(GridCacheContext::cache).map(GridCacheAdapter::metrics0).orElse(null);
     }
+
+    /**
+     * Getting database manager.
+     *
+     * @param n Node.
+     * @return Database manager.
+     */
+    protected GridCacheDatabaseSharedManager dbMgr(IgniteEx n) {
+        return (GridCacheDatabaseSharedManager)n.context().cache().context().database();
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
index 73b4880..d5667fd 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java
@@ -75,6 +75,7 @@
 import org.apache.ignite.internal.processors.database.IgniteDbSingleNodePutGetTest;
 import org.apache.ignite.internal.processors.database.IgniteDbSingleNodeTinyPutGetTest;
 import org.apache.ignite.internal.processors.diagnostic.DiagnosticProcessorTest;
+import org.apache.ignite.internal.processors.localtask.DurableBackgroundTasksProcessorSelfTest;
 import org.apache.ignite.internal.processors.metastorage.DistributedMetaStoragePersistentTest;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.DynamicSuite;
@@ -212,5 +213,7 @@
         GridTestUtils.addTestIfNeeded(suite, ActiveOnStartPropertyTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, AutoActivationPropertyTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, ClusterStateOnStartPropertyTest.class, ignoredTests);
+
+        GridTestUtils.addTestIfNeeded(suite, DurableBackgroundTasksProcessorSelfTest.class, ignoredTests);
     }
 }
