IGNITE-17286 Added missed busy locks to get rid of resources leaking during table creation. Fixes #977

Signed-off-by: Slava Koptilin <slava.koptilin@gmail.com>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java b/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
index 1eb90e1..92a3d2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
@@ -578,7 +578,7 @@
 
         try {
             for (Long token : history.keySet()) {
-                if (token != lastToken && causalityToken - token >= historySize) {
+                if (!token.equals(lastToken) && causalityToken - token >= historySize) {
                     history.remove(token);
                 }
             }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 77478a8..e19d734 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.util;
 
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.AtomicMoveNotSupportedException;
@@ -44,11 +46,14 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.util.worker.IgniteWorker;
+import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteStringBuilder;
 import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.lang.NodeStoppingException;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -787,4 +792,40 @@
             }
         }
     }
+
+    /**
+     * Method that runs the provided {@code fn} in {@code busyLock}.
+     *
+     * @param busyLock Component's busy lock
+     * @param fn Function to run
+     * @param <T> Type of returned value from {@code fn}
+     * @return Result of the provided function
+     */
+    public static <T> T inBusyLock(IgniteSpinBusyLock busyLock, Supplier<T> fn) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+        }
+        try {
+            return fn.get();
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Method that runs the provided {@code fn} in {@code busyLock}.
+     *
+     * @param busyLock Component's busy lock
+     * @param fn Runnable to run
+     */
+    public static void inBusyLock(IgniteSpinBusyLock busyLock, Runnable fn) {
+        if (!busyLock.enterBusy()) {
+            throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+        }
+        try {
+            fn.run();
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
 }
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 8dfe45e..760b2f7 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -254,7 +254,8 @@
     /** {@inheritDoc} */
     @Override
     public void stop() throws Exception {
-        assert groups.isEmpty() : IgniteStringFormatter.format("Raft groups are still running {}", groups.keySet());
+        assert groups.isEmpty() : IgniteStringFormatter.format("Raft groups {} are still running on the node {}", groups.keySet(),
+                service.topologyService().localMember().name());
 
         rpcServer.shutdown();
 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
index 68549f7..e43f683 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
@@ -91,7 +91,7 @@
                 })
                 .collect(toList());
 
-        String metaStorageNodeName = testNodeName(testInfo, 0);
+        String metaStorageNodeName = testNodeName(testInfo, nodes() - 1);
 
         IgnitionManager.init(metaStorageNodeName, List.of(metaStorageNodeName), "cluster");
 
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index b03c77e..16a25c5 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -20,6 +20,8 @@
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -107,22 +109,30 @@
      * @return A future.
      */
     private CompletableFuture<?> onSchemaCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
-        long causalityToken = schemasCtx.storageRevision();
+        if (!busyLock.enterBusy()) {
+            return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()));
+        }
 
