IGNITE-21300 Implement disaster recovery for secondary indexes (#3698)

diff --git a/modules/index/build.gradle b/modules/index/build.gradle
index 000e9bb..929bd79 100644
--- a/modules/index/build.gradle
+++ b/modules/index/build.gradle
@@ -76,6 +76,7 @@
     integrationTestImplementation testFixtures(project(':ignite-sql-engine'))
     integrationTestImplementation testFixtures(project(':ignite-table'))
     integrationTestImplementation testFixtures(project(':ignite-storage-api'))
+    integrationTestImplementation testFixtures(project(':ignite-catalog'))
     integrationTestImplementation libs.jetbrains.annotations
 }
 
diff --git a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
index a3160aa..460feaa 100644
--- a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
+++ b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
@@ -18,11 +18,14 @@
 package org.apache.ignite.internal.index;
 
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
 import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
+import static org.apache.ignite.internal.table.TableTestUtils.getIndexStrict;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -33,11 +36,14 @@
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
+import org.apache.ignite.internal.TestWrappers;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
@@ -45,10 +51,17 @@
 import org.apache.ignite.internal.catalog.events.MakeIndexAvailableEventParameters;
 import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
 import org.apache.ignite.internal.sql.engine.util.QueryChecker;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
 import org.apache.ignite.sql.SqlException;
+import org.apache.ignite.table.Table;
 import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionOptions;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 
@@ -301,6 +314,30 @@
                 .check();
     }
 
+    @Test
+    void testBuildIndexAfterDisasterRecovery() throws Exception {
+        createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, 1);
+
+        insertPeople(TABLE_NAME, new Person(0, "0", 10.0));
+
+        createIndexForSalaryFieldAndWaitBecomeAvailable();
+
+        // Not a fair reproduction of disaster recovery, but simple.
+        int partitionId = 0;
+        setNextRowIdToBuild(INDEX_NAME, partitionId, RowId.lowestRowId(partitionId));
+
+        CLUSTER.stopNode(0);
+        CLUSTER.startNode(0);
+
+        assertTrue(waitForCondition(() -> indexStorage(INDEX_NAME, partitionId).getNextRowIdToBuild() == null, SECONDS.toMillis(5)));
+
+        // Now let's check the data itself.
+        assertQuery(format("SELECT * FROM {} WHERE salary > 0.0", TABLE_NAME))
+                .matches(containsIndexScan(DEFAULT_SCHEMA_NAME, TABLE_NAME, INDEX_NAME))
+                .returns(0, "0", 10.0)
+                .check();
+    }
+
     private static IgniteImpl node() {
         return CLUSTER.node(0);
     }
@@ -388,4 +425,72 @@
 
         return future;
     }
