IGNITE-22163 Fix potential NPE in IndexManager (#3699)

diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
index 6f3070a..e5ec762 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
@@ -28,6 +28,7 @@
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 
 import java.util.ArrayList;
 import java.util.Set;
@@ -44,6 +45,7 @@
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.network.ClusterService;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitTimeoutException;
@@ -57,6 +59,7 @@
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Component is responsible for starting and stopping the building of indexes on primary replicas.
@@ -260,7 +263,20 @@
     }
 
     private CompletableFuture<MvTableStorage> getMvTableStorageFuture(long causalityToken, TablePartitionId replicaId) {
-        return indexManager.getMvTableStorage(causalityToken, replicaId.tableId());
+        return indexManager.getMvTableStorage(causalityToken, replicaId.tableId())
+                .thenApply(mvTableStorage -> requireMvTableStorageNonNull(mvTableStorage, replicaId.tableId()));
+    }
+
+    private static MvTableStorage requireMvTableStorageNonNull(@Nullable MvTableStorage mvTableStorage, int tableId) {
+        if (mvTableStorage == null) {
+            throw new IgniteInternalException(
+                    INTERNAL_ERR,
+                    "Table storage for the specified table cannot be null [tableId = {}]",
+                    tableId
+            );
+        }
+
+        return mvTableStorage;
     }
 
     private CompletableFuture<ReplicaMeta> awaitPrimaryReplica(TablePartitionId replicaId, HybridTimestamp timestamp) {
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 3488870..5a4bfc0 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -60,6 +60,7 @@
 import org.apache.ignite.internal.table.distributed.PartitionSet;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * An Ignite component that is responsible for handling index-related commands like CREATE or DROP
@@ -171,8 +172,8 @@
      * @return Future with multi-version table storage, completes with {@code null} if the table does not exist according to the passed
      *      parameters.
      */
-    CompletableFuture<MvTableStorage> getMvTableStorage(long causalityToken, int tableId) {
-        return tableManager.tableAsync(causalityToken, tableId).thenApply(table -> table.internalTable().storage());
+    CompletableFuture<@Nullable MvTableStorage> getMvTableStorage(long causalityToken, int tableId) {
+        return tableManager.tableAsync(causalityToken, tableId).thenApply(table -> table == null ? null : table.internalTable().storage());
     }
 
     private CompletableFuture<Boolean> onIndexCreate(CreateIndexEventParameters parameters) {
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
index 8b01a0b..6a23070 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
@@ -33,7 +33,9 @@
 import static org.apache.ignite.internal.table.TableTestUtils.getIndexIdStrict;
 import static org.apache.ignite.internal.table.TableTestUtils.getIndexStrict;
 import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
@@ -48,6 +50,8 @@
 
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.commands.MakeIndexAvailableCommand;
 import org.apache.ignite.internal.catalog.commands.StartBuildingIndexCommand;
@@ -86,11 +90,13 @@
 
     private final ClockService clockService = new TestClockService(clock);
 
+    private IndexManager indexManager = null;
+
     @BeforeEach
     void setUp() {
         indexBuilder = mock(IndexBuilder.class);
 
-        IndexManager indexManager = mock(IndexManager.class, invocation -> {
+        indexManager = mock(IndexManager.class, invocation -> {
             MvTableStorage mvTableStorage = mock(MvTableStorage.class);
             MvPartitionStorage mvPartitionStorage = mock(MvPartitionStorage.class);
             IndexStorage indexStorage = mock(IndexStorage.class);
@@ -191,6 +197,22 @@
     }
 
     @Test
+    void testExceptionIsThrownOnIndexBuildingWhenStorageIsNull() {
+        setPrimaryReplicaWhichExpiresInOneSecond(PARTITION_ID, NODE_NAME, NODE_ID, clock.now());
+
+        createIndex(INDEX_NAME);
+
+        when(indexManager.getMvTableStorage(anyLong(), anyInt())).thenReturn(nullCompletedFuture());
+
+        assertThrows(
+                ExecutionException.class,
+                () -> catalogManager.execute(StartBuildingIndexCommand.builder().indexId(indexId(INDEX_NAME)).build())
+                        .get(10_000, TimeUnit.MILLISECONDS),
+                "Table storage for the specified table cannot be null"
+        );
+    }
+
+    @Test
     void testStartBuildIndexesOnPrimaryReplicaElected() {
         createIndex(INDEX_NAME);
 
@@ -222,6 +244,20 @@
     }
 
     @Test
+    void testExceptionIsThrownOnPrimaryReplicaElectedWhenStorageIsNull() {
+        when(indexManager.getMvTableStorage(anyLong(), anyInt())).thenReturn(nullCompletedFuture());
+
+        CompletableFuture<ReplicaMeta> replicaMetaFuture = completedFuture(replicaMetaForOneSecond(NODE_NAME, NODE_ID, clock.now()));
+
+        assertThrows(
+                ExecutionException.class,
+                () -> placementDriver.setPrimaryReplicaMeta(0, replicaId(PARTITION_ID), replicaMetaFuture)
+                        .get(10_000, TimeUnit.MILLISECONDS),
+                "Table storage for the specified table cannot be null"
+        );
+    }
+
+    @Test
     void testNotStartBuildPkIndexesForNewTable() {
         setPrimaryReplicaWhichExpiresInOneSecond(PARTITION_ID, NODE_NAME, NODE_ID, clock.now());