-        ExtendedTableConfiguration tblCfg = schemasCtx.config(ExtendedTableConfiguration.class);
+        try {
+            long causalityToken = schemasCtx.storageRevision();
 
-        UUID tblId = tblCfg.id().value();
+            ExtendedTableConfiguration tblCfg = schemasCtx.config(ExtendedTableConfiguration.class);
 
-        String tableName = tblCfg.name().value();
+            UUID tblId = tblCfg.id().value();
 
-        SchemaDescriptor schemaDescriptor = SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
+            String tableName = tblCfg.name().value();
 
-        CompletableFuture<?> createSchemaFut = createSchema(causalityToken, tblId, tableName, schemaDescriptor);
+            SchemaDescriptor schemaDescriptor = SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
 
-        registriesVv.get(causalityToken)
-                .thenRun(() -> fireEvent(SchemaEvent.CREATE, new SchemaEventParameters(causalityToken, tblId, schemaDescriptor)));
+            CompletableFuture<?> createSchemaFut = createSchema(causalityToken, tblId, tableName, schemaDescriptor);
 
-        return createSchemaFut;
+            registriesVv.get(causalityToken).thenRun(() -> inBusyLock(busyLock,
+                    () -> fireEvent(SchemaEvent.CREATE, new SchemaEventParameters(causalityToken, tblId, schemaDescriptor))));
+
+            return createSchemaFut;
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
@@ -161,27 +171,29 @@
             String tableName,
             SchemaDescriptor schemaDescriptor
     ) {
-        return registriesVv.update(causalityToken, (registries, e) -> {
+        return registriesVv.update(causalityToken, (registries, e) -> inBusyLock(busyLock, () -> {
             if (e != null) {
                 return failedFuture(new IgniteInternalException(IgniteStringFormatter.format(
-                    "Cannot create a schema for the table [tblId={}, ver={}]", tableId, schemaDescriptor.version()), e)
+                        "Cannot create a schema for the table [tblId={}, ver={}]", tableId, schemaDescriptor.version()), e)
                 );
             }
 
-            SchemaRegistryImpl reg = registries.get(tableId);
+            Map<UUID, SchemaRegistryImpl> regs = registries;
+
+            SchemaRegistryImpl reg = regs.get(tableId);
 
             if (reg == null) {
-                registries = new HashMap<>(registries);
+                regs = new HashMap<>(registries);
 
                 SchemaRegistryImpl registry = createSchemaRegistry(tableId, tableName, schemaDescriptor);
 
-                registries.put(tableId, registry);
+                regs.put(tableId, registry);
             } else {
                 reg.onSchemaRegistered(schemaDescriptor);
             }
 
-            return completedFuture(registries);
-        });
+            return completedFuture(regs);
+        }));
     }
 
     /**
@@ -195,7 +207,7 @@
     private SchemaRegistryImpl createSchemaRegistry(UUID tableId, String tableName, SchemaDescriptor initialSchema) {
         return new SchemaRegistryImpl(ver -> {
             if (!busyLock.enterBusy()) {
-                throw new IgniteException(new NodeStoppingException());
+                throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
             }
 
             try {
@@ -205,7 +217,7 @@
             }
         }, () -> {
             if (!busyLock.enterBusy()) {
-                throw new IgniteException(new NodeStoppingException());
+                throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
             }
 
             try {
@@ -336,11 +348,12 @@
      */
     public CompletableFuture<SchemaRegistry> schemaRegistry(long causalityToken, @Nullable UUID tableId) {
         if (!busyLock.enterBusy()) {
-            throw new IgniteException(new NodeStoppingException());
+            throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException());
         }
 
         try {
-            return registriesVv.get(causalityToken).thenApply(regs -> tableId == null ? null : regs.get(tableId));
+            return registriesVv.get(causalityToken)
+                    .thenApply(regs -> inBusyLock(busyLock, () -> tableId == null ? null : regs.get(tableId)));
         } finally {
             busyLock.leaveBusy();
         }