+
+    private static void setNextRowIdToBuild(String indexName, int partitionId, @Nullable RowId nextRowIdToBuild) {
+        CatalogIndexDescriptor indexDescriptor = indexDescriptor(indexName);
+
+        MvPartitionStorage mvPartitionStorage = mvPartitionStorage(indexDescriptor, partitionId);
+        IndexStorage indexStorage = indexStorage(indexDescriptor, partitionId);
+
+        CompletableFuture<Void> flushFuture = runAsync(() -> {
+            mvPartitionStorage.runConsistently(locker -> {
+                indexStorage.setNextRowIdToBuild(nextRowIdToBuild);
+
+                return null;
+            });
+        }, "test-flusher");
+
+        assertThat(flushFuture, willCompleteSuccessfully());
+    }
+
+    private static TableViewInternal tableViewInternal(int tableId) {
+        CompletableFuture<List<Table>> tablesFuture = node().tables().tablesAsync();
+
+        assertThat(tablesFuture, willCompleteSuccessfully());
+
+        TableViewInternal tableViewInternal = tablesFuture.join().stream()
+                .map(TestWrappers::unwrapTableViewInternal)
+                .filter(table -> table.tableId() == tableId)
+                .findFirst()
+                .orElse(null);
+
+        assertNotNull(tableViewInternal, "tableId=" + tableId);
+
+        return tableViewInternal;
+    }
+
+    private static CatalogIndexDescriptor indexDescriptor(String indexName) {
+        IgniteImpl node = node();
+
+        return getIndexStrict(node.catalogManager(), indexName, node.clock().nowLong());
+    }
+
+    private static IndexStorage indexStorage(String indexName, int partitionId) {
+        return indexStorage(indexDescriptor(indexName), partitionId);
+    }
+
+    private static IndexStorage indexStorage(CatalogIndexDescriptor indexDescriptor, int partitionId) {
+        TableViewInternal tableViewInternal = tableViewInternal(indexDescriptor.tableId());
+
+        int indexId = indexDescriptor.id();
+
+        IndexStorage indexStorage = tableViewInternal.internalTable().storage().getIndex(partitionId, indexId);
+
+        assertNotNull(indexStorage, String.format("indexId=%s, partitionId=%s", indexId, partitionId));
+
+        return indexStorage;
+    }
+
+    private static MvPartitionStorage mvPartitionStorage(CatalogIndexDescriptor indexDescriptor, int partitionId) {
+        int tableId = indexDescriptor.tableId();
+
+        TableViewInternal tableViewInternal = tableViewInternal(tableId);
+
+        MvTableStorage mvTableStorage = tableViewInternal.internalTable().storage();
+
+        MvPartitionStorage mvPartitionStorage = mvTableStorage.getMvPartition(partitionId);
+        assertNotNull(mvPartitionStorage, String.format("tableId=%s, partitionId=%s", tableId, partitionId));
+
+        return mvPartitionStorage;
+    }
 }
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
index 7452a2b..5989c9d 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
@@ -212,7 +212,12 @@
             }
         });
 
-        indexBuilder.listen((indexId, tableId, partitionId) -> onIndexBuildCompletionForPartition(indexId, partitionId));
+        indexBuilder.listen(new IndexBuildCompletionListener() {
+            @Override
+            public void onBuildCompletion(int indexId, int tableId, int partitionId) {
+                onIndexBuildCompletionForPartition(indexId, partitionId);
+            }
+        });
     }
 
     private CompletableFuture<?> onIndexBuilding(StartBuildingIndexEventParameters parameters) {
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildCompletionListener.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildCompletionListener.java
index 02e9d9c..6d42892 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildCompletionListener.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildCompletionListener.java
@@ -17,9 +17,17 @@
 
 package org.apache.ignite.internal.index;
 
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+
 /** Index build completion listener, will be called when a distributed build of an index for a specific partition completes. */
-@FunctionalInterface
 interface IndexBuildCompletionListener {
     /** Handles the index build completion event. */
-    void onBuildCompletion(int indexId, int tableId, int partitionId);
+    default void onBuildCompletion(int indexId, int tableId, int partitionId) {
+        // No-op.
+    }
+
+    /** Handles an {@link CatalogIndexStatus#AVAILABLE available} index build completion event after disaster recovery. */
+    default void onBuildCompletionAfterDisasterRecovery(int indexId, int tableId, int partitionId) {
+        // No-op.
+    }
 }
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
index d8b15d0..6f3070a 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
@@ -20,6 +20,7 @@
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
 import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.BUILDING;
 import static org.apache.ignite.internal.index.IndexManagementUtils.AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC;
 import static org.apache.ignite.internal.index.IndexManagementUtils.isLocalNode;
@@ -36,6 +37,7 @@
 import java.util.function.Function;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
 import org.apache.ignite.internal.catalog.events.StartBuildingIndexEventParameters;
@@ -213,6 +215,13 @@
             for (CatalogIndexDescriptor indexDescriptor : catalogService.indexes(catalogVersion, primaryReplicaId.tableId())) {
                 if (indexDescriptor.status() == BUILDING) {
                     scheduleBuildIndex(primaryReplicaId, indexDescriptor, mvTableStorage, enlistmentConsistencyToken(replicaMeta));
+                } else if (indexDescriptor.status() == AVAILABLE) {
+                    scheduleBuildIndexAfterDisasterRecovery(
+                            primaryReplicaId,
+                            indexDescriptor,
+                            mvTableStorage,
+                            enlistmentConsistencyToken(replicaMeta)
+                    );
                 }
             }
         });
