IGNITE-22001 Throw specific exception if during writeTableAssignmentsToMetastore process was interrupted (#3575)

diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java
index 60d2c14..d0498b8 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/Assignments.java
@@ -24,6 +24,7 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import org.apache.ignite.internal.tostring.IgniteToStringInclude;
 import org.apache.ignite.internal.tostring.S;
@@ -135,4 +136,14 @@
     public String toString() {
         return S.toString(this);
     }
+
+    /**
+     * Creates a string representation of the given assignments list for logging usage purpose mostly.
+     *
+     * @param assignments List of assignments to present as string.
+     * @return String representation of the given assignments list.
+     */
+    public static String assignmentListToString(List<Assignments> assignments) {
+        return S.toString(assignments, (sb, e, i) -> sb.app(i).app('=').app(e.nodes()));
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4e8084e..2798469 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -195,7 +195,6 @@
 import org.apache.ignite.internal.table.distributed.wrappers.ExecutorInclinedPlacementDriver;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
-import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
@@ -210,6 +209,7 @@
 import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbTableStorage;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.Lazy;
@@ -697,7 +697,7 @@
      * @param assignmentsFuture Assignments future, to get the assignments that should be written.
      * @return Real list of assignments.
      */
-    private CompletableFuture<List<Assignments>> writeTableAssignmentsToMetastore(
+    public CompletableFuture<List<Assignments>> writeTableAssignmentsToMetastore(
             int tableId,
             CompletableFuture<List<Assignments>> assignmentsFuture
     ) {
@@ -707,20 +707,37 @@
             List<Operation> partitionAssignments = new ArrayList<>(newAssignments.size());
 
             for (int i = 0; i < newAssignments.size(); i++) {
-                partitionAssignments.add(put(
-                        stablePartAssignmentsKey(
-                                new TablePartitionId(tableId, i)),
-                        newAssignments.get(i).toBytes()));
+                ByteArray stableAssignmentsKey = stablePartAssignmentsKey(new TablePartitionId(tableId, i));
+                byte[] anAssignment = newAssignments.get(i).toBytes();
+                Operation op = put(stableAssignmentsKey, anAssignment);
+                partitionAssignments.add(op);
             }
 
             Condition condition = notExists(new ByteArray(partitionAssignments.get(0).key()));
 
             return metaStorageMgr
                     .invoke(condition, partitionAssignments, Collections.emptyList())
+                    .handle((invokeResult, e) -> {
+                        if (e != null) {
+                            LOG.error(
+                                    "Couldn't write assignments [assignmentsList={}] to metastore during invoke.",
+                                    e,
+                                    Assignments.assignmentListToString(newAssignments)
+                            );
+
+                            throw ExceptionUtils.sneakyThrow(e);
+                        }
+
+                        return invokeResult;
+                    })
                     .thenCompose(invokeResult -> {
                         if (invokeResult) {
-                            LOG.info(IgniteStringFormatter.format("Assignments calculated from data nodes are successfully written"
-                                    + " to meta storage [tableId={}, assignments={}]", tableId, assignmentListToString(newAssignments)));
+                            LOG.info(
+                                    "Assignments calculated from data nodes are successfully written to meta storage"
+                                            + " [tableId={}, assignments={}].",
+                                    tableId,
+                                    Assignments.assignmentListToString(newAssignments)
+                            );
 
                             return completedFuture(newAssignments);
                         } else {
@@ -745,17 +762,24 @@
                                     realAssignments.add(real);
                                 }
 
-                                LOG.info(IgniteStringFormatter.format("Assignments picked up from meta storage [tableId={}, "
-                                        + "assignments={}]", tableId, assignmentListToString(realAssignments)));
+                                LOG.info(
+                                        "Assignments picked up from meta storage [tableId={}, assignments={}].",
+                                        tableId,
+                                        Assignments.assignmentListToString(realAssignments)
+                                );
 
                                 return realAssignments;
                             });
                         }
                     })
-                    .exceptionally(e -> {
-                        LOG.error("Couldn't write assignments to metastore", e);
+                    .handle((realAssignments, e) -> {
+                        if (e != null) {
+                            LOG.error("Couldn't get assignments from metastore for table [tableId={}].", e, tableId);
 
-                        return null;
+                            throw ExceptionUtils.sneakyThrow(e);
+                        }
+
+                        return realAssignments;
                     });
         });
     }