@@ -363,20 +376,18 @@
      * @param tableId Table id.
      */
     public CompletableFuture<?> dropRegistry(long causalityToken, UUID tableId) {
-        return registriesVv.update(causalityToken, (registries, e) -> {
+        return registriesVv.update(causalityToken, (registries, e) -> inBusyLock(busyLock, () -> {
             if (e != null) {
                 return failedFuture(new IgniteInternalException(
-                        IgniteStringFormatter.format("Cannot remove a schema registry for the table [tblId={}]", tableId), e
-                    )
-                );
+                        IgniteStringFormatter.format("Cannot remove a schema registry for the table [tblId={}]", tableId), e));
             }
 
-            registries = new HashMap<>(registries);
+            Map<UUID, SchemaRegistryImpl> regs = new HashMap<>(registries);
 
-            registries.remove(tableId);
+            regs.remove(tableId);
 
-            return completedFuture(registries);
-        });
+            return completedFuture(regs);
+        }));
     }
 
     /** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 1e81423..028ae0c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -162,7 +162,7 @@
                 msgSrvc
         ));
 
-        SqlSchemaManagerImpl sqlSchemaManager = new SqlSchemaManagerImpl(tableManager, schemaManager, registry);
+        SqlSchemaManagerImpl sqlSchemaManager = new SqlSchemaManagerImpl(tableManager, schemaManager, registry, busyLock);
 
         sqlSchemaManager.registerListener(prepareSvc);
 
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index 6e8d939..e069893 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -19,6 +19,8 @@
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 
 import java.util.Comparator;
 import java.util.HashMap;
@@ -42,6 +44,7 @@
 import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteStringFormatter;
 import org.apache.ignite.lang.NodeStoppingException;
@@ -66,6 +69,9 @@
 
     private final Set<SchemaUpdateListener> listeners = new CopyOnWriteArraySet<>();
 
+    /** Busy lock for stop synchronisation. */
+    private final IgniteSpinBusyLock busyLock;
+
     /**
      * Constructor.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -73,12 +79,14 @@
     public SqlSchemaManagerImpl(
             TableManager tableManager,
             SchemaManager schemaManager,
-            Consumer<Function<Long, CompletableFuture<?>>> registry
+            Consumer<Function<Long, CompletableFuture<?>>> registry,
+            IgniteSpinBusyLock busyLock
     ) {
         this.tableManager = tableManager;
         this.schemaManager = schemaManager;
         schemasVv = new VersionedValue<>(registry, HashMap::new);
         tablesVv = new VersionedValue<>(registry, HashMap::new);
+        this.busyLock = busyLock;
 
         calciteSchemaVv = new VersionedValue<>(null, () -> {
             SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
@@ -87,20 +95,29 @@
         });
 
         schemasVv.whenComplete((token, stringIgniteSchemaMap, throwable) -> {
-            if (throwable != null) {
-                calciteSchemaVv.completeExceptionally(
-                        token,
-                        new IgniteInternalException("Couldn't evaluate sql schemas for causality token: " + token, throwable)
-                );
+            if (!busyLock.enterBusy()) {
+                calciteSchemaVv.completeExceptionally(token, new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()));
 
                 return;
             }
+            try {
+                if (throwable != null) {
+                    calciteSchemaVv.completeExceptionally(
+                            token,
+                            new IgniteInternalException("Couldn't evaluate sql schemas for causality token: " + token, throwable)
+                    );
 
-            SchemaPlus newCalciteSchema = rebuild(stringIgniteSchemaMap);
+                    return;
+                }
 
-            listeners.forEach(SchemaUpdateListener::onSchemaUpdated);
+                SchemaPlus newCalciteSchema = rebuild(stringIgniteSchemaMap);
 
-            calciteSchemaVv.complete(token, newCalciteSchema);
+                listeners.forEach(SchemaUpdateListener::onSchemaUpdated);
+
+                calciteSchemaVv.complete(token, newCalciteSchema);
+            } finally {
+                busyLock.leaveBusy();
+            }
         });
     }
 
@@ -116,27 +133,34 @@
     @Override
     @NotNull
     public IgniteTable tableById(UUID id, int ver) {
-        IgniteTable table = tablesVv.latest().get(id);
-
-        // there is a chance that someone tries to resolve table before
-        // the distributed event of that table creation has been processed
-        // by TableManager, so we need to get in sync with the TableManager
-        if (table == null || ver > table.version()) {
-            table = awaitLatestTableSchema(id);
+        if (!busyLock.enterBusy()) {
+            throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
         }
+        try {
+            IgniteTable table = tablesVv.latest().get(id);
 
-        if (table == null) {
-            throw new IgniteInternalException(
-                IgniteStringFormatter.format("Table not found [tableId={}]", id));
+            // there is a chance that someone tries to resolve table before
+            // the distributed event of that table creation has been processed
+            // by TableManager, so we need to get in sync with the TableManager
+            if (table == null || ver > table.version()) {
+                table = awaitLatestTableSchema(id);
+            }
+
+            if (table == null) {
+                throw new IgniteInternalException(
+                        IgniteStringFormatter.format("Table not found [tableId={}]", id));
+            }
+
+            if (table.version() < ver) {
+                throw new IgniteInternalException(
+                        IgniteStringFormatter.format("Table version not found [tableId={}, requiredVer={}, latestKnownVer={}]",
+                                id, ver, table.version()));
+            }
+
+            return table;
+        } finally {
+            busyLock.leaveBusy();
         }
-
-        if (table.version() < ver) {
-            throw new IgniteInternalException(
-                    IgniteStringFormatter.format("Table version not found [tableId={}, requiredVer={}, latestKnownVer={}]",
-                            id, ver, table.version()));
-        }
-
-        return table;
     }
 
     public void registerListener(SchemaUpdateListener listener) {
@@ -155,57 +179,11 @@
 
             return convert(table);
         } catch (NodeStoppingException e) {
-            throw new IgniteInternalException(e);
+            throw new IgniteInternalException(NODE_STOPPING_ERR, e);
         }
     }
 
     /**
-     * Schema creation handler.
-     *
-     * @param schemaName Schema name.
-     * @param causalityToken Causality token.
-     */
-    public synchronized void onSchemaCreated(String schemaName, long causalityToken) {
-        schemasVv.update(
-                causalityToken,
-                (schemas, e) -> {
-                    if (e != null) {
-                        return failedFuture(e);
-                    }
-
-                    Map<String, IgniteSchema> res = new HashMap<>(schemas);
-
-                    res.putIfAbsent(schemaName, new IgniteSchema(schemaName));
-
-                    return completedFuture(res);
-                }
-        );
-    }
-
-    /**
-     * Schema drop handler.
-     *
-     * @param schemaName Schema name.
-     * @param causalityToken Causality token.
-     */
-    public synchronized void onSchemaDropped(String schemaName, long causalityToken) {
-        schemasVv.update(
-                causalityToken,
-                (schemas, e) -> {
-                    if (e != null) {
-                        return failedFuture(e);
-                    }
-
-                    Map<String, IgniteSchema> res = new HashMap<>(schemas);
-
-                    res.remove(schemaName);
-
-                    return completedFuture(res);
-                }
-        );
-    }
-
-    /**
      * OnSqlTypeCreated.
      * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
      */