@@ -279,22 +288,37 @@
             MvTableStorage mvTableStorage,
             long enlistmentConsistencyToken
     ) {
-        int partitionId = replicaId.partitionId();
+        MvPartitionStorage mvPartition = mvPartitionStorage(mvTableStorage, replicaId);
 
-        MvPartitionStorage mvPartition = mvTableStorage.getMvPartition(partitionId);
-
-        assert mvPartition != null : replicaId;
-
-        int indexId = indexDescriptor.id();
-
-        IndexStorage indexStorage = mvTableStorage.getIndex(partitionId, indexId);
-
-        assert indexStorage != null : "replicaId=" + replicaId + ", indexId=" + indexId;
+        IndexStorage indexStorage = indexStorage(mvTableStorage, replicaId, indexDescriptor);
 
         indexBuilder.scheduleBuildIndex(
                 replicaId.tableId(),
-                partitionId,
-                indexId,
+                replicaId.partitionId(),
+                indexDescriptor.id(),
+                indexStorage,
+                mvPartition,
+                localNode(),
+                enlistmentConsistencyToken,
+                indexDescriptor.txWaitCatalogVersion()
+        );
+    }
+
+    /** Shortcut to schedule {@link CatalogIndexStatus#AVAILABLE available} index building after disaster recovery. */
+    private void scheduleBuildIndexAfterDisasterRecovery(
+            TablePartitionId replicaId,
+            CatalogIndexDescriptor indexDescriptor,
+            MvTableStorage mvTableStorage,
+            long enlistmentConsistencyToken
+    ) {
+        MvPartitionStorage mvPartition = mvPartitionStorage(mvTableStorage, replicaId);
+
+        IndexStorage indexStorage = indexStorage(mvTableStorage, replicaId, indexDescriptor);
+
+        indexBuilder.scheduleBuildIndexAfterDisasterRecovery(
+                replicaId.tableId(),
+                replicaId.partitionId(),
+                indexDescriptor.id(),
                 indexStorage,
                 mvPartition,
                 localNode(),
@@ -314,4 +338,24 @@
     private static long enlistmentConsistencyToken(ReplicaMeta replicaMeta) {
         return replicaMeta.getStartTime().longValue();
     }
+
+    private static MvPartitionStorage mvPartitionStorage(MvTableStorage mvTableStorage, TablePartitionId replicaId) {
+        MvPartitionStorage mvPartition = mvTableStorage.getMvPartition(replicaId.partitionId());
+
+        assert mvPartition != null : replicaId;
+
+        return mvPartition;
+    }
+
+    private static IndexStorage indexStorage(
+            MvTableStorage mvTableStorage,
+            TablePartitionId replicaId,
+            CatalogIndexDescriptor indexDescriptor
+    ) {
+        IndexStorage indexStorage = mvTableStorage.getIndex(replicaId.partitionId(), indexDescriptor.id());
+
+        assert indexStorage != null : "replicaId=" + replicaId + ", indexId=" + indexDescriptor.id();
+
+        return indexStorage;
+    }
 }
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
index bd4c523..00ea480 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
@@ -73,6 +73,8 @@
 
     private final int creationCatalogVersion;
 
+    private final boolean afterDisasterRecovery;
+
     private final IgniteSpinBusyLock taskBusyLock = new IgniteSpinBusyLock();
 
     private final AtomicBoolean taskStopGuard = new AtomicBoolean();
@@ -90,7 +92,8 @@
             ClusterNode node,
             List<IndexBuildCompletionListener> listeners,
             long enlistmentConsistencyToken,
-            int creationCatalogVersion
+            int creationCatalogVersion,
+            boolean afterDisasterRecovery
     ) {
         this.taskId = taskId;
         this.indexStorage = indexStorage;
@@ -104,6 +107,7 @@
         this.listeners = listeners;
         this.enlistmentConsistencyToken = enlistmentConsistencyToken;
         this.creationCatalogVersion = creationCatalogVersion;
+        this.afterDisasterRecovery = afterDisasterRecovery;
     }
 
     /** Starts building the index. */
