IGNITE-22001 Throw specific exception if during writeTableAssignmentsToMetastore process was interrupted (#3575)
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java
index 60d2c14..d0498b8 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java
@@ -24,6 +24,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
@@ -135,4 +136,14 @@
public String toString() {
return S.toString(this);
}
+
+ /**
+ * Creates a string representation of the given assignments list for logging usage purpose mostly.
+ *
+ * @param assignments List of assignments to present as string.
+ * @return String representation of the given assignments list.
+ */
+ public static String assignmentListToString(List<Assignments> assignments) {
+ return S.toString(assignments, (sb, e, i) -> sb.app(i).app('=').app(e.nodes()));
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4e8084e..2798469 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -195,7 +195,6 @@
import org.apache.ignite.internal.table.distributed.wrappers.ExecutorInclinedPlacementDriver;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
@@ -210,6 +209,7 @@
import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbTableStorage;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Lazy;
@@ -697,7 +697,7 @@
* @param assignmentsFuture Assignments future, to get the assignments that should be written.
* @return Real list of assignments.
*/
- private CompletableFuture<List<Assignments>> writeTableAssignmentsToMetastore(
+ public CompletableFuture<List<Assignments>> writeTableAssignmentsToMetastore(
int tableId,
CompletableFuture<List<Assignments>> assignmentsFuture
) {
@@ -707,20 +707,37 @@
List<Operation> partitionAssignments = new ArrayList<>(newAssignments.size());
for (int i = 0; i < newAssignments.size(); i++) {
- partitionAssignments.add(put(
- stablePartAssignmentsKey(
- new TablePartitionId(tableId, i)),
- newAssignments.get(i).toBytes()));
+ ByteArray stableAssignmentsKey = stablePartAssignmentsKey(new TablePartitionId(tableId, i));
+ byte[] anAssignment = newAssignments.get(i).toBytes();
+ Operation op = put(stableAssignmentsKey, anAssignment);
+ partitionAssignments.add(op);
}
Condition condition = notExists(new ByteArray(partitionAssignments.get(0).key()));
return metaStorageMgr
.invoke(condition, partitionAssignments, Collections.emptyList())
+ .handle((invokeResult, e) -> {
+ if (e != null) {
+ LOG.error(
+ "Couldn't write assignments [assignmentsList={}] to metastore during invoke.",
+ e,
+ Assignments.assignmentListToString(newAssignments)
+ );
+
+ throw ExceptionUtils.sneakyThrow(e);
+ }
+
+ return invokeResult;
+ })
.thenCompose(invokeResult -> {
if (invokeResult) {
- LOG.info(IgniteStringFormatter.format("Assignments calculated from data nodes are successfully written"
- + " to meta storage [tableId={}, assignments={}]", tableId, assignmentListToString(newAssignments)));
+ LOG.info(
+ "Assignments calculated from data nodes are successfully written to meta storage"
+ + " [tableId={}, assignments={}].",
+ tableId,
+ Assignments.assignmentListToString(newAssignments)
+ );
return completedFuture(newAssignments);
} else {
@@ -745,17 +762,24 @@
realAssignments.add(real);
}
- LOG.info(IgniteStringFormatter.format("Assignments picked up from meta storage [tableId={}, "
- + "assignments={}]", tableId, assignmentListToString(realAssignments)));
+ LOG.info(
+ "Assignments picked up from meta storage [tableId={}, assignments={}].",
+ tableId,
+ Assignments.assignmentListToString(realAssignments)
+ );
return realAssignments;
});
}
})
- .exceptionally(e -> {
- LOG.error("Couldn't write assignments to metastore", e);
+ .handle((realAssignments, e) -> {
+ if (e != null) {
+ LOG.error("Couldn't get assignments from metastore for table [tableId={}].", e, tableId);
- return null;
+ throw ExceptionUtils.sneakyThrow(e);
+ }
+
+ return realAssignments;
});
});
}
@@ -1240,32 +1264,16 @@
) {
return inBusyLockAsync(busyLock, () -> {
int tableId = tableDescriptor.id();
- int zoneId = tableDescriptor.zoneId();
// Retrieve descriptor during synchronous call, before the previous catalog version could be concurrently compacted.
CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor, catalogVersion);
- CompletableFuture<List<Assignments>> assignmentsFuture;
-
- // Check if the table already has assignments in the meta storage locally.
- // So, it means, that it is a recovery process and we should use the meta storage local assignments instead of calculation
- // of the new ones.
- if (partitionAssignmentsGetLocally(metaStorageMgr, tableId, 0, causalityToken) != null) {
- assignmentsFuture = completedFuture(
- tableAssignmentsGetLocally(metaStorageMgr, tableId, zoneDescriptor.partitions(), causalityToken));
- } else {
- assignmentsFuture = distributionZoneManager.dataNodes(causalityToken, catalogVersion, zoneId)
- .thenApply(dataNodes -> AffinityUtils.calculateAssignments(
- dataNodes,
- zoneDescriptor.partitions(),
- zoneDescriptor.replicas()
- ).stream().map(Assignments::of).collect(toList()));
-
- assignmentsFuture.thenAccept(assignmentsList -> {
- LOG.info(IgniteStringFormatter.format("Assignments calculated from data nodes [table={}, tableId={}, assignments={}, "
- + "revision={}]", tableDescriptor.name(), tableId, assignmentListToString(assignmentsList), causalityToken));
- });
- }
+ CompletableFuture<List<Assignments>> assignmentsFuture = getOrCreateAssignments(
+ tableDescriptor,
+ zoneDescriptor,
+ causalityToken,
+ catalogVersion
+ );
CompletableFuture<List<Assignments>> assignmentsFutureAfterInvoke =
writeTableAssignmentsToMetastore(tableId, assignmentsFuture);
@@ -1396,13 +1404,40 @@
}
/**
- * Creates a string representation of the given assignments list to use it for logging.
- *
- * @param assignments List of assignments.
- * @return String representation of the given assignments list to use it for logging.
+ * Check if the table already has assignments in the meta storage locally.
+ * So, it means, that it is a recovery process and we should use the meta storage local assignments instead of calculation
+ * of the new ones.
*/
- private static String assignmentListToString(List<Assignments> assignments) {
- return S.toString(assignments, (sb, e, i) -> sb.app(i).app('=').app(e.nodes()));
+ private CompletableFuture<List<Assignments>> getOrCreateAssignments(
+ CatalogTableDescriptor tableDescriptor,
+ CatalogZoneDescriptor zoneDescriptor,
+ long causalityToken,
+ int catalogVersion
+ ) {
+ int tableId = tableDescriptor.id();
+ CompletableFuture<List<Assignments>> assignmentsFuture;
+
+ if (partitionAssignmentsGetLocally(metaStorageMgr, tableId, 0, causalityToken) != null) {
+ assignmentsFuture = completedFuture(
+ tableAssignmentsGetLocally(metaStorageMgr, tableId, zoneDescriptor.partitions(), causalityToken));
+ } else {
+ assignmentsFuture = distributionZoneManager.dataNodes(causalityToken, catalogVersion, zoneDescriptor.id())
+ .thenApply(dataNodes -> AffinityUtils.calculateAssignments(
+ dataNodes,
+ zoneDescriptor.partitions(),
+ zoneDescriptor.replicas()
+ ).stream().map(Assignments::of).collect(toList()));
+
+ assignmentsFuture.thenAccept(assignmentsList -> LOG.info(
+ "Assignments calculated from data nodes [table={}, tableId={}, assignments={}, revision={}]",
+ tableDescriptor.name(),
+ tableId,
+ Assignments.assignmentListToString(assignmentsList),
+ causalityToken
+ ));
+ }
+
+ return assignmentsFuture;
}
/**
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 1c2c3a4..ada4b72 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -70,9 +70,12 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import org.apache.ignite.internal.affinity.AffinityUtils;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.Assignments;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogTestUtils;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
@@ -342,6 +345,40 @@
}
/**
+ * Testing TableManager#writeTableAssignmentsToMetastore for 2 exceptional scenarios:
+ * 1. the method was interrupted in outer future before invoke calling completion.
+ * 2. the method was interrupted in inner metastore's future when the result of invocation had gotten, but after error happens;
+ *
+ * @throws Exception if something goes wrong on mocks creation.
+ */
+ @Test
+ public void testWriteTableAssignmentsToMetastoreExceptionally() throws Exception {
+ TableViewInternal table = mockManagersAndCreateTable(DYNAMIC_TABLE_NAME, tblManagerFut);
+ int tableId = table.tableId();
+ TableManager tableManager = tblManagerFut.join();
+ List<Assignments> assignmentsList = List.of(Assignments.of(Assignment.forPeer(node.id())));
+
+ // the first case scenario
+ CompletableFuture<List<Assignments>> assignmentsFuture = new CompletableFuture<>();
+ var outerExceptionMsg = "Outer future is interrupted";
+ assignmentsFuture.completeExceptionally(new TimeoutException(outerExceptionMsg));
+ CompletableFuture<List<Assignments>> writtenAssignmentsFuture = tableManager
+ .writeTableAssignmentsToMetastore(tableId, assignmentsFuture);
+ assertTrue(writtenAssignmentsFuture.isCompletedExceptionally());
+ assertThrowsWithCause(writtenAssignmentsFuture::get, TimeoutException.class, outerExceptionMsg);
+
+ // the second case scenario
+ assignmentsFuture = completedFuture(assignmentsList);
+ CompletableFuture<Boolean> invokeTimeoutFuture = new CompletableFuture<>();
+ var innerExceptionMsg = "Inner future is interrupted";
+ invokeTimeoutFuture.completeExceptionally(new TimeoutException(innerExceptionMsg));
+ when(msm.invoke(any(), any(List.class), any(List.class))).thenReturn(invokeTimeoutFuture);
+ writtenAssignmentsFuture = tableManager.writeTableAssignmentsToMetastore(tableId, assignmentsFuture);
+ assertTrue(writtenAssignmentsFuture.isCompletedExceptionally());
+ assertThrowsWithCause(writtenAssignmentsFuture::get, TimeoutException.class, innerExceptionMsg);
+ }
+
+ /**
* Tests drop a table through public API.
*
* @throws Exception If failed.