@@ -1240,32 +1264,16 @@
     ) {
         return inBusyLockAsync(busyLock, () -> {
             int tableId = tableDescriptor.id();
-            int zoneId = tableDescriptor.zoneId();
 
             // Retrieve descriptor during synchronous call, before the previous catalog version could be concurrently compacted.
             CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor, catalogVersion);
 
-            CompletableFuture<List<Assignments>> assignmentsFuture;
-
-            // Check if the table already has assignments in the meta storage locally.
-            // So, it means, that it is a recovery process and we should use the meta storage local assignments instead of calculation
-            // of the new ones.
-            if (partitionAssignmentsGetLocally(metaStorageMgr, tableId, 0, causalityToken) != null) {
-                assignmentsFuture = completedFuture(
-                        tableAssignmentsGetLocally(metaStorageMgr, tableId, zoneDescriptor.partitions(), causalityToken));
-            } else {
-                assignmentsFuture = distributionZoneManager.dataNodes(causalityToken, catalogVersion, zoneId)
-                        .thenApply(dataNodes -> AffinityUtils.calculateAssignments(
-                                dataNodes,
-                                zoneDescriptor.partitions(),
-                                zoneDescriptor.replicas()
-                        ).stream().map(Assignments::of).collect(toList()));
-
-                assignmentsFuture.thenAccept(assignmentsList -> {
-                    LOG.info(IgniteStringFormatter.format("Assignments calculated from data nodes [table={}, tableId={}, assignments={}, "
-                            + "revision={}]", tableDescriptor.name(), tableId, assignmentListToString(assignmentsList), causalityToken));
-                });
-            }
+            CompletableFuture<List<Assignments>> assignmentsFuture = getOrCreateAssignments(
+                    tableDescriptor,
+                    zoneDescriptor,
+                    causalityToken,
+                    catalogVersion
+            );
 
             CompletableFuture<List<Assignments>> assignmentsFutureAfterInvoke =
                     writeTableAssignmentsToMetastore(tableId, assignmentsFuture);
@@ -1396,13 +1404,40 @@
     }
 
     /**
-     * Creates a string representation of the given assignments list to use it for logging.
-     *
-     * @param assignments List of assignments.
-     * @return String representation of the given assignments list to use it for logging.
+     * Check if the table already has assignments in the meta storage locally.
+     * So, it means, that it is a recovery process and we should use the meta storage local assignments instead of calculation
+     * of the new ones.
      */
-    private static String assignmentListToString(List<Assignments> assignments) {
-        return S.toString(assignments, (sb, e, i) -> sb.app(i).app('=').app(e.nodes()));
+    private CompletableFuture<List<Assignments>> getOrCreateAssignments(
+            CatalogTableDescriptor tableDescriptor,
+            CatalogZoneDescriptor zoneDescriptor,
+            long causalityToken,
+            int catalogVersion
+    ) {
+        int tableId = tableDescriptor.id();
+        CompletableFuture<List<Assignments>> assignmentsFuture;
+
+        if (partitionAssignmentsGetLocally(metaStorageMgr, tableId, 0, causalityToken) != null) {
+            assignmentsFuture = completedFuture(
+                    tableAssignmentsGetLocally(metaStorageMgr, tableId, zoneDescriptor.partitions(), causalityToken));
+        } else {
+            assignmentsFuture = distributionZoneManager.dataNodes(causalityToken, catalogVersion, zoneDescriptor.id())
+                    .thenApply(dataNodes -> AffinityUtils.calculateAssignments(
+                            dataNodes,
+                            zoneDescriptor.partitions(),
+                            zoneDescriptor.replicas()
+                    ).stream().map(Assignments::of).collect(toList()));
+
+            assignmentsFuture.thenAccept(assignmentsList -> LOG.info(
+                    "Assignments calculated from data nodes [table={}, tableId={}, assignments={}, revision={}]",
+                    tableDescriptor.name(),
+                    tableId,
+                    Assignments.assignmentListToString(assignmentsList),
+                    causalityToken
+            ));
+        }
+
+        return assignmentsFuture;
     }
 
     /**
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 1c2c3a4..ada4b72 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -70,9 +70,12 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.LongFunction;
 import org.apache.ignite.internal.affinity.AffinityUtils;
+import org.apache.ignite.internal.affinity.Assignment;
+import org.apache.ignite.internal.affinity.Assignments;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogTestUtils;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
@@ -342,6 +345,40 @@
     }
 
     /**
+     * Testing TableManager#writeTableAssignmentsToMetastore for 2 exceptional scenarios:
+     * 1. the method was interrupted in outer future before invoke calling completion.
+     * 2. the method was interrupted in inner metastore's future when the result of invocation had gotten, but after error happens;
+     *
+     * @throws Exception if something goes wrong on mocks creation.
+     */
+    @Test
+    public void testWriteTableAssignmentsToMetastoreExceptionally() throws Exception {
+        TableViewInternal table = mockManagersAndCreateTable(DYNAMIC_TABLE_NAME, tblManagerFut);
+        int tableId = table.tableId();
+        TableManager tableManager = tblManagerFut.join();
+        List<Assignments> assignmentsList = List.of(Assignments.of(Assignment.forPeer(node.id())));
+
+        // the first case scenario
+        CompletableFuture<List<Assignments>> assignmentsFuture = new CompletableFuture<>();
+        var outerExceptionMsg = "Outer future is interrupted";
+        assignmentsFuture.completeExceptionally(new TimeoutException(outerExceptionMsg));
+        CompletableFuture<List<Assignments>> writtenAssignmentsFuture = tableManager
+                .writeTableAssignmentsToMetastore(tableId, assignmentsFuture);
+        assertTrue(writtenAssignmentsFuture.isCompletedExceptionally());
+        assertThrowsWithCause(writtenAssignmentsFuture::get, TimeoutException.class, outerExceptionMsg);
+
+        // the second case scenario
+        assignmentsFuture = completedFuture(assignmentsList);
+        CompletableFuture<Boolean> invokeTimeoutFuture = new CompletableFuture<>();
+        var innerExceptionMsg = "Inner future is interrupted";
+        invokeTimeoutFuture.completeExceptionally(new TimeoutException(innerExceptionMsg));
+        when(msm.invoke(any(), any(List.class), any(List.class))).thenReturn(invokeTimeoutFuture);
+        writtenAssignmentsFuture = tableManager.writeTableAssignmentsToMetastore(tableId, assignmentsFuture);
+        assertTrue(writtenAssignmentsFuture.isCompletedExceptionally());
+        assertThrowsWithCause(writtenAssignmentsFuture::get, TimeoutException.class, innerExceptionMsg);
+    }
+
+    /**
      * Tests drop a table through public API.
      *
      * @throws Exception If failed.