@@ -176,9 +180,7 @@
                             // Index has been built.
                             LOG.info("Index build completed: [{}]", createCommonIndexInfo());
 
-                            for (IndexBuildCompletionListener listener : listeners) {
-                                listener.onBuildCompletion(taskId.getIndexId(), taskId.getTableId(), taskId.getPartitionId());
-                            }
+                            notifyListeners(taskId);
 
                             return CompletableFutures.<Void>nullCompletedFuture();
                         }
@@ -240,4 +242,14 @@
                 taskId.getTableId(), taskId.getPartitionId(), taskId.getIndexId()
         );
     }
+
+    private void notifyListeners(IndexBuildTaskId taskId) {
+        for (IndexBuildCompletionListener listener : listeners) {
+            if (afterDisasterRecovery) {
+                listener.onBuildCompletionAfterDisasterRecovery(taskId.getIndexId(), taskId.getTableId(), taskId.getPartitionId());
+            } else {
+                listener.onBuildCompletion(taskId.getIndexId(), taskId.getTableId(), taskId.getPartitionId());
+            }
+        }
+    }
 }
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
index c9e8d0c..020f5d0 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
@@ -85,7 +86,8 @@
      *     <li>Index is built in batches using {@link BuildIndexReplicaRequest}, which are then transformed into {@link BuildIndexCommand}
      *     on the replica, batches are sent sequentially.</li>
      *     <li>It is expected that the index building is triggered by the primary replica.</li>
-     *     <li>If the index has already been built, {@link IndexBuildCompletionListener} will be notified.</li>
+     *     <li>If the index has already been built or after the building is complete, {@link IndexBuildCompletionListener#onBuildCompletion}
+     *     will be notified.</li>
      * </ul>
      *
      * @param tableId Table ID.
@@ -130,19 +132,71 @@
                     node,
                     listeners,
                     enlistmentConsistencyToken,
-                    creationCatalogVersion
+                    creationCatalogVersion,
+                    false
             );
 
-            IndexBuildTask previousTask = indexBuildTaskById.putIfAbsent(taskId, newTask);
+            putAndStartTaskIfAbsent(taskId, newTask);
+        });
+    }
 