@@ -214,50 +192,45 @@
             TableImpl table,
             long causalityToken
     ) {
-        schemasVv.update(
-                causalityToken,
-                (schemas, e) -> {
-                    if (e != null) {
-                        return failedFuture(e);
+        if (!busyLock.enterBusy()) {
+            return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()));
+        }
+        try {
+            schemasVv.update(causalityToken, (schemas, e) -> inBusyLock(busyLock, () -> {
+                if (e != null) {
+                    return failedFuture(e);
+                }
+
+                Map<String, IgniteSchema> res = new HashMap<>(schemas);
+
+                IgniteSchema schema = res.computeIfAbsent(schemaName, IgniteSchema::new);
+
+                CompletableFuture<IgniteTableImpl> igniteTableFuture = convert(causalityToken, table);
+
+                return tablesVv.update(causalityToken, (tables, ex) -> inBusyLock(busyLock, () -> {
+                    if (ex != null) {
+                        return failedFuture(ex);
                     }
 
-                    Map<String, IgniteSchema> res = new HashMap<>(schemas);
+                    Map<UUID, IgniteTable> resTbls = new HashMap<>(tables);
 
-                    IgniteSchema schema = res.computeIfAbsent(schemaName, IgniteSchema::new);
+                    return igniteTableFuture.thenApply(igniteTable -> inBusyLock(busyLock, () -> {
+                        resTbls.put(igniteTable.id(), igniteTable);
 
-                    CompletableFuture<IgniteTableImpl> igniteTableFuture = convert(causalityToken, table);
+                        return resTbls;
+                    }));
+                })).thenCombine(igniteTableFuture, (v, igniteTable) -> inBusyLock(busyLock, () -> {
+                    schema.addTable(removeSchema(schemaName, table.name()), igniteTable);
 
-                    return tablesVv
-                            .update(
-                                    causalityToken,
-                                    (tables, ex) -> {
-                                        if (ex != null) {
-                                            return failedFuture(ex);
-                                        }
+                    return null;
+                })).thenCompose(v -> inBusyLock(busyLock, () -> completedFuture(res)));
 
-                                        Map<UUID, IgniteTable> resTbls = new HashMap<>(tables);
+            }));
 
-                                        return igniteTableFuture
-                                            .thenApply(igniteTable -> {
-                                                resTbls.put(igniteTable.id(), igniteTable);
-
-                                                return resTbls;
-                                            });
-                                    }
-                            )
-                            .thenCombine(
-                                igniteTableFuture,
-                                (v, igniteTable) -> {
-                                    schema.addTable(removeSchema(schemaName, table.name()), igniteTable);
-
-                                    return null;
-                                }
-                            )
-                            .thenCompose(v -> completedFuture(res));
-                }
-        );
-
-        return calciteSchemaVv.get(causalityToken);
+            return calciteSchemaVv.get(causalityToken);
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
@@ -281,45 +254,46 @@
             String tableName,
             long causalityToken
     ) {
-        schemasVv.update(causalityToken,
-                (schemas, e) -> {
-                    if (e != null) {
-                        return failedFuture(e);
-                    }
-
-                    Map<String, IgniteSchema> res = new HashMap<>(schemas);
-
-                    IgniteSchema schema = res.computeIfAbsent(schemaName, IgniteSchema::new);
-
-                    String calciteTableName = removeSchema(schemaName, tableName);
-
-                    InternalIgniteTable table = (InternalIgniteTable) schema.getTable(calciteTableName);
-
-                    if (table != null) {
-                        schema.removeTable(calciteTableName);
-
-                        return tablesVv
-                                .update(causalityToken,
-                                        (tables, ex) -> {
-                                            if (ex != null) {
-                                                return failedFuture(ex);
-                                            }
-
-                                            Map<UUID, IgniteTable> resTbls = new HashMap<>(tables);
-
-                                            resTbls.remove(table.id());
-
-                                            return completedFuture(resTbls);
-                                        }
-                                )
-                                .thenCompose(tables -> completedFuture(res));
-                    }
-
-                    return completedFuture(res);
+        if (!busyLock.enterBusy()) {
+            return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()));
+        }
+        try {
+            schemasVv.update(causalityToken, (schemas, e) -> inBusyLock(busyLock, () -> {
+                if (e != null) {
+                    return failedFuture(e);
                 }
-        );
 
-        return calciteSchemaVv.get(causalityToken);
+                Map<String, IgniteSchema> res = new HashMap<>(schemas);
+
+                IgniteSchema schema = res.computeIfAbsent(schemaName, IgniteSchema::new);
+
+                String calciteTableName = removeSchema(schemaName, tableName);
+
+                InternalIgniteTable table = (InternalIgniteTable) schema.getTable(calciteTableName);
+
+                if (table != null) {
+                    schema.removeTable(calciteTableName);
+
+                    return tablesVv.update(causalityToken, (tables, ex) -> inBusyLock(busyLock, () -> {
+                        if (ex != null) {
+                            return failedFuture(ex);
+                        }
+
+                        Map<UUID, IgniteTable> resTbls = new HashMap<>(tables);
+
+                        resTbls.remove(table.id());
+
+                        return completedFuture(resTbls);
+                    })).thenCompose(tables -> inBusyLock(busyLock, () -> completedFuture(res)));
+                }
+
+                return completedFuture(res);
+            }));
+
+            return calciteSchemaVv.get(causalityToken);
+        } finally {
+            busyLock.leaveBusy();
+        }
     }
 
     /**
@@ -339,7 +313,7 @@
 
     private CompletableFuture<IgniteTableImpl> convert(long causalityToken, TableImpl table) {
         return schemaManager.schemaRegistry(causalityToken, table.tableId())
-            .thenApply(schemaRegistry -> convert(table, schemaRegistry));
+            .thenApply(schemaRegistry -> inBusyLock(busyLock, () -> convert(table, schemaRegistry)));
     }
 
     private IgniteTableImpl convert(TableImpl table) {
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
index 3496b2d..37ab1cc 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
@@ -50,6 +50,7 @@
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.NodeStoppingException;
 import org.junit.jupiter.api.BeforeEach;
@@ -90,6 +91,9 @@
 
     private TestRevisionRegister testRevisionRegister;
 
+    /** Busy lock for stop synchronisation. */
+    private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
     @BeforeEach
     public void setup() throws NodeStoppingException {
         Mockito.reset(tableManager);
@@ -99,7 +103,8 @@
         sqlSchemaManager = new SqlSchemaManagerImpl(
                 tableManager,
                 schemaManager,
-                testRevisionRegister
+                testRevisionRegister,
+                busyLock
         );
 
         testRevisionRegister.moveForward();
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index ba0e292..2a3ccec 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -23,6 +23,7 @@
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId;
 import static org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 import static org.apache.ignite.internal.utils.RebalanceUtil.PENDING_ASSIGNMENTS_PREFIX;
 import static org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
@@ -182,6 +183,12 @@
     /** Set of futures that should complete before completion of {@link #tablesByIdVv}, after completion this set is cleared. */
     private final Set<CompletableFuture<?>> beforeTablesVvComplete = new ConcurrentHashSet<>();
 
+    /**
+     * {@link TableImpl} is created during update of tablesByIdVv, we store reference to it in case of updating of tablesByIdVv fails,
+     * so we can stop resources associated with the table.
+     */
+    private final Map<UUID, TableImpl> tablesToStopInCaseOfError = new ConcurrentHashMap<>();
+
     /** Resolver that resolves a network address to node id. */
     private final Function<NetworkAddress, String> netAddrResolver;
 
@@ -257,13 +264,42 @@
 
             beforeTablesVvComplete.clear();
 
-            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {}))
+            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
                     .orTimeout(TABLES_COMPLETE_TIMEOUT, TimeUnit.SECONDS)
                     .whenComplete((v, e) -> {
-                        if (e != null) {
-                            tablesByIdVv.completeExceptionally(token, e);
-                        } else {
+                        if (!busyLock.enterBusy()) {
+                            if (e != null) {
+                                LOG.warn("Error occurred while updating tables and stopping components.", e);
+                                // Stop of the components has been started, so we do nothing and resources of tablesByIdVv will be
+                                // freed in the logic of TableManager stop. We cannot complete tablesByIdVv exceptionally because
+                                // we will lose a context of tables.
+                            }
+                            return;
+                        }
+                        try {
+                            if (e != null) {
+                                LOG.warn("Error occurred while updating tables.", e);
+                                if (e instanceof CompletionException) {
+                                    Throwable th = e.getCause();
+                                    // Case when stopping of the previous component has been started and related futures completed
+                                    // exceptionally
+                                    if (th instanceof NodeStoppingException || (th.getCause() != null
+                                            && th.getCause() instanceof NodeStoppingException)) {
+                                        // Stop of the components has been started so we do nothing and resources will be freed in the
+                                        // logic of TableManager stop
+                                        return;
+                                    }
+                                }
+                                // TODO: https://issues.apache.org/jira/browse/IGNITE-17515
+                                tablesByIdVv.completeExceptionally(token, e);
+                            }
+
+                            //Normal scenario, when all related futures for tablesByIdVv are completed and we can complete tablesByIdVv
                             tablesByIdVv.complete(token);
+
+                            tablesToStopInCaseOfError.clear();
+                        } finally {
+                            busyLock.leaveBusy();
                         }
                     });
         });
