IGNITE-15444 Implement MetaStorageManager component stop (#332)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java
new file mode 100644
index 0000000..06a9d48
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Synchronization aid to track "busy" state of a subsystem that owns it.
+ * <p>
+ * This class is implemented
+ * over {@link IgniteSpinReadWriteLock}.
+ * <p>
+ * For example, there may be a manager that have different threads for some
+ * purposes and the manager must not be stopped while at least a single thread
+ * is in "busy" state. In this situation each thread must enter to "busy"
+ * state calling method {@link #enterBusy()} in critical pieces of code
+ * which, i.e. use grid kernal functionality, notifying that the manager
+ * and the whole grid kernal cannot be stopped while it's in progress. Once
+ * the activity is done, the thread should leave "busy" state calling method
+ * {@link #leaveBusy()}. The manager itself, when stopping, should call method
+ * {@link #block} that blocks till all activities leave "busy" state.
+ * @see IgniteSpinReadWriteLock
+ */
+public class IgniteSpinBusyLock {
+    /** Underlying read-write lock. */
+    private final IgniteSpinReadWriteLock lock = new IgniteSpinReadWriteLock();
+
+    /**
+     * Enters "busy" state.
+     *
+     * @return {@code true} if entered to busy state.
+     */
+    public boolean enterBusy() {
+        return !lock.writeLockedByCurrentThread() && lock.tryReadLock();
+    }
+
+    /**
+     * Checks if busy lock was blocked by current thread.
+     *
+     * @return {@code True} if busy lock was blocked by current thread.
+     */
+    public boolean blockedByCurrentThread() {
+        return lock.writeLockedByCurrentThread();
+    }
+
+    /**
+     * Leaves "busy" state.
+     */
+    public void leaveBusy() {
+        lock.readUnlock();
+    }
+
+    /**
+     * Blocks current thread till all activities left "busy" state
+     * and prevents them from further entering to "busy" state.
+     */
+    public void block() {
+        lock.writeLock();
+    }
+
+    /**
+     * @param millis Timeout.
+     * @return {@code True} if lock was acquired.
+     * @throws InterruptedException If interrupted.
+     */
+    public boolean tryBlock(long millis) throws InterruptedException {
+        return lock.tryWriteLock(millis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Makes possible for activities entering busy state again.
+     */
+    public void unblock() {
+        lock.writeUnlock();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLock.java
new file mode 100644
index 0000000..f1deb9f
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinReadWriteLock.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Spin read-write lock.
+ */
+public class IgniteSpinReadWriteLock {
+    /** */
+    private static final long PENDING_WLOCKS_OFFS;
+
+    /** */
+    private static final long STATE_OFFS;
+
+    /**
+     * TODO: replace UNSAFE usage using VarHandle https://issues.apache.org/jira/browse/IGNITE-15536
+     */
+    static {
+        try {
+            STATE_OFFS = GridUnsafe.objectFieldOffset(IgniteSpinReadWriteLock.class.getDeclaredField("state"));
+
+            PENDING_WLOCKS_OFFS =
+                GridUnsafe.objectFieldOffset(IgniteSpinReadWriteLock.class.getDeclaredField("pendingWLocks"));
+        }
+        catch (NoSuchFieldException e) {
+            throw new Error(e);
+        }
+    }
+
+    /** */
+    private final ThreadLocal<Integer> readLockEntryCnt = new ThreadLocal<Integer>() {
+        @Override protected Integer initialValue() {
+            return 0;
+        }
+    };
+
+    /** */
+    private volatile int state;
+
+    /** */
+    private volatile int pendingWLocks;
+
+    /** */
+    private long writeLockOwner = -1;
+
+    /** */
+    private int writeLockEntryCnt;
+
+    /**
+     * Acquires read lock.
+     */
+    @SuppressWarnings("BusyWait")
+    public void readLock() {
+        int cnt = readLockEntryCnt.get();
+
+        // Read lock reentry or acquiring read lock while holding write lock.
+        if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) {
+            assert state > 0 || state == -1;
+
+            readLockEntryCnt.set(cnt + 1);
+
+            return;
+        }
+
+        boolean interrupted = false;
+
+        while (true) {
+            int cur = state;
+
+            assert cur >= -1;
+
+            if (cur == -1 || pendingWLocks > 0) {
+                try {
+                    Thread.sleep(10);
+                }
+                catch (InterruptedException ignored) {
+                    interrupted = true;
+                }
+
+                continue;
+            }
+
+            if (compareAndSet(STATE_OFFS, cur, cur + 1)) {
+                if (interrupted)
+                    Thread.currentThread().interrupt();
+
+                break;
+            }
+        }
+
+        readLockEntryCnt.set(1);
+    }
+
+    /**
+     * Tries to acquire read lock.
+     *
+     * @return {@code true} if acquired.
+     */
+    public boolean tryReadLock() {
+        int cnt = readLockEntryCnt.get();
+
+        // Read lock reentry or acquiring read lock while holding write lock.
+        if (cnt > 0 || Thread.currentThread().getId() == writeLockOwner) {
+            assert state > 0 || state == -1;
+
+            readLockEntryCnt.set(cnt + 1);
+
+            return true;
+        }
+
+        while (true) {
+            int cur = state;
+
+            if (cur == -1 || pendingWLocks > 0)
+                return false;
+
+            if (compareAndSet(STATE_OFFS, cur, cur + 1)) {
+                readLockEntryCnt.set(1);
+
+                return true;
+            }
+        }
+    }
+
+    /**
+     * Read unlock.
+     */
+    public void readUnlock() {
+        int cnt = readLockEntryCnt.get();
+
+        if (cnt == 0)
+            throw new IllegalMonitorStateException();
+
+        // Read unlock when holding write lock is performed here.
+        if (cnt > 1 || Thread.currentThread().getId() == writeLockOwner) {
+            assert state > 0 || state == -1;
+
+            readLockEntryCnt.set(cnt - 1);
+
+            return;
+        }
+
+        while (true) {
+            int cur = state;
+
+            assert cur > 0;
+
+            if (compareAndSet(STATE_OFFS, cur, cur - 1)) {
+                readLockEntryCnt.set(0);
+
+                return;
+            }
+        }
+    }
+
+    /**
+     * Acquires write lock.
+     */
+    @SuppressWarnings("BusyWait")
+    public void writeLock() {
+        long threadId = Thread.currentThread().getId();
+
+        if (threadId == writeLockOwner) {
+            assert state == -1;
+
+            writeLockEntryCnt++;
+
+            return;
+        }
+
+        // Increment pending write locks.
+        while (true) {
+            int pendingWLocks0 = pendingWLocks;
+
+            if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1))
+                break;
+        }
+
+        boolean interrupted = false;
+
+        while (!compareAndSet(STATE_OFFS, 0, -1)) {
+            try {
+                Thread.sleep(10);
+            }
+            catch (InterruptedException ignored) {
+                interrupted = true;
+            }
+        }
+
+        // Decrement pending write locks.
+        while (true) {
+            int pendingWLocks0 = pendingWLocks;
+
+            assert pendingWLocks0 > 0;
+
+            if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1))
+                break;
+        }
+
+        if (interrupted)
+            Thread.currentThread().interrupt();
+
+        assert writeLockOwner == -1;
+
+        writeLockOwner = threadId;
+        writeLockEntryCnt = 1;
+    }
+
+    /**
+     * Acquires write lock without sleeping between unsuccessful attempts.
+     */
+    public void writeLock0() {
+        long threadId = Thread.currentThread().getId();
+
+        if (threadId == writeLockOwner) {
+            assert state == -1;
+
+            writeLockEntryCnt++;
+
+            return;
+        }
+
+        // Increment pending write locks.
+        while (true) {
+            int pendingWLocks0 = pendingWLocks;
+
+            if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1))
+                break;
+        }
+
+        for (;;) {
+            if (compareAndSet(STATE_OFFS, 0, -1))
+                break;
+        }
+
+        // Decrement pending write locks.
+        while (true) {
+            int pendingWLocks0 = pendingWLocks;
+
+            assert pendingWLocks0 > 0;
+
+            if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1))
+                break;
+        }
+
+        assert writeLockOwner == -1;
+
+        writeLockOwner = threadId;
+        writeLockEntryCnt = 1;
+    }
+
+    /**
+     * @return {@code True} if blocked by current thread.
+     */
+    public boolean writeLockedByCurrentThread() {
+        return writeLockOwner == Thread.currentThread().getId();
+    }
+
+    /**
+     * Tries to acquire write lock.
+     *
+     * @return {@code True} if write lock has been acquired.
+     */
+    public boolean tryWriteLock() {
+        long threadId = Thread.currentThread().getId();
+
+        if (threadId == writeLockOwner) {
+            assert state == -1;
+
+            writeLockEntryCnt++;
+
+            return true;
+        }
+
+        if (compareAndSet(STATE_OFFS, 0, -1)) {
+            assert writeLockOwner == -1;
+
+            writeLockOwner = threadId;
+            writeLockEntryCnt = 1;
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * @param timeout Timeout.
+     * @param unit Unit.
+     * @return {@code True} if write lock has been acquired.
+     * @throws InterruptedException If interrupted.
+     */
+    @SuppressWarnings("BusyWait")
+    public boolean tryWriteLock(long timeout, TimeUnit unit) throws InterruptedException {
+        long threadId = Thread.currentThread().getId();
+
+        if (threadId == writeLockOwner) {
+            assert state == -1;
+
+            writeLockEntryCnt++;
+
+            return true;
+        }
+
+        try {
+            // Increment pending write locks.
+            while (true) {
+                int pendingWLocks0 = pendingWLocks;
+
+                if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 + 1))
+                    break;
+            }
+
+            long startNanos = System.nanoTime();
+
+            long timeoutNanos = unit.toNanos(timeout);
+
+            while (true) {
+                if (compareAndSet(STATE_OFFS, 0, -1)) {
+                    assert writeLockOwner == -1;
+
+                    writeLockOwner = threadId;
+                    writeLockEntryCnt = 1;
+
+                    return true;
+                }
+
+                Thread.sleep(10);
+
+                if (System.nanoTime() - startNanos >= timeoutNanos)
+                    return false;
+            }
+        }
+        finally {
+            // Decrement pending write locks.
+            while (true) {
+                int pendingWLocks0 = pendingWLocks;
+
+                assert pendingWLocks0 > 0;
+
+                if (compareAndSet(PENDING_WLOCKS_OFFS, pendingWLocks0, pendingWLocks0 - 1))
+                    break;
+            }
+        }
+    }
+
+    /**
+     * Releases write lock.
+     */
+    public void writeUnlock() {
+        long threadId = Thread.currentThread().getId();
+
+        if (threadId != writeLockOwner)
+            throw new IllegalMonitorStateException();
+
+        if (writeLockEntryCnt > 1) {
+            writeLockEntryCnt--;
+
+            return;
+        }
+
+        writeLockEntryCnt = 0;
+        writeLockOwner = -1;
+
+        // Current thread holds write and read locks and is releasing
+        // write lock now.
+        int update = readLockEntryCnt.get() > 0 ? 1 : 0;
+
+        boolean b = compareAndSet(STATE_OFFS, -1, update);
+
+        assert b;
+    }
+
+    /**
+     * @param offs Offset.
+     * @param expect Expected.
+     * @param update Update.
+     * @return {@code True} on success.
+     */
+    private boolean compareAndSet(long offs, int expect, int update) {
+        return GridUnsafe.compareAndSwapInt(this, offs, expect, update);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteSpinReadWriteLock.class, this);
+    }
+}
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java
index 79c2daf..aae07fc 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java
@@ -76,7 +76,10 @@
                 cursorId -> metaStorageRaftGrpSvc.run(new CursorCloseCommand(cursorId))).get();
         }
         catch (InterruptedException | ExecutionException e) {
-            LOG.error("Unable to evaluate cursor close command", e);
+            if (e.getCause() != null && e.getCause().getClass().equals(NodeStoppingException.class))
+                return;
+
+            LOG.debug("Unable to evaluate cursor close command", e);
 
             throw new IgniteInternalException(e);
         }