-            if (previousTask != null) {
-                // Index building is already in progress.
+    /**
+     * Schedules building an {@link CatalogIndexStatus#AVAILABLE available} index after disaster recovery, if it is not already built or is
+     * not yet in progress.
+     *
+     * <p>Notes:</p>
+     * <ul>
+     *     <li>Index is built in batches using {@link BuildIndexReplicaRequest}, which are then transformed into {@link BuildIndexCommand}
+     *     on the replica, batches are sent sequentially.</li>
+     *     <li>It is expected that the index building is triggered by the primary replica.</li>
+     *     <li>If the index has already been built, then nothing will happen.</li>
+     *     <li>After index building is complete, {@link IndexBuildCompletionListener#onBuildCompletionAfterDisasterRecovery} will be
+     *     notified.</li>
+     * </ul>
+     *
+     * @param tableId Table ID.
+     * @param partitionId Partition ID.
+     * @param indexId Index ID.
+     * @param indexStorage Index storage to build.
+     * @param partitionStorage Multi-versioned partition storage.
+     * @param node Node to which requests to build the index will be sent.
+     * @param enlistmentConsistencyToken Enlistment consistency token is used to check that the lease is still actual while the
+     *         message goes to the replica.
+     * @param creationCatalogVersion Catalog version in which the index was created.
+     */
+    public void scheduleBuildIndexAfterDisasterRecovery(
+            int tableId,
+            int partitionId,
+            int indexId,
+            IndexStorage indexStorage,
+            MvPartitionStorage partitionStorage,
+            ClusterNode node,
+            long enlistmentConsistencyToken,
+            int creationCatalogVersion
+    ) {
+        inBusyLockSafe(busyLock, () -> {
+            if (indexStorage.getNextRowIdToBuild() == null) {
                 return;
             }
 
-            newTask.start();
+            IndexBuildTaskId taskId = new IndexBuildTaskId(tableId, partitionId, indexId);
 
-            newTask.getTaskFuture().whenComplete((unused, throwable) -> indexBuildTaskById.remove(taskId));
+            IndexBuildTask newTask = new IndexBuildTask(
+                    taskId,
+                    indexStorage,
+                    partitionStorage,
+                    replicaService,
+                    executor,
+                    busyLock,
+                    BATCH_SIZE,
+                    node,
+                    listeners,
+                    enlistmentConsistencyToken,
+                    creationCatalogVersion,
+                    true
+            );
+
+            putAndStartTaskIfAbsent(taskId, newTask);
         });
     }
 
@@ -214,4 +268,19 @@
     public void stopListen(IndexBuildCompletionListener listener) {
         listeners.remove(listener);
     }
+
+    private void putAndStartTaskIfAbsent(IndexBuildTaskId taskId, IndexBuildTask task) {
+        IndexBuildTask previousTask = indexBuildTaskById.putIfAbsent(taskId, task);
+
+        if (previousTask != null) {
+            // Index building is already in progress.
+            return;
+        }
+
+        try {
+            task.start();
+        } finally {
+            task.getTaskFuture().whenComplete((unused, throwable) -> indexBuildTaskById.remove(taskId));
+        }
+    }
 }
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
index 7684ca7..d756c93 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
@@ -389,9 +389,12 @@
 
         CompletableFuture<Void> finishBuildIndexFuture = new CompletableFuture<>();
 
-        indexBuilder.listen((indexId1, tableId, partitionId1) -> {
-            if (indexId1 == indexId && partitionId1 == partitionId) {
-                finishBuildIndexFuture.complete(null);
+        indexBuilder.listen(new IndexBuildCompletionListener() {
+            @Override
+            public void onBuildCompletion(int indexId1, int tableId1, int partitionId1) {
+                if (indexId1 == indexId && partitionId1 == partitionId) {
+                    finishBuildIndexFuture.complete(null);
+                }
             }
         });
 
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
index 326dae8..8b01a0b 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
@@ -26,6 +26,7 @@
 import static org.apache.ignite.internal.index.TestIndexManagementUtils.LOCAL_NODE;
 import static org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_ID;
 import static org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_NAME;
+import static org.apache.ignite.internal.index.TestIndexManagementUtils.PK_INDEX_NAME;
 import static org.apache.ignite.internal.index.TestIndexManagementUtils.TABLE_NAME;
 import static org.apache.ignite.internal.index.TestIndexManagementUtils.createTable;
 import static org.apache.ignite.internal.table.TableTestUtils.createHashIndex;
@@ -143,6 +144,17 @@
                 anyLong(),
                 eq(indexCreationCatalogVersion(INDEX_NAME))
         );
+
+        verify(indexBuilder, never()).scheduleBuildIndexAfterDisasterRecovery(
+                eq(tableId()),
+                eq(PARTITION_ID),
+                eq(indexId(INDEX_NAME)),
+                any(),
+                any(),
+                eq(LOCAL_NODE),
+                anyLong(),
+                eq(indexCreationCatalogVersion(INDEX_NAME))
+        );
     }
 
     @Test
@@ -165,6 +177,17 @@
                 anyLong(),
                 eq(indexCreationCatalogVersion(INDEX_NAME))
         );
