IGNITE-17286 Added missed busy locks to get rid of resources leaking during table creation. Fixes #977
Signed-off-by: Slava Koptilin <slava.koptilin@gmail.com>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java b/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
index 1eb90e1..92a3d2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/causality/VersionedValue.java
@@ -578,7 +578,7 @@
try {
for (Long token : history.keySet()) {
- if (token != lastToken && causalityToken - token >= historySize) {
+ if (!token.equals(lastToken) && causalityToken - token >= historySize) {
history.remove(token);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 77478a8..e19d734 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.util;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.AtomicMoveNotSupportedException;
@@ -44,11 +46,14 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.util.worker.IgniteWorker;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringBuilder;
import org.apache.ignite.lang.IgniteStringFormatter;
+import org.apache.ignite.lang.NodeStoppingException;
import org.jetbrains.annotations.Nullable;
/**
@@ -787,4 +792,40 @@
}
}
}
+
+ /**
+ * Method that runs the provided {@code fn} in {@code busyLock}.
+ *
+ * @param busyLock Component's busy lock
+ * @param fn Function to run
+ * @param <T> Type of returned value from {@code fn}
+ * @return Result of the provided function
+ */
+ public static <T> T inBusyLock(IgniteSpinBusyLock busyLock, Supplier<T> fn) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+ }
+ try {
+ return fn.get();
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Method that runs the provided {@code fn} in {@code busyLock}.
+ *
+ * @param busyLock Component's busy lock
+ * @param fn Runnable to run
+ */
+ public static void inBusyLock(IgniteSpinBusyLock busyLock, Runnable fn) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
+ }
+ try {
+ fn.run();
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 8dfe45e..760b2f7 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -254,7 +254,8 @@
/** {@inheritDoc} */
@Override
public void stop() throws Exception {
- assert groups.isEmpty() : IgniteStringFormatter.format("Raft groups are still running {}", groups.keySet());
+ assert groups.isEmpty() : IgniteStringFormatter.format("Raft groups {} are still running on the node {}", groups.keySet(),
+ service.topologyService().localMember().name());
rpcServer.shutdown();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
index 68549f7..e43f683 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/AbstractClusterIntegrationTest.java
@@ -91,7 +91,7 @@
})
.collect(toList());
- String metaStorageNodeName = testNodeName(testInfo, 0);
+ String metaStorageNodeName = testNodeName(testInfo, nodes() - 1);
IgnitionManager.init(metaStorageNodeName, List.of(metaStorageNodeName), "cluster");
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index b03c77e..16a25c5 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -20,6 +20,8 @@
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import java.util.HashMap;
import java.util.Map;
@@ -107,22 +109,30 @@
* @return A future.
*/
private CompletableFuture<?> onSchemaCreate(ConfigurationNotificationEvent<SchemaView> schemasCtx) {
- long causalityToken = schemasCtx.storageRevision();
+ if (!busyLock.enterBusy()) {
+ return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()));
+ }
- ExtendedTableConfiguration tblCfg = schemasCtx.config(ExtendedTableConfiguration.class);
+ try {
+ long causalityToken = schemasCtx.storageRevision();
- UUID tblId = tblCfg.id().value();
+ ExtendedTableConfiguration tblCfg = schemasCtx.config(ExtendedTableConfiguration.class);
- String tableName = tblCfg.name().value();
+ UUID tblId = tblCfg.id().value();
- SchemaDescriptor schemaDescriptor = SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
+ String tableName = tblCfg.name().value();
- CompletableFuture<?> createSchemaFut = createSchema(causalityToken, tblId, tableName, schemaDescriptor);
+ SchemaDescriptor schemaDescriptor = SchemaSerializerImpl.INSTANCE.deserialize((schemasCtx.newValue().schema()));
- registriesVv.get(causalityToken)
- .thenRun(() -> fireEvent(SchemaEvent.CREATE, new SchemaEventParameters(causalityToken, tblId, schemaDescriptor)));
+ CompletableFuture<?> createSchemaFut = createSchema(causalityToken, tblId, tableName, schemaDescriptor);
- return createSchemaFut;
+ registriesVv.get(causalityToken).thenRun(() -> inBusyLock(busyLock,
+ () -> fireEvent(SchemaEvent.CREATE, new SchemaEventParameters(causalityToken, tblId, schemaDescriptor))));
+
+ return createSchemaFut;
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -161,27 +171,29 @@
String tableName,
SchemaDescriptor schemaDescriptor
) {
- return registriesVv.update(causalityToken, (registries, e) -> {
+ return registriesVv.update(causalityToken, (registries, e) -> inBusyLock(busyLock, () -> {
if (e != null) {
return failedFuture(new IgniteInternalException(IgniteStringFormatter.format(
- "Cannot create a schema for the table [tblId={}, ver={}]", tableId, schemaDescriptor.version()), e)
+ "Cannot create a schema for the table [tblId={}, ver={}]", tableId, schemaDescriptor.version()), e)
);
}
- SchemaRegistryImpl reg = registries.get(tableId);
+ Map<UUID, SchemaRegistryImpl> regs = registries;
+
+ SchemaRegistryImpl reg = regs.get(tableId);
if (reg == null) {
- registries = new HashMap<>(registries);
+ regs = new HashMap<>(registries);
SchemaRegistryImpl registry = createSchemaRegistry(tableId, tableName, schemaDescriptor);
- registries.put(tableId, registry);
+ regs.put(tableId, registry);
} else {
reg.onSchemaRegistered(schemaDescriptor);
}
- return completedFuture(registries);
- });
+ return completedFuture(regs);
+ }));
}
/**
@@ -195,7 +207,7 @@
private SchemaRegistryImpl createSchemaRegistry(UUID tableId, String tableName, SchemaDescriptor initialSchema) {
return new SchemaRegistryImpl(ver -> {
if (!busyLock.enterBusy()) {
- throw new IgniteException(new NodeStoppingException());
+ throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
}
try {
@@ -205,7 +217,7 @@
}
}, () -> {
if (!busyLock.enterBusy()) {
- throw new IgniteException(new NodeStoppingException());
+ throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
}
try {
@@ -336,11 +348,12 @@
*/
public CompletableFuture<SchemaRegistry> schemaRegistry(long causalityToken, @Nullable UUID tableId) {
if (!busyLock.enterBusy()) {
- throw new IgniteException(new NodeStoppingException());
+ throw new IgniteException(NODE_STOPPING_ERR, new NodeStoppingException());
}
try {
- return registriesVv.get(causalityToken).thenApply(regs -> tableId == null ? null : regs.get(tableId));
+ return registriesVv.get(causalityToken)
+ .thenApply(regs -> inBusyLock(busyLock, () -> tableId == null ? null : regs.get(tableId)));
} finally {
busyLock.leaveBusy();
}
@@ -363,20 +376,18 @@
* @param tableId Table id.
*/
public CompletableFuture<?> dropRegistry(long causalityToken, UUID tableId) {
- return registriesVv.update(causalityToken, (registries, e) -> {
+ return registriesVv.update(causalityToken, (registries, e) -> inBusyLock(busyLock, () -> {
if (e != null) {
return failedFuture(new IgniteInternalException(
- IgniteStringFormatter.format("Cannot remove a schema registry for the table [tblId={}]", tableId), e
- )
- );
+ IgniteStringFormatter.format("Cannot remove a schema registry for the table [tblId={}]", tableId), e));
}
- registries = new HashMap<>(registries);
+ Map<UUID, SchemaRegistryImpl> regs = new HashMap<>(registries);
- registries.remove(tableId);
+ regs.remove(tableId);
- return completedFuture(registries);
- });
+ return completedFuture(regs);
+ }));
}
/** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 1e81423..028ae0c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -162,7 +162,7 @@
msgSrvc
));
- SqlSchemaManagerImpl sqlSchemaManager = new SqlSchemaManagerImpl(tableManager, schemaManager, registry);
+ SqlSchemaManagerImpl sqlSchemaManager = new SqlSchemaManagerImpl(tableManager, schemaManager, registry, busyLock);
sqlSchemaManager.registerListener(prepareSvc);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index 6e8d939..e069893 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -19,6 +19,8 @@
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import java.util.Comparator;
import java.util.HashMap;
@@ -42,6 +44,7 @@
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.lang.NodeStoppingException;
@@ -66,6 +69,9 @@
private final Set<SchemaUpdateListener> listeners = new CopyOnWriteArraySet<>();
+ /** Busy lock for stop synchronisation. */
+ private final IgniteSpinBusyLock busyLock;
+
/**
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
@@ -73,12 +79,14 @@
public SqlSchemaManagerImpl(
TableManager tableManager,
SchemaManager schemaManager,
- Consumer<Function<Long, CompletableFuture<?>>> registry
+ Consumer<Function<Long, CompletableFuture<?>>> registry,
+ IgniteSpinBusyLock busyLock
) {
this.tableManager = tableManager;
this.schemaManager = schemaManager;
schemasVv = new VersionedValue<>(registry, HashMap::new);
tablesVv = new VersionedValue<>(registry, HashMap::new);
+ this.busyLock = busyLock;
calciteSchemaVv = new VersionedValue<>(null, () -> {
SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
@@ -87,20 +95,29 @@
});
schemasVv.whenComplete((token, stringIgniteSchemaMap, throwable) -> {
- if (throwable != null) {
- calciteSchemaVv.completeExceptionally(
- token,
- new IgniteInternalException("Couldn't evaluate sql schemas for causality token: " + token, throwable)
- );
+ if (!busyLock.enterBusy()) {
+ calciteSchemaVv.completeExceptionally(token, new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()));
return;
}
+ try {
+ if (throwable != null) {
+ calciteSchemaVv.completeExceptionally(
+ token,
+ new IgniteInternalException("Couldn't evaluate sql schemas for causality token: " + token, throwable)
+ );
- SchemaPlus newCalciteSchema = rebuild(stringIgniteSchemaMap);
+ return;
+ }
- listeners.forEach(SchemaUpdateListener::onSchemaUpdated);
+ SchemaPlus newCalciteSchema = rebuild(stringIgniteSchemaMap);
- calciteSchemaVv.complete(token, newCalciteSchema);
+ listeners.forEach(SchemaUpdateListener::onSchemaUpdated);
+
+ calciteSchemaVv.complete(token, newCalciteSchema);
+ } finally {
+ busyLock.leaveBusy();
+ }
});
}
@@ -116,27 +133,34 @@
@Override
@NotNull
public IgniteTable tableById(UUID id, int ver) {
- IgniteTable table = tablesVv.latest().get(id);
-
- // there is a chance that someone tries to resolve table before
- // the distributed event of that table creation has been processed
- // by TableManager, so we need to get in sync with the TableManager
- if (table == null || ver > table.version()) {
- table = awaitLatestTableSchema(id);
+ if (!busyLock.enterBusy()) {
+ throw new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
}
+ try {
+ IgniteTable table = tablesVv.latest().get(id);
- if (table == null) {
- throw new IgniteInternalException(
- IgniteStringFormatter.format("Table not found [tableId={}]", id));
+ // there is a chance that someone tries to resolve table before
+ // the distributed event of that table creation has been processed
+ // by TableManager, so we need to get in sync with the TableManager
+ if (table == null || ver > table.version()) {
+ table = awaitLatestTableSchema(id);
+ }
+
+ if (table == null) {
+ throw new IgniteInternalException(
+ IgniteStringFormatter.format("Table not found [tableId={}]", id));
+ }
+
+ if (table.version() < ver) {
+ throw new IgniteInternalException(
+ IgniteStringFormatter.format("Table version not found [tableId={}, requiredVer={}, latestKnownVer={}]",
+ id, ver, table.version()));
+ }
+
+ return table;
+ } finally {
+ busyLock.leaveBusy();
}
-
- if (table.version() < ver) {
- throw new IgniteInternalException(
- IgniteStringFormatter.format("Table version not found [tableId={}, requiredVer={}, latestKnownVer={}]",
- id, ver, table.version()));
- }
-
- return table;
}
public void registerListener(SchemaUpdateListener listener) {
@@ -155,57 +179,11 @@
return convert(table);
} catch (NodeStoppingException e) {
- throw new IgniteInternalException(e);
+ throw new IgniteInternalException(NODE_STOPPING_ERR, e);
}
}
/**
- * Schema creation handler.
- *
- * @param schemaName Schema name.
- * @param causalityToken Causality token.
- */
- public synchronized void onSchemaCreated(String schemaName, long causalityToken) {
- schemasVv.update(
- causalityToken,
- (schemas, e) -> {
- if (e != null) {
- return failedFuture(e);
- }
-
- Map<String, IgniteSchema> res = new HashMap<>(schemas);
-
- res.putIfAbsent(schemaName, new IgniteSchema(schemaName));
-
- return completedFuture(res);
- }
- );
- }
-
- /**
- * Schema drop handler.
- *
- * @param schemaName Schema name.
- * @param causalityToken Causality token.
- */
- public synchronized void onSchemaDropped(String schemaName, long causalityToken) {
- schemasVv.update(
- causalityToken,
- (schemas, e) -> {
- if (e != null) {
- return failedFuture(e);
- }
-
- Map<String, IgniteSchema> res = new HashMap<>(schemas);
-
- res.remove(schemaName);
-
- return completedFuture(res);
- }
- );
- }
-
- /**
* OnSqlTypeCreated.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
@@ -214,50 +192,45 @@
TableImpl table,
long causalityToken
) {
- schemasVv.update(
- causalityToken,
- (schemas, e) -> {
- if (e != null) {
- return failedFuture(e);
+ if (!busyLock.enterBusy()) {
+ return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()));
+ }
+ try {
+ schemasVv.update(causalityToken, (schemas, e) -> inBusyLock(busyLock, () -> {
+ if (e != null) {
+ return failedFuture(e);
+ }
+
+ Map<String, IgniteSchema> res = new HashMap<>(schemas);
+
+ IgniteSchema schema = res.computeIfAbsent(schemaName, IgniteSchema::new);
+
+ CompletableFuture<IgniteTableImpl> igniteTableFuture = convert(causalityToken, table);
+
+ return tablesVv.update(causalityToken, (tables, ex) -> inBusyLock(busyLock, () -> {
+ if (ex != null) {
+ return failedFuture(ex);
}
- Map<String, IgniteSchema> res = new HashMap<>(schemas);
+ Map<UUID, IgniteTable> resTbls = new HashMap<>(tables);
- IgniteSchema schema = res.computeIfAbsent(schemaName, IgniteSchema::new);
+ return igniteTableFuture.thenApply(igniteTable -> inBusyLock(busyLock, () -> {
+ resTbls.put(igniteTable.id(), igniteTable);
- CompletableFuture<IgniteTableImpl> igniteTableFuture = convert(causalityToken, table);
+ return resTbls;
+ }));
+ })).thenCombine(igniteTableFuture, (v, igniteTable) -> inBusyLock(busyLock, () -> {
+ schema.addTable(removeSchema(schemaName, table.name()), igniteTable);
- return tablesVv
- .update(
- causalityToken,
- (tables, ex) -> {
- if (ex != null) {
- return failedFuture(ex);
- }
+ return null;
+ })).thenCompose(v -> inBusyLock(busyLock, () -> completedFuture(res)));
- Map<UUID, IgniteTable> resTbls = new HashMap<>(tables);
+ }));
- return igniteTableFuture
- .thenApply(igniteTable -> {
- resTbls.put(igniteTable.id(), igniteTable);
-
- return resTbls;
- });
- }
- )
- .thenCombine(
- igniteTableFuture,
- (v, igniteTable) -> {
- schema.addTable(removeSchema(schemaName, table.name()), igniteTable);
-
- return null;
- }
- )
- .thenCompose(v -> completedFuture(res));
- }
- );
-
- return calciteSchemaVv.get(causalityToken);
+ return calciteSchemaVv.get(causalityToken);
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -281,45 +254,46 @@
String tableName,
long causalityToken
) {
- schemasVv.update(causalityToken,
- (schemas, e) -> {
- if (e != null) {
- return failedFuture(e);
- }
-
- Map<String, IgniteSchema> res = new HashMap<>(schemas);
-
- IgniteSchema schema = res.computeIfAbsent(schemaName, IgniteSchema::new);
-
- String calciteTableName = removeSchema(schemaName, tableName);
-
- InternalIgniteTable table = (InternalIgniteTable) schema.getTable(calciteTableName);
-
- if (table != null) {
- schema.removeTable(calciteTableName);
-
- return tablesVv
- .update(causalityToken,
- (tables, ex) -> {
- if (ex != null) {
- return failedFuture(ex);
- }
-
- Map<UUID, IgniteTable> resTbls = new HashMap<>(tables);
-
- resTbls.remove(table.id());
-
- return completedFuture(resTbls);
- }
- )
- .thenCompose(tables -> completedFuture(res));
- }
-
- return completedFuture(res);
+ if (!busyLock.enterBusy()) {
+ return failedFuture(new IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException()));
+ }
+ try {
+ schemasVv.update(causalityToken, (schemas, e) -> inBusyLock(busyLock, () -> {
+ if (e != null) {
+ return failedFuture(e);
}
- );
- return calciteSchemaVv.get(causalityToken);
+ Map<String, IgniteSchema> res = new HashMap<>(schemas);
+
+ IgniteSchema schema = res.computeIfAbsent(schemaName, IgniteSchema::new);
+
+ String calciteTableName = removeSchema(schemaName, tableName);
+
+ InternalIgniteTable table = (InternalIgniteTable) schema.getTable(calciteTableName);
+
+ if (table != null) {
+ schema.removeTable(calciteTableName);
+
+ return tablesVv.update(causalityToken, (tables, ex) -> inBusyLock(busyLock, () -> {
+ if (ex != null) {
+ return failedFuture(ex);
+ }
+
+ Map<UUID, IgniteTable> resTbls = new HashMap<>(tables);
+
+ resTbls.remove(table.id());
+
+ return completedFuture(resTbls);
+ })).thenCompose(tables -> inBusyLock(busyLock, () -> completedFuture(res)));
+ }
+
+ return completedFuture(res);
+ }));
+
+ return calciteSchemaVv.get(causalityToken);
+ } finally {
+ busyLock.leaveBusy();
+ }
}
/**
@@ -339,7 +313,7 @@
private CompletableFuture<IgniteTableImpl> convert(long causalityToken, TableImpl table) {
return schemaManager.schemaRegistry(causalityToken, table.tableId())
- .thenApply(schemaRegistry -> convert(table, schemaRegistry));
+ .thenApply(schemaRegistry -> inBusyLock(busyLock, () -> convert(table, schemaRegistry)));
}
private IgniteTableImpl convert(TableImpl table) {
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
index 3496b2d..37ab1cc 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/schema/SqlSchemaManagerTest.java
@@ -50,6 +50,7 @@
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.NodeStoppingException;
import org.junit.jupiter.api.BeforeEach;
@@ -90,6 +91,9 @@
private TestRevisionRegister testRevisionRegister;
+ /** Busy lock for stop synchronisation. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
@BeforeEach
public void setup() throws NodeStoppingException {
Mockito.reset(tableManager);
@@ -99,7 +103,8 @@
sqlSchemaManager = new SqlSchemaManagerImpl(
tableManager,
schemaManager,
- testRevisionRegister
+ testRevisionRegister,
+ busyLock
);
testRevisionRegister.moveForward();
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index ba0e292..2a3ccec 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -23,6 +23,7 @@
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.getByInternalId;
import static org.apache.ignite.internal.schema.SchemaManager.INITIAL_SCHEMA_VERSION;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import static org.apache.ignite.internal.utils.RebalanceUtil.PENDING_ASSIGNMENTS_PREFIX;
import static org.apache.ignite.internal.utils.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
@@ -182,6 +183,12 @@
/** Set of futures that should complete before completion of {@link #tablesByIdVv}, after completion this set is cleared. */
private final Set<CompletableFuture<?>> beforeTablesVvComplete = new ConcurrentHashSet<>();
+ /**
+ * {@link TableImpl} is created during update of tablesByIdVv, we store reference to it in case of updating of tablesByIdVv fails,
+ * so we can stop resources associated with the table.
+ */
+ private final Map<UUID, TableImpl> tablesToStopInCaseOfError = new ConcurrentHashMap<>();
+
/** Resolver that resolves a network address to node id. */
private final Function<NetworkAddress, String> netAddrResolver;
@@ -257,13 +264,42 @@
beforeTablesVvComplete.clear();
- return CompletableFuture.allOf(futures.toArray(new CompletableFuture[] {}))
+ return CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
.orTimeout(TABLES_COMPLETE_TIMEOUT, TimeUnit.SECONDS)
.whenComplete((v, e) -> {
- if (e != null) {
- tablesByIdVv.completeExceptionally(token, e);
- } else {
+ if (!busyLock.enterBusy()) {
+ if (e != null) {
+ LOG.warn("Error occurred while updating tables and stopping components.", e);
+ // Stop of the components has been started, so we do nothing and resources of tablesByIdVv will be
+ // freed in the logic of TableManager stop. We cannot complete tablesByIdVv exceptionally because
+ // we will lose a context of tables.
+ }
+ return;
+ }
+ try {
+ if (e != null) {
+ LOG.warn("Error occurred while updating tables.", e);
+ if (e instanceof CompletionException) {
+ Throwable th = e.getCause();
+ // Case when stopping of the previous component has been started and related futures completed
+ // exceptionally
+ if (th instanceof NodeStoppingException || (th.getCause() != null
+ && th.getCause() instanceof NodeStoppingException)) {
+ // Stop of the components has been started so we do nothing and resources will be freed in the
+ // logic of TableManager stop
+ return;
+ }
+ }
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-17515
+ tablesByIdVv.completeExceptionally(token, e);
+ }
+
+ //Normal scenario, when all related futures for tablesByIdVv are completed and we can complete tablesByIdVv
tablesByIdVv.complete(token);
+
+ tablesToStopInCaseOfError.clear();
+ } finally {
+ busyLock.leaveBusy();
}
});
});
@@ -599,6 +635,22 @@
Map<UUID, TableImpl> tables = tablesByIdVv.latest();
+ cleanUpTablesResources(tables);
+
+ cleanUpTablesResources(tablesToStopInCaseOfError);
+
+ tablesToStopInCaseOfError.clear();
+
+ shutdownAndAwaitTermination(rebalanceScheduler, 10, TimeUnit.SECONDS);
+ shutdownAndAwaitTermination(ioExecutor, 10, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Stops resources that are related to provided tables.
+ *
+ * @param tables Tables to stop.
+ */
+ private void cleanUpTablesResources(Map<UUID, TableImpl> tables) {
for (TableImpl table : tables.values()) {
try {
for (int p = 0; p < table.internalTable().partitions(); p++) {
@@ -611,9 +663,6 @@
LOG.info("Unable to stop table [name={}]", e, table.name());
}
}
-
- shutdownAndAwaitTermination(rebalanceScheduler, 10, TimeUnit.SECONDS);
- shutdownAndAwaitTermination(ioExecutor, 10, TimeUnit.SECONDS);
}
/**
@@ -657,7 +706,7 @@
var table = new TableImpl(internalTable);
- tablesByIdVv.update(causalityToken, (previous, e) -> {
+ tablesByIdVv.update(causalityToken, (previous, e) -> inBusyLock(busyLock, () -> {
if (e != null) {
return failedFuture(e);
}
@@ -667,16 +716,20 @@
val.put(tblId, table);
return completedFuture(val);
- });
+ }));
CompletableFuture<?> schemaFut = schemaManager.schemaRegistry(causalityToken, tblId)
- .thenAccept(table::schemaView)
- .thenCompose(v -> fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, table)));
+ .thenAccept(schema -> inBusyLock(busyLock, () -> table.schemaView(schema)))
+ .thenCompose(
+ v -> inBusyLock(busyLock, () -> fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, table)))
+ );
beforeTablesVvComplete.add(schemaFut);
+ tablesToStopInCaseOfError.put(tblId, table);
+
// TODO should be reworked in IGNITE-16763
- return tablesByIdVv.get(causalityToken).thenRun(() -> completeApiCreateFuture(table));
+ return tablesByIdVv.get(causalityToken).thenRun(() -> inBusyLock(busyLock, () -> completeApiCreateFuture(table)));
}
/**
@@ -710,7 +763,7 @@
raftMgr.stopRaftGroup(partitionRaftGroupName(tblId, p));
}
- tablesByIdVv.update(causalityToken, (previousVal, e) -> {
+ tablesByIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> {
if (e != null) {
return failedFuture(e);
}
@@ -720,7 +773,7 @@
map.remove(tblId);
return completedFuture(map);
- });
+ }));
TableImpl table = tablesByIdVv.latest().get(tblId);
@@ -730,7 +783,9 @@
table.internalTable().storage().destroy();
CompletableFuture<?> fut = schemaManager.dropRegistry(causalityToken, table.tableId())
- .thenCompose(v -> fireEvent(TableEvent.DROP, new TableEventParameters(causalityToken, table)));
+ .thenCompose(
+ v -> inBusyLock(busyLock, () -> fireEvent(TableEvent.DROP, new TableEventParameters(causalityToken, table)))
+ );
beforeTablesVvComplete.add(fut);
} catch (Exception e) {
@@ -1082,8 +1137,8 @@
*/
private CompletableFuture<List<Table>> tablesAsyncInternal() {
// TODO: IGNITE-16288 directTableIds should use async configuration API
- return CompletableFuture.supplyAsync(this::directTableIds)
- .thenCompose(tableIds -> {
+ return CompletableFuture.supplyAsync(() -> inBusyLock(busyLock, this::directTableIds))
+ .thenCompose(tableIds -> inBusyLock(busyLock, () -> {
var tableFuts = new CompletableFuture[tableIds.size()];
var i = 0;
@@ -1092,7 +1147,7 @@
tableFuts[i++] = tableAsyncInternal(tblId, false);
}
- return allOf(tableFuts).thenApply(unused -> {
+ return allOf(tableFuts).thenApply(unused -> inBusyLock(busyLock, () -> {
var tables = new ArrayList<Table>(tableIds.size());
try {
@@ -1108,8 +1163,8 @@
}
return tables;
- });
- });
+ }));
+ }));
}
/**