@@ -599,6 +635,22 @@
 
         Map<UUID, TableImpl> tables = tablesByIdVv.latest();
 
+        cleanUpTablesResources(tables);
+
+        cleanUpTablesResources(tablesToStopInCaseOfError);
+
+        tablesToStopInCaseOfError.clear();
+
+        shutdownAndAwaitTermination(rebalanceScheduler, 10, TimeUnit.SECONDS);
+        shutdownAndAwaitTermination(ioExecutor, 10, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Stops resources that are related to provided tables.
+     *
+     * @param tables Tables to stop.
+     */
+    private void cleanUpTablesResources(Map<UUID, TableImpl> tables) {
         for (TableImpl table : tables.values()) {
             try {
                 for (int p = 0; p < table.internalTable().partitions(); p++) {
@@ -611,9 +663,6 @@
                 LOG.info("Unable to stop table [name={}]", e, table.name());
             }
         }
-
-        shutdownAndAwaitTermination(rebalanceScheduler, 10, TimeUnit.SECONDS);
-        shutdownAndAwaitTermination(ioExecutor, 10, TimeUnit.SECONDS);
     }
 
     /**
@@ -657,7 +706,7 @@
 
         var table = new TableImpl(internalTable);
 
-        tablesByIdVv.update(causalityToken, (previous, e) -> {
+        tablesByIdVv.update(causalityToken, (previous, e) -> inBusyLock(busyLock, () -> {
             if (e != null) {
                 return failedFuture(e);
             }
@@ -667,16 +716,20 @@
             val.put(tblId, table);
 
             return completedFuture(val);
-        });
+        }));
 
         CompletableFuture<?> schemaFut = schemaManager.schemaRegistry(causalityToken, tblId)
-                .thenAccept(table::schemaView)
-                .thenCompose(v -> fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, table)));
+                .thenAccept(schema -> inBusyLock(busyLock, () -> table.schemaView(schema)))
+                .thenCompose(
+                        v -> inBusyLock(busyLock, () -> fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, table)))
+                );
 
         beforeTablesVvComplete.add(schemaFut);
 
+        tablesToStopInCaseOfError.put(tblId, table);
+
         // TODO should be reworked in IGNITE-16763
-        return tablesByIdVv.get(causalityToken).thenRun(() -> completeApiCreateFuture(table));
+        return tablesByIdVv.get(causalityToken).thenRun(() -> inBusyLock(busyLock, () -> completeApiCreateFuture(table)));
     }
 
     /**
@@ -710,7 +763,7 @@
                 raftMgr.stopRaftGroup(partitionRaftGroupName(tblId, p));
             }
 
-            tablesByIdVv.update(causalityToken, (previousVal, e) -> {
+            tablesByIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> {
                 if (e != null) {
                     return failedFuture(e);
                 }
@@ -720,7 +773,7 @@
                 map.remove(tblId);
 
                 return completedFuture(map);
-            });
+            }));
 
             TableImpl table = tablesByIdVv.latest().get(tblId);
 
@@ -730,7 +783,9 @@
             table.internalTable().storage().destroy();
 
             CompletableFuture<?> fut = schemaManager.dropRegistry(causalityToken, table.tableId())
-                    .thenCompose(v -> fireEvent(TableEvent.DROP, new TableEventParameters(causalityToken, table)));
+                    .thenCompose(
+                            v -> inBusyLock(busyLock, () -> fireEvent(TableEvent.DROP, new TableEventParameters(causalityToken, table)))
+                    );
 
             beforeTablesVvComplete.add(fut);
         } catch (Exception e) {
@@ -1082,8 +1137,8 @@
      */
     private CompletableFuture<List<Table>> tablesAsyncInternal() {
         // TODO: IGNITE-16288 directTableIds should use async configuration API
-        return CompletableFuture.supplyAsync(this::directTableIds)
-                .thenCompose(tableIds -> {
+        return CompletableFuture.supplyAsync(() -> inBusyLock(busyLock, this::directTableIds))
+                .thenCompose(tableIds -> inBusyLock(busyLock, () -> {
                     var tableFuts = new CompletableFuture[tableIds.size()];
 
                     var i = 0;
@@ -1092,7 +1147,7 @@
                         tableFuts[i++] = tableAsyncInternal(tblId, false);
                     }
 
-                    return allOf(tableFuts).thenApply(unused -> {
+                    return allOf(tableFuts).thenApply(unused -> inBusyLock(busyLock, () -> {
                         var tables = new ArrayList<Table>(tableIds.size());
 
                         try {
@@ -1108,8 +1163,8 @@
                         }
 
                         return tables;
-                    });
-                });
+                    }));
+                }));
     }
 
     /**