+
+        verify(indexBuilder, never()).scheduleBuildIndexAfterDisasterRecovery(
+                eq(tableId()),
+                eq(PARTITION_ID),
+                eq(indexId(INDEX_NAME)),
+                any(),
+                any(),
+                eq(LOCAL_NODE),
+                anyLong(),
+                eq(indexCreationCatalogVersion(INDEX_NAME))
+        );
     }
 
     @Test
@@ -185,21 +208,16 @@
                 anyLong(),
                 eq(indexCreationCatalogVersion(INDEX_NAME))
         );
-    }
 
-    @Test
-    void testNotStartBuildPkIndexesOnPrimaryReplicaElected() {
-        setPrimaryReplicaWhichExpiresInOneSecond(PARTITION_ID, NODE_NAME, NODE_ID, clock.now());
-
-        verify(indexBuilder, never()).scheduleBuildIndex(
+        verify(indexBuilder).scheduleBuildIndexAfterDisasterRecovery(
                 eq(tableId()),
                 eq(PARTITION_ID),
-                eq(indexId(pkIndexName(TABLE_NAME))),
+                eq(indexId(PK_INDEX_NAME)),
                 any(),
                 any(),
                 eq(LOCAL_NODE),
                 anyLong(),
-                eq(indexCreationCatalogVersion(pkIndexName(TABLE_NAME)))
+                eq(indexCreationCatalogVersion(PK_INDEX_NAME))
         );
     }
 
@@ -212,14 +230,25 @@
         createTable(catalogManager, tableName, COLUMN_NAME);
 
         verify(indexBuilder, never()).scheduleBuildIndex(
-                eq(tableId()),
+                eq(tableId(tableName)),
                 eq(PARTITION_ID),
                 eq(indexId(pkIndexName(tableName))),
                 any(),
                 any(),
                 eq(LOCAL_NODE),
                 anyLong(),
-                eq(indexCreationCatalogVersion(pkIndexName(tableName)))
+                anyInt()
+        );
+
+        verify(indexBuilder, never()).scheduleBuildIndexAfterDisasterRecovery(
+                eq(tableId(tableName)),
+                eq(PARTITION_ID),
+                eq(indexId(pkIndexName(tableName))),
+                any(),
+                any(),
+                eq(LOCAL_NODE),
+                anyLong(),
+                anyInt()
         );
     }
 
@@ -264,6 +293,17 @@
                 anyLong(),
                 anyInt()
         );