@@ -101,10 +104,10 @@
                         cursorId -> metaStorageRaftGrpSvc.<Boolean>run(new CursorHasNextCommand(cursorId))).get();
             }
             catch (InterruptedException | ExecutionException e) {
-                if (e.getCause().getClass().equals(NodeStoppingException.class))
+                if (e.getCause() != null && e.getCause().getClass().equals(NodeStoppingException.class))
                     return false;
 
-                LOG.error("Unable to evaluate cursor hasNext command", e);
+                LOG.debug("Unable to evaluate cursor hasNext command", e);
 
                 throw new IgniteInternalException(e);
             }
@@ -122,10 +125,10 @@
                     return fn.apply(res);
             }
             catch (InterruptedException | ExecutionException e) {
-                if (e.getCause().getClass().equals(NodeStoppingException.class))
+                if (e.getCause() != null && e.getCause().getClass().equals(NodeStoppingException.class))
                     throw new NoSuchElementException();
 
-                LOG.error("Unable to evaluate cursor hasNext command", e);
+                LOG.debug("Unable to evaluate cursor hasNext command", e);
 
                 throw new IgniteInternalException(e);
             }
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
index 482489a..266f2ef 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
@@ -27,6 +27,7 @@
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
 import org.apache.ignite.internal.metastorage.common.OperationType;
 import org.apache.ignite.internal.metastorage.common.command.ConditionInfo;
 import org.apache.ignite.internal.metastorage.common.command.GetAllCommand;
@@ -400,10 +401,12 @@
             watchers.computeIfPresent(
                 watchId,
                 (k, v) -> {
-                    CompletableFuture.runAsync(v::interrupt).thenRun(() -> {
-                        try {
-                            v.stop = true;
+                    CompletableFuture.runAsync(() -> {
+                        v.stop = true;
 
+                        v.interrupt();
+                    }).thenRun(() -> {
+                        try {
                             Thread.sleep(100);
 
                             v.cursor.close();
@@ -412,6 +415,11 @@
                             throw new IgniteInternalException(e);
                         }
                         catch (Exception e) {
+                            if (e instanceof IgniteInternalException && e.getCause().getCause() instanceof RejectedExecutionException) {
+                                LOG.warn("Cursor close command was rejected because raft executor has been already stopped.");
+                                return;
+                            }
+
                             // TODO: IGNITE-14693 Implement Meta storage exception handling logic.
                             LOG.error("Unexpected exception", e);
                         }
@@ -470,7 +478,11 @@
                     catch (Throwable e) {
                         if (e instanceof NodeStoppingException || e.getCause() instanceof NodeStoppingException)
                             break;
-                        else {
+                        else if ((e instanceof InterruptedException || e.getCause() instanceof InterruptedException) && stop) {
+                            LOG.debug("Watcher has been stopped during node's stop");
+
+                            break;
+                        } else {
                             // TODO: IGNITE-14693 Implement Meta storage exception handling logic.
                             LOG.error("Unexpected exception", e);
                         }
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 8eee25e..273b9dc 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -22,6 +22,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -46,15 +47,19 @@
 import org.apache.ignite.internal.metastorage.watch.WatchAggregator;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.network.TopologyEventHandler;
+import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
@@ -72,6 +77,9 @@
 // TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
 @SuppressWarnings("unused")
 public class MetaStorageManager implements IgniteComponent {
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(MetaStorageManager.class);
+
     /** Meta storage raft group name. */
     private static final String METASTORAGE_RAFT_GROUP_NAME = "metastorage_raft_group";
 
@@ -96,6 +104,9 @@
     /** Meta storage service. */
     private volatile CompletableFuture<MetaStorageService> metaStorageSvcFut;
 
+    /** Raft group service. */
+    private volatile CompletableFuture<RaftGroupService> raftGroupServiceFut;
+
     /**
      * Aggregator of multiple watches to deploy them as one batch.
      *
@@ -124,6 +135,9 @@
     /** Actual storage for the Metastorage. */
     private final KeyValueStorage storage;
 
+    /** Busy lock for stop synchronisation. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
     /**
      * The constructor.
      *
@@ -164,11 +178,13 @@
 
             storage.start();
 
-            this.metaStorageSvcFut = raftMgr.prepareRaftGroup(
+            raftGroupServiceFut = raftMgr.prepareRaftGroup(
                 METASTORAGE_RAFT_GROUP_NAME,
                 metaStorageMembers,
                 () -> new MetaStorageListener(storage)
-            ).thenApply(service ->
+            );
+
+            this.metaStorageSvcFut = raftGroupServiceFut.thenApply(service ->
                 new MetaStorageServiceImpl(service, clusterNetSvc.topologyService().localMember().id())
             );
 
@@ -201,7 +217,47 @@
 
     /** {@inheritDoc} */
     @Override public void stop() {
-        // TODO: IGNITE-15161 Implement component's stop.
+        busyLock.block();
+
+        Optional<IgniteUuid> watchId;
+
+        try {
+            // If deployed future is not done, that means that stop was called in the middle of
+            // IgniteImpl.start, before deployWatches, or before init phase.
+            // It is correct to check completeness of the future because the method calls are guarded by busy lock.
+            // TODO: add busy lock for init method https://issues.apache.org/jira/browse/IGNITE-14414
+            if (deployFut.isDone()) {
+                watchId = deployFut.get();
+
+                try {
+                    if (watchId.isPresent())
+                        metaStorageSvcFut.get().stopWatch(watchId.get());
+                }
+                catch (InterruptedException | ExecutionException e) {
+                    LOG.error("Failed to get meta storage service.");
+
+                    throw new IgniteInternalException(e);
+                }
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Failed to get watch.");
+
+            throw new IgniteInternalException(e);
+        }
+
+        try {
+            if (raftGroupServiceFut != null) {
+                raftGroupServiceFut.get().shutdown();
+
+                raftMgr.stopRaftGroup(METASTORAGE_RAFT_GROUP_NAME, metastorageNodes());
+            }
+        }
+        catch (InterruptedException | ExecutionException e) {
+            LOG.error("Failed to get meta storage raft group service.");
+
+            throw new IgniteInternalException(e);
+        }
+
         try {
             storage.close();
         }
@@ -213,26 +269,33 @@
     /**
      * Deploy all registered watches.
      */
-    public synchronized void deployWatches() {
-        var watch = watchAggregator.watch(
-            appliedRevision() + 1,
-            this::storeEntries
-        );
+    public synchronized void deployWatches() throws NodeStoppingException {
+        if (!busyLock.enterBusy())
+            throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
 
-        if (watch.isEmpty())
-            deployFut.complete(Optional.empty());
-        else {
-            CompletableFuture<Void> fut =
-                dispatchAppropriateMetaStorageWatch(watch.get()).thenAccept(id -> deployFut.complete(Optional.of(id)));
+        try {
+            var watch = watchAggregator.watch(
+                appliedRevision() + 1,
+                this::storeEntries
+            );
 
-            if (metaStorageNodesOnStart)
-                fut.join();
+            if (watch.isEmpty())
+                deployFut.complete(Optional.empty());
             else {
-                // TODO: need to wait for this future in init phase https://issues.apache.org/jira/browse/IGNITE-14414
-            }
-        }
+                CompletableFuture<Void> fut =
+                    dispatchAppropriateMetaStorageWatch(watch.get()).thenAccept(id -> deployFut.complete(Optional.of(id)));
 
-        deployed = true;
+                if (metaStorageNodesOnStart)
+                    fut.join();
+                else {
+                    // TODO: need to wait for this future in init phase https://issues.apache.org/jira/browse/IGNITE-14414
+                }
+            }
+
+            deployed = true;
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
@@ -247,7 +310,14 @@
         @Nullable ByteArray key,
         @NotNull WatchListener lsnr
     ) {
-        return waitForReDeploy(watchAggregator.add(key, lsnr));
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return waitForReDeploy(watchAggregator.add(key, lsnr));
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
@@ -262,7 +332,15 @@
         @Nullable ByteArray key,
         @NotNull WatchListener lsnr
     ) {
-        return waitForReDeploy(watchAggregator.addPrefix(key, lsnr));
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return waitForReDeploy(watchAggregator.addPrefix(key, lsnr));
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
@@ -277,7 +355,15 @@
         @NotNull Collection<ByteArray> keys,
         @NotNull WatchListener lsnr
     ) {
-        return waitForReDeploy(watchAggregator.add(keys, lsnr));
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return waitForReDeploy(watchAggregator.add(keys, lsnr));
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
@@ -293,7 +379,15 @@
         @NotNull ByteArray to,
         @NotNull WatchListener lsnr
     ) {
-        return waitForReDeploy(watchAggregator.add(from, to, lsnr));
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return waitForReDeploy(watchAggregator.add(from, to, lsnr));
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
@@ -303,95 +397,199 @@
      * @return future, which will be completed when unregister finished.
      */
     public synchronized CompletableFuture<Void> unregisterWatch(long id) {
-        watchAggregator.cancel(id);
-        if (deployed)
-            return updateWatches().thenAccept(v -> {});
-        else
-            return deployFut.thenAccept(uuid -> {});
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            watchAggregator.cancel(id);
+            if (deployed)
+                return updateWatches().thenAccept(v -> {});
+            else
+                return deployFut.thenAccept(uuid -> {});
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * @see MetaStorageService#get(ByteArray)
      */
     public @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key) {
-        return metaStorageSvcFut.thenCompose(svc -> svc.get(key));
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return metaStorageSvcFut.thenCompose(svc -> svc.get(key));
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * @see MetaStorageService#get(ByteArray, long)
      */
     public @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key, long revUpperBound) {
-        return metaStorageSvcFut.thenCompose(svc -> svc.get(key, revUpperBound));
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return metaStorageSvcFut.thenCompose(svc -> svc.get(key, revUpperBound));
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * @see MetaStorageService#getAll(Set)
      */
     public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys) {
-        return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys));
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys));
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * @see MetaStorageService#getAll(Set, long)
      */
     public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys, long revUpperBound) {
-        return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys, revUpperBound));
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys, revUpperBound));
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * @see MetaStorageService#put(ByteArray, byte[])
      */
     public @NotNull CompletableFuture<Void> put(@NotNull ByteArray key, byte[] val) {
-        return metaStorageSvcFut.thenCompose(svc -> svc.put(key, val));
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return metaStorageSvcFut.thenCompose(svc -> svc.put(key, val));
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * @see MetaStorageService#getAndPut(ByteArray, byte[])
      */
     public @NotNull CompletableFuture<Entry> getAndPut(@NotNull ByteArray key, byte[] val) {
-        return metaStorageSvcFut.thenCompose(svc -> svc.getAndPut(key, val));
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return metaStorageSvcFut.thenCompose(svc -> svc.getAndPut(key, val));
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * @see MetaStorageService#putAll(Map)
      */
     public @NotNull CompletableFuture<Void> putAll(@NotNull Map<ByteArray, byte[]> vals) {
-        return metaStorageSvcFut.thenCompose(svc -> svc.putAll(vals));
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return metaStorageSvcFut.thenCompose(svc -> svc.putAll(vals));
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * @see MetaStorageService#getAndPutAll(Map)
      */
     public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndPutAll(@NotNull Map<ByteArray, byte[]> vals) {
-        return metaStorageSvcFut.thenCompose(svc -> svc.getAndPutAll(vals));
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return metaStorageSvcFut.thenCompose(svc -> svc.getAndPutAll(vals));
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * @see MetaStorageService#remove(ByteArray)
      */
     public @NotNull CompletableFuture<Void> remove(@NotNull ByteArray key) {
-        return metaStorageSvcFut.thenCompose(svc -> svc.remove(key));
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return metaStorageSvcFut.thenCompose(svc -> svc.remove(key));
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * @see MetaStorageService#getAndRemove(ByteArray)
      */
     public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull ByteArray key) {
-        return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemove(key));
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemove(key));
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * @see MetaStorageService#removeAll(Set)
      */
     public @NotNull CompletableFuture<Void> removeAll(@NotNull Set<ByteArray> keys) {
-        return metaStorageSvcFut.thenCompose(svc -> svc.removeAll(keys));
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return metaStorageSvcFut.thenCompose(svc -> svc.removeAll(keys));
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * @see MetaStorageService#getAndRemoveAll(Set)
      */
     public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndRemoveAll(@NotNull Set<ByteArray> keys) {
-        return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemoveAll(keys));
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemoveAll(keys));
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
@@ -404,7 +602,15 @@
         @NotNull Operation success,
         @NotNull Operation failure
     ) {
-        return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure));
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure));
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
@@ -415,17 +621,32 @@
             @NotNull Collection<Operation> success,
             @NotNull Collection<Operation> failure
     ) {
-        return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure));
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure));
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * @see MetaStorageService#range(ByteArray, ByteArray, long)
      */
-    public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound) {
-        return new CursorWrapper<>(
-            metaStorageSvcFut,
-            metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo, revUpperBound))
-        );
+    public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound) throws NodeStoppingException {
+        if (!busyLock.enterBusy())
+            throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+        try {
+            return new CursorWrapper<>(
+                metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo, revUpperBound))
+            );
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
@@ -441,21 +662,35 @@
      * @see ByteArray
      * @see Entry
      */