+
+        verify(indexBuilder).scheduleBuildIndexAfterDisasterRecovery(
+                eq(tableId()),
+                eq(PARTITION_ID),
+                eq(indexId0),
+                any(),
+                any(),
+                eq(LOCAL_NODE),
+                anyLong(),
+                anyInt()
+        );
     }
 
     private void createIndex(String indexName) {
@@ -294,7 +334,11 @@
     }
 
     private int tableId() {
-        return getTableIdStrict(catalogManager, TABLE_NAME, clock.nowLong());
+        return tableId(TABLE_NAME);
+    }
+
+    private int tableId(String tableName) {
+        return getTableIdStrict(catalogManager, tableName, clock.nowLong());
     }
 
     private int indexId(String indexName) {
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
index a499eee..56b8f77 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
@@ -27,6 +27,7 @@
 import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -92,7 +93,12 @@
     void testStopListenIndexBuildCompletion() {
         CompletableFuture<Void> invokeListenerFuture = new CompletableFuture<>();
 
-        IndexBuildCompletionListener listener = (indexId, tableId, partitionId) -> invokeListenerFuture.complete(null);
+        IndexBuildCompletionListener listener = new IndexBuildCompletionListener() {
+            @Override
+            public void onBuildCompletion(int indexId, int tableId, int partitionId) {
+                invokeListenerFuture.complete(null);
+            }
+        };
 
         indexBuilder.listen(listener);
         indexBuilder.stopListen(listener);
@@ -149,6 +155,16 @@
         verify(replicaService, times(2)).invoke(any(ClusterNode.class), any(BuildIndexReplicaRequest.class));
     }
 
+    @Test
+    void testScheduleBuildIndexAfterDisasterRecovery() {
+        CompletableFuture<Void> listenCompletionIndexBuildingAfterDisasterRecoveryFuture =
+                listenCompletionIndexBuildingAfterDisasterRecovery(INDEX_ID, TABLE_ID, PARTITION_ID);
+
+        scheduleBuildIndexAfterDisasterRecovery(INDEX_ID, TABLE_ID, PARTITION_ID, List.of(rowId(PARTITION_ID)));
+
+        assertThat(listenCompletionIndexBuildingAfterDisasterRecoveryFuture, willCompleteSuccessfully());
+    }
+
     private void scheduleBuildIndex(int indexId, int tableId, int partitionId, Collection<RowId> nextRowIdsToBuild) {
         indexBuilder.scheduleBuildIndex(
                 tableId,
@@ -162,12 +178,53 @@
         );
     }
 
-    private CompletableFuture<Void> listenCompletionIndexBuilding(int indexId, int tableId, int partitionId) {
-        CompletableFuture<Void> future = new CompletableFuture<>();
+    private void scheduleBuildIndexAfterDisasterRecovery(int indexId, int tableId, int partitionId, Collection<RowId> nextRowIdsToBuild) {
+        indexBuilder.scheduleBuildIndexAfterDisasterRecovery(
+                tableId,
+                partitionId,
+                indexId,
+                indexStorage(nextRowIdsToBuild),
+                mock(MvPartitionStorage.class),
+                mock(ClusterNode.class),
+                ANY_ENLISTMENT_CONSISTENCY_TOKEN,
+                ANY_INDEX_CREATION_CATALOG_VERSION
+        );
+    }
 
-        indexBuilder.listen((indexId1, tableId1, partitionId1) -> {
-            if (indexId1 == indexId && tableId1 == tableId && partitionId1 == partitionId) {
-                future.complete(null);
+    private CompletableFuture<Void> listenCompletionIndexBuilding(int indexId, int tableId, int partitionId) {
+        var future = new CompletableFuture<Void>();
+
+        indexBuilder.listen(new IndexBuildCompletionListener() {
+            @Override
+            public void onBuildCompletion(int indexId1, int tableId1, int partitionId1) {
+                if (indexId1 == indexId && tableId1 == tableId && partitionId1 == partitionId) {
+                    future.complete(null);
+                }
+            }
+
+            @Override
+            public void onBuildCompletionAfterDisasterRecovery(int indexId, int tableId, int partitionId) {
+                fail(String.format("indexId=%s, tableId=%s, partitionId=%s", indexId, tableId, partitionId));
+            }
+        });
+
+        return future;
+    }
+
+    private CompletableFuture<Void> listenCompletionIndexBuildingAfterDisasterRecovery(int indexId, int tableId, int partitionId) {
+        var future = new CompletableFuture<Void>();
+
+        indexBuilder.listen(new IndexBuildCompletionListener() {
+            @Override
+            public void onBuildCompletionAfterDisasterRecovery(int indexId1, int tableId1, int partitionId1) {
+                if (indexId1 == indexId && tableId1 == tableId && partitionId1 == partitionId) {
+                    future.complete(null);
+                }
+            }
+
+            @Override
+            public void onBuildCompletion(int indexId, int tableId, int partitionId) {
+                fail(String.format("indexId=%s, tableId=%s, partitionId=%s", indexId, tableId, partitionId));
             }
         });