-    public @NotNull Cursor<Entry> rangeWithAppliedRevision(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo) {
-        return new CursorWrapper<>(
-            metaStorageSvcFut,
-            metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo, appliedRevision()))
-        );
+    public @NotNull Cursor<Entry> rangeWithAppliedRevision(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo) throws NodeStoppingException {
+        if (!busyLock.enterBusy())
+            throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+        try {
+            return new CursorWrapper<>(
+                metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo, appliedRevision()))
+            );
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * @see MetaStorageService#range(ByteArray, ByteArray)
      */
-    public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo) {
-        return new CursorWrapper<>(
-            metaStorageSvcFut,
-            metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo))
-        );
+    public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo) throws NodeStoppingException {
+        if (!busyLock.enterBusy())
+            throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+        try {
+            return new CursorWrapper<>(
+                metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo))
+            );
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
@@ -472,13 +707,20 @@
      * @see ByteArray
      * @see Entry
      */
-    public @NotNull Cursor<Entry> prefixWithAppliedRevision(@NotNull ByteArray keyPrefix) {
-        var rangeCriterion = KeyCriterion.RangeCriterion.fromPrefixKey(keyPrefix);
+    public @NotNull Cursor<Entry> prefixWithAppliedRevision(@NotNull ByteArray keyPrefix) throws NodeStoppingException {
+        if (!busyLock.enterBusy())
+            throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
 
-        return new CursorWrapper<>(
-            metaStorageSvcFut,
-            metaStorageSvcFut.thenApply(svc -> svc.range(rangeCriterion.from(), rangeCriterion.to(), appliedRevision()))
-        );
+        try {
+            var rangeCriterion = KeyCriterion.RangeCriterion.fromPrefixKey(keyPrefix);
+
+            return new CursorWrapper<>(
+                metaStorageSvcFut.thenApply(svc -> svc.range(rangeCriterion.from(), rangeCriterion.to(), appliedRevision()))
+            );
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
@@ -492,7 +734,7 @@
      * @see ByteArray
      * @see Entry
      */
-    public @NotNull Cursor<Entry> prefix(@NotNull ByteArray keyPrefix) {
+    public @NotNull Cursor<Entry> prefix(@NotNull ByteArray keyPrefix) throws NodeStoppingException {
         return prefix(keyPrefix, -1);
     }
 
@@ -510,19 +752,34 @@
      * @see ByteArray
      * @see Entry
      */
-    public @NotNull Cursor<Entry> prefix(@NotNull ByteArray keyPrefix, long revUpperBound) {
-        var rangeCriterion = KeyCriterion.RangeCriterion.fromPrefixKey(keyPrefix);
-        return new CursorWrapper<>(
-            metaStorageSvcFut,
-            metaStorageSvcFut.thenApply(svc -> svc.range(rangeCriterion.from(), rangeCriterion.to(), revUpperBound))
-        );
+    public @NotNull Cursor<Entry> prefix(@NotNull ByteArray keyPrefix, long revUpperBound) throws NodeStoppingException {
+        if (!busyLock.enterBusy())
+            throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
+
+        try {
+            var rangeCriterion = KeyCriterion.RangeCriterion.fromPrefixKey(keyPrefix);
+            return new CursorWrapper<>(
+                metaStorageSvcFut.thenApply(svc -> svc.range(rangeCriterion.from(), rangeCriterion.to(), revUpperBound))
+            );
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
      * @see MetaStorageService#compact()
      */
     public @NotNull CompletableFuture<Void> compact() {
-        return metaStorageSvcFut.thenCompose(MetaStorageService::compact);
+        if (!busyLock.enterBusy())
+            return CompletableFuture.failedFuture(new NodeStoppingException("Operation has been cancelled (node is stopping)."));
+
+        try {
+            return metaStorageSvcFut.thenCompose(MetaStorageService::compact);
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
@@ -635,9 +892,6 @@
     // TODO: IGNITE-14691 Temporally solution that should be removed after implementing reactive watches.
     /** Cursor wrapper. */
     private final class CursorWrapper<T> implements Cursor<T> {
-        /** Meta storage service future. */
-        private final CompletableFuture<MetaStorageService> metaStorageSvcFut;
-
         /** Inner cursor future. */
         private final CompletableFuture<Cursor<T>> innerCursorFut;
 
@@ -647,30 +901,35 @@
         private final InnerIterator it = new InnerIterator();
 
         /**
-         * @param metaStorageSvcFut Meta storage service future.
          * @param innerCursorFut Inner cursor future.
          */
         CursorWrapper(
-            CompletableFuture<MetaStorageService> metaStorageSvcFut,
             CompletableFuture<Cursor<T>> innerCursorFut
         ) {
-            this.metaStorageSvcFut = metaStorageSvcFut;
             this.innerCursorFut = innerCursorFut;
             this.innerIterFut = innerCursorFut.thenApply(Iterable::iterator);
         }
 
             /** {@inheritDoc} */
         @Override public void close() throws Exception {
-            innerCursorFut.thenApply(cursor -> {
-                try {
-                    cursor.close();
+            if (!busyLock.enterBusy())
+                throw new NodeStoppingException("Operation has been cancelled (node is stopping).");
 
-                    return null;
-                }
-                catch (Exception e) {
-                    throw new IgniteInternalException(e);
-                }
-            }).get();
+            try {
+                innerCursorFut.thenApply(cursor -> {
+                    try {
+                        cursor.close();
+
+                        return null;
+                    }
+                    catch (Exception e) {
+                        throw new IgniteInternalException(e);
+                    }
+                }).get();
+            }
+            finally {
+                busyLock.leaveBusy();
+            }
         }
 
         /** {@inheritDoc} */
@@ -689,22 +948,39 @@
         }
 
         private class InnerIterator implements Iterator<T> {
+            /** {@inheritDoc} */
             @Override public boolean hasNext() {
+                if (!busyLock.enterBusy())
+                    return false;
+
                 try {
-                    return innerIterFut.thenApply(Iterator::hasNext).get();
+                    try {
+                        return innerIterFut.thenApply(Iterator::hasNext).get();
+                    }
+                    catch (InterruptedException | ExecutionException e) {
+                        throw new IgniteInternalException(e);
+                    }
                 }
-                catch (InterruptedException | ExecutionException e) {
-                    throw new IgniteInternalException(e);
+                finally {
+                    busyLock.leaveBusy();
                 }
             }
 
             /** {@inheritDoc} */
             @Override public T next() {
+                if (!busyLock.enterBusy())
+                    throw new NoSuchElementException("No such element because node is stopping.");
+
                 try {
-                    return innerIterFut.thenApply(Iterator::next).get();
+                    try {
+                        return innerIterFut.thenApply(Iterator::next).get();
+                    }
+                    catch (InterruptedException | ExecutionException e) {
+                        throw new IgniteInternalException(e);
+                    }
                 }
-                catch (InterruptedException | ExecutionException e) {
-                    throw new IgniteInternalException(e);
+                finally {
+                    busyLock.leaveBusy();
                 }
             }
         }
@@ -745,4 +1021,24 @@
         else
             throw new UnsupportedOperationException("Unsupported type of criterion");
     }
+
+    /**
+     * Return metastorage nodes.
+     *
+     * This code will be deleted after node init phase is developed.
+     * https://issues.apache.org/jira/browse/IGNITE-14414
+     */
+    private List<ClusterNode> metastorageNodes() {
+        String[] metastorageNodes = this.locCfgMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
+            .metastorageNodes().value();
+
+        Predicate<ClusterNode> metaStorageNodesContainsLocPred =
+            clusterNode -> Arrays.asList(metastorageNodes).contains(clusterNode.name());
+
+        List<ClusterNode> metaStorageMembers = clusterNetSvc.topologyService().allMembers().stream()
+            .filter(metaStorageNodesContainsLocPred)
+            .collect(Collectors.toList());
+
+        return metaStorageMembers;
+    }
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index ba63c0c..181e7ca 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -118,7 +118,7 @@
 
         String locNodeName = clusterNetSvc.topologyService().localMember().name();
 
-        if (nodes.stream().map(ClusterNode::name).collect(Collectors.toSet()).contains(locNodeName))
+        if (nodes.stream().anyMatch(n -> locNodeName.equals(n.name())))
             raftServer.stopRaftGroup(groupId);
     }
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index fa5994c..0a3dcda 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.lang.IgniteLogger;
 import org.apache.ignite.network.ClusterService;
@@ -96,8 +97,8 @@
     /** */
     private final long retryDelay;
 
-    /** */
-    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+    /** TODO: Use shared executors instead https://issues.apache.org/jira/browse/IGNITE-15136 */
+    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Raft-Group-Service-Pool"));
 
     /**
      * Constructor.
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index f36e04a..d824a1c 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -49,6 +49,7 @@
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.NodeStoppingException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.schema.ColumnType;
@@ -364,7 +365,7 @@
             workDir
         );
 
-        TableImpl tbl2;
+        TableImpl tbl2 = null;
 
         try {
             tableManager.start();
@@ -393,24 +394,31 @@
 
                 tableCreatedFlag.set(createTbl);
 
-                when(mm.range(eq(new ByteArray(PUBLIC_PREFIX)), any())).thenAnswer(invocation -> {
-                    AtomicBoolean firstRecord = new AtomicBoolean(createTbl);
+                try {
+                    when(mm.range(eq(new ByteArray(PUBLIC_PREFIX)), any())).thenAnswer(invocation -> {
+                        AtomicBoolean firstRecord = new AtomicBoolean(createTbl);
 
-                    Cursor<Entry> cursor = mock(Cursor.class);
+                        Cursor<Entry> cursor = mock(Cursor.class);
 
-                    when(cursor.hasNext()).thenAnswer(hasNextInvocation ->
-                        firstRecord.compareAndSet(true, false));
+                        when(cursor.hasNext()).thenAnswer(hasNextInvocation ->
+                            firstRecord.compareAndSet(true, false));
 
-                    Entry mockEntry = mock(Entry.class);
+                        Entry mockEntry = mock(Entry.class);
 
-                    when(mockEntry.key()).thenReturn(new ByteArray(PUBLIC_PREFIX + "uuid." + NamedListNode.NAME));
+                        when(mockEntry.key()).thenReturn(new ByteArray(PUBLIC_PREFIX + "uuid." + NamedListNode.NAME));
 
-                    when(mockEntry.value()).thenReturn(ByteUtils.toBytes(schemaTable.canonicalName()));
+                        when(mockEntry.value()).thenReturn(ByteUtils.toBytes(schemaTable.canonicalName()));
 
-                    when(cursor.next()).thenReturn(mockEntry);
+                        when(cursor.next()).thenReturn(mockEntry);
 
-                    return cursor;
-                });
+                        return cursor;
+                    });
+                }
+                catch (NodeStoppingException e) {
+                    LOG.error("Node was stopped during table creation.", e);
+
+                    fail();
+                }
 
                 if (phaser != null)
                     phaser.arriveAndAwaitAdvance();
@@ -427,6 +435,11 @@
 
             assertEquals(tablesBeforeCreation + 1, tableManager.tables().size());
         }
+        catch (NodeStoppingException e) {
+            LOG.error("Node was stopped during table creation.", e);
+
+            fail();
+        }
         finally {
             tableManager.stop();
         }