IGNITE-21300 Implement disaster recovery for secondary indexes (#3698)
diff --git a/modules/index/build.gradle b/modules/index/build.gradle
index 000e9bb..929bd79 100644
--- a/modules/index/build.gradle
+++ b/modules/index/build.gradle
@@ -76,6 +76,7 @@
integrationTestImplementation testFixtures(project(':ignite-sql-engine'))
integrationTestImplementation testFixtures(project(':ignite-table'))
integrationTestImplementation testFixtures(project(':ignite-storage-api'))
+ integrationTestImplementation testFixtures(project(':ignite-catalog'))
integrationTestImplementation libs.jetbrains.annotations
}
diff --git a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
index a3160aa..460feaa 100644
--- a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
+++ b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexOneNodeTest.java
@@ -18,11 +18,14 @@
package org.apache.ignite.internal.index;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
+import static org.apache.ignite.internal.table.TableTestUtils.getIndexStrict;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -33,11 +36,14 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
+import org.apache.ignite.internal.TestWrappers;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
@@ -45,10 +51,17 @@
import org.apache.ignite.internal.catalog.events.MakeIndexAvailableEventParameters;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.sql.engine.util.QueryChecker;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.replication.request.BuildIndexReplicaRequest;
import org.apache.ignite.sql.SqlException;
+import org.apache.ignite.table.Table;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionOptions;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -301,6 +314,30 @@
.check();
}
+ @Test
+ void testBuildIndexAfterDisasterRecovery() throws Exception {
+ createZoneAndTable(ZONE_NAME, TABLE_NAME, 1, 1);
+
+ insertPeople(TABLE_NAME, new Person(0, "0", 10.0));
+
+ createIndexForSalaryFieldAndWaitBecomeAvailable();
+
+ // Not a fair reproduction of disaster recovery, but simple.
+ int partitionId = 0;
+ setNextRowIdToBuild(INDEX_NAME, partitionId, RowId.lowestRowId(partitionId));
+
+ CLUSTER.stopNode(0);
+ CLUSTER.startNode(0);
+
+ assertTrue(waitForCondition(() -> indexStorage(INDEX_NAME, partitionId).getNextRowIdToBuild() == null, SECONDS.toMillis(5)));
+
+ // Now let's check the data itself.
+ assertQuery(format("SELECT * FROM {} WHERE salary > 0.0", TABLE_NAME))
+ .matches(containsIndexScan(DEFAULT_SCHEMA_NAME, TABLE_NAME, INDEX_NAME))
+ .returns(0, "0", 10.0)
+ .check();
+ }
+
private static IgniteImpl node() {
return CLUSTER.node(0);
}
@@ -388,4 +425,72 @@
return future;
}
+
+ private static void setNextRowIdToBuild(String indexName, int partitionId, @Nullable RowId nextRowIdToBuild) {
+ CatalogIndexDescriptor indexDescriptor = indexDescriptor(indexName);
+
+ MvPartitionStorage mvPartitionStorage = mvPartitionStorage(indexDescriptor, partitionId);
+ IndexStorage indexStorage = indexStorage(indexDescriptor, partitionId);
+
+ CompletableFuture<Void> flushFuture = runAsync(() -> {
+ mvPartitionStorage.runConsistently(locker -> {
+ indexStorage.setNextRowIdToBuild(nextRowIdToBuild);
+
+ return null;
+ });
+ }, "test-flusher");
+
+ assertThat(flushFuture, willCompleteSuccessfully());
+ }
+
+ private static TableViewInternal tableViewInternal(int tableId) {
+ CompletableFuture<List<Table>> tablesFuture = node().tables().tablesAsync();
+
+ assertThat(tablesFuture, willCompleteSuccessfully());
+
+ TableViewInternal tableViewInternal = tablesFuture.join().stream()
+ .map(TestWrappers::unwrapTableViewInternal)
+ .filter(table -> table.tableId() == tableId)
+ .findFirst()
+ .orElse(null);
+
+ assertNotNull(tableViewInternal, "tableId=" + tableId);
+
+ return tableViewInternal;
+ }
+
+ private static CatalogIndexDescriptor indexDescriptor(String indexName) {
+ IgniteImpl node = node();
+
+ return getIndexStrict(node.catalogManager(), indexName, node.clock().nowLong());
+ }
+
+ private static IndexStorage indexStorage(String indexName, int partitionId) {
+ return indexStorage(indexDescriptor(indexName), partitionId);
+ }
+
+ private static IndexStorage indexStorage(CatalogIndexDescriptor indexDescriptor, int partitionId) {
+ TableViewInternal tableViewInternal = tableViewInternal(indexDescriptor.tableId());
+
+ int indexId = indexDescriptor.id();
+
+ IndexStorage indexStorage = tableViewInternal.internalTable().storage().getIndex(partitionId, indexId);
+
+ assertNotNull(indexStorage, String.format("indexId=%s, partitionId=%s", indexId, partitionId));
+
+ return indexStorage;
+ }
+
+ private static MvPartitionStorage mvPartitionStorage(CatalogIndexDescriptor indexDescriptor, int partitionId) {
+ int tableId = indexDescriptor.tableId();
+
+ TableViewInternal tableViewInternal = tableViewInternal(tableId);
+
+ MvTableStorage mvTableStorage = tableViewInternal.internalTable().storage();
+
+ MvPartitionStorage mvPartitionStorage = mvTableStorage.getMvPartition(partitionId);
+ assertNotNull(mvPartitionStorage, String.format("tableId=%s, partitionId=%s", tableId, partitionId));
+
+ return mvPartitionStorage;
+ }
}
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
index 7452a2b..5989c9d 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexAvailabilityController.java
@@ -212,7 +212,12 @@
}
});
- indexBuilder.listen((indexId, tableId, partitionId) -> onIndexBuildCompletionForPartition(indexId, partitionId));
+ indexBuilder.listen(new IndexBuildCompletionListener() {
+ @Override
+ public void onBuildCompletion(int indexId, int tableId, int partitionId) {
+ onIndexBuildCompletionForPartition(indexId, partitionId);
+ }
+ });
}
private CompletableFuture<?> onIndexBuilding(StartBuildingIndexEventParameters parameters) {
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildCompletionListener.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildCompletionListener.java
index 02e9d9c..6d42892 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildCompletionListener.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildCompletionListener.java
@@ -17,9 +17,17 @@
package org.apache.ignite.internal.index;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+
/** Index build completion listener, will be called when a distributed build of an index for a specific partition completes. */
-@FunctionalInterface
interface IndexBuildCompletionListener {
/** Handles the index build completion event. */
- void onBuildCompletion(int indexId, int tableId, int partitionId);
+ default void onBuildCompletion(int indexId, int tableId, int partitionId) {
+ // No-op.
+ }
+
+ /** Handles an {@link CatalogIndexStatus#AVAILABLE available} index build completion event after disaster recovery. */
+ default void onBuildCompletionAfterDisasterRecovery(int indexId, int tableId, int partitionId) {
+ // No-op.
+ }
}
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
index d8b15d0..6f3070a 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java
@@ -20,6 +20,7 @@
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.BUILDING;
import static org.apache.ignite.internal.index.IndexManagementUtils.AWAIT_PRIMARY_REPLICA_TIMEOUT_SEC;
import static org.apache.ignite.internal.index.IndexManagementUtils.isLocalNode;
@@ -36,6 +37,7 @@
import java.util.function.Function;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
import org.apache.ignite.internal.catalog.events.StartBuildingIndexEventParameters;
@@ -213,6 +215,13 @@
for (CatalogIndexDescriptor indexDescriptor : catalogService.indexes(catalogVersion, primaryReplicaId.tableId())) {
if (indexDescriptor.status() == BUILDING) {
scheduleBuildIndex(primaryReplicaId, indexDescriptor, mvTableStorage, enlistmentConsistencyToken(replicaMeta));
+ } else if (indexDescriptor.status() == AVAILABLE) {
+ scheduleBuildIndexAfterDisasterRecovery(
+ primaryReplicaId,
+ indexDescriptor,
+ mvTableStorage,
+ enlistmentConsistencyToken(replicaMeta)
+ );
}
}
});
@@ -279,22 +288,37 @@
MvTableStorage mvTableStorage,
long enlistmentConsistencyToken
) {
- int partitionId = replicaId.partitionId();
+ MvPartitionStorage mvPartition = mvPartitionStorage(mvTableStorage, replicaId);
- MvPartitionStorage mvPartition = mvTableStorage.getMvPartition(partitionId);
-
- assert mvPartition != null : replicaId;
-
- int indexId = indexDescriptor.id();
-
- IndexStorage indexStorage = mvTableStorage.getIndex(partitionId, indexId);
-
- assert indexStorage != null : "replicaId=" + replicaId + ", indexId=" + indexId;
+ IndexStorage indexStorage = indexStorage(mvTableStorage, replicaId, indexDescriptor);
indexBuilder.scheduleBuildIndex(
replicaId.tableId(),
- partitionId,
- indexId,
+ replicaId.partitionId(),
+ indexDescriptor.id(),
+ indexStorage,
+ mvPartition,
+ localNode(),
+ enlistmentConsistencyToken,
+ indexDescriptor.txWaitCatalogVersion()
+ );
+ }
+
+ /** Shortcut to schedule {@link CatalogIndexStatus#AVAILABLE available} index building after disaster recovery. */
+ private void scheduleBuildIndexAfterDisasterRecovery(
+ TablePartitionId replicaId,
+ CatalogIndexDescriptor indexDescriptor,
+ MvTableStorage mvTableStorage,
+ long enlistmentConsistencyToken
+ ) {
+ MvPartitionStorage mvPartition = mvPartitionStorage(mvTableStorage, replicaId);
+
+ IndexStorage indexStorage = indexStorage(mvTableStorage, replicaId, indexDescriptor);
+
+ indexBuilder.scheduleBuildIndexAfterDisasterRecovery(
+ replicaId.tableId(),
+ replicaId.partitionId(),
+ indexDescriptor.id(),
indexStorage,
mvPartition,
localNode(),
@@ -314,4 +338,24 @@
private static long enlistmentConsistencyToken(ReplicaMeta replicaMeta) {
return replicaMeta.getStartTime().longValue();
}
+
+ private static MvPartitionStorage mvPartitionStorage(MvTableStorage mvTableStorage, TablePartitionId replicaId) {
+ MvPartitionStorage mvPartition = mvTableStorage.getMvPartition(replicaId.partitionId());
+
+ assert mvPartition != null : replicaId;
+
+ return mvPartition;
+ }
+
+ private static IndexStorage indexStorage(
+ MvTableStorage mvTableStorage,
+ TablePartitionId replicaId,
+ CatalogIndexDescriptor indexDescriptor
+ ) {
+ IndexStorage indexStorage = mvTableStorage.getIndex(replicaId.partitionId(), indexDescriptor.id());
+
+ assert indexStorage != null : "replicaId=" + replicaId + ", indexId=" + indexDescriptor.id();
+
+ return indexStorage;
+ }
}
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
index bd4c523..00ea480 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildTask.java
@@ -73,6 +73,8 @@
private final int creationCatalogVersion;
+ private final boolean afterDisasterRecovery;
+
private final IgniteSpinBusyLock taskBusyLock = new IgniteSpinBusyLock();
private final AtomicBoolean taskStopGuard = new AtomicBoolean();
@@ -90,7 +92,8 @@
ClusterNode node,
List<IndexBuildCompletionListener> listeners,
long enlistmentConsistencyToken,
- int creationCatalogVersion
+ int creationCatalogVersion,
+ boolean afterDisasterRecovery
) {
this.taskId = taskId;
this.indexStorage = indexStorage;
@@ -104,6 +107,7 @@
this.listeners = listeners;
this.enlistmentConsistencyToken = enlistmentConsistencyToken;
this.creationCatalogVersion = creationCatalogVersion;
+ this.afterDisasterRecovery = afterDisasterRecovery;
}
/** Starts building the index. */
@@ -176,9 +180,7 @@
// Index has been built.
LOG.info("Index build completed: [{}]", createCommonIndexInfo());
- for (IndexBuildCompletionListener listener : listeners) {
- listener.onBuildCompletion(taskId.getIndexId(), taskId.getTableId(), taskId.getPartitionId());
- }
+ notifyListeners(taskId);
return CompletableFutures.<Void>nullCompletedFuture();
}
@@ -240,4 +242,14 @@
taskId.getTableId(), taskId.getPartitionId(), taskId.getIndexId()
);
}
+
+ private void notifyListeners(IndexBuildTaskId taskId) {
+ for (IndexBuildCompletionListener listener : listeners) {
+ if (afterDisasterRecovery) {
+ listener.onBuildCompletionAfterDisasterRecovery(taskId.getIndexId(), taskId.getTableId(), taskId.getPartitionId());
+ } else {
+ listener.onBuildCompletion(taskId.getIndexId(), taskId.getTableId(), taskId.getPartitionId());
+ }
+ }
+ }
}
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
index c9e8d0c..020f5d0 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuilder.java
@@ -28,6 +28,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.storage.MvPartitionStorage;
@@ -85,7 +86,8 @@
* <li>Index is built in batches using {@link BuildIndexReplicaRequest}, which are then transformed into {@link BuildIndexCommand}
* on the replica, batches are sent sequentially.</li>
* <li>It is expected that the index building is triggered by the primary replica.</li>
- * <li>If the index has already been built, {@link IndexBuildCompletionListener} will be notified.</li>
+ * <li>If the index has already been built or after the building is complete, {@link IndexBuildCompletionListener#onBuildCompletion}
+ * will be notified.</li>
* </ul>
*
* @param tableId Table ID.
@@ -130,19 +132,71 @@
node,
listeners,
enlistmentConsistencyToken,
- creationCatalogVersion
+ creationCatalogVersion,
+ false
);
- IndexBuildTask previousTask = indexBuildTaskById.putIfAbsent(taskId, newTask);
+ putAndStartTaskIfAbsent(taskId, newTask);
+ });
+ }
- if (previousTask != null) {
- // Index building is already in progress.
+ /**
+ * Schedules building an {@link CatalogIndexStatus#AVAILABLE available} index after disaster recovery, if it is not already built or is
+ * not yet in progress.
+ *
+ * <p>Notes:</p>
+ * <ul>
+ * <li>Index is built in batches using {@link BuildIndexReplicaRequest}, which are then transformed into {@link BuildIndexCommand}
+ * on the replica, batches are sent sequentially.</li>
+ * <li>It is expected that the index building is triggered by the primary replica.</li>
+ * <li>If the index has already been built, then nothing will happen.</li>
+ * <li>After index building is complete, {@link IndexBuildCompletionListener#onBuildCompletionAfterDisasterRecovery} will be
+ * notified.</li>
+ * </ul>
+ *
+ * @param tableId Table ID.
+ * @param partitionId Partition ID.
+ * @param indexId Index ID.
+ * @param indexStorage Index storage to build.
+ * @param partitionStorage Multi-versioned partition storage.
+ * @param node Node to which requests to build the index will be sent.
+ * @param enlistmentConsistencyToken Enlistment consistency token is used to check that the lease is still actual while the
+ * message goes to the replica.
+ * @param creationCatalogVersion Catalog version in which the index was created.
+ */
+ public void scheduleBuildIndexAfterDisasterRecovery(
+ int tableId,
+ int partitionId,
+ int indexId,
+ IndexStorage indexStorage,
+ MvPartitionStorage partitionStorage,
+ ClusterNode node,
+ long enlistmentConsistencyToken,
+ int creationCatalogVersion
+ ) {
+ inBusyLockSafe(busyLock, () -> {
+ if (indexStorage.getNextRowIdToBuild() == null) {
return;
}
- newTask.start();
+ IndexBuildTaskId taskId = new IndexBuildTaskId(tableId, partitionId, indexId);
- newTask.getTaskFuture().whenComplete((unused, throwable) -> indexBuildTaskById.remove(taskId));
+ IndexBuildTask newTask = new IndexBuildTask(
+ taskId,
+ indexStorage,
+ partitionStorage,
+ replicaService,
+ executor,
+ busyLock,
+ BATCH_SIZE,
+ node,
+ listeners,
+ enlistmentConsistencyToken,
+ creationCatalogVersion,
+ true
+ );
+
+ putAndStartTaskIfAbsent(taskId, newTask);
});
}
@@ -214,4 +268,19 @@
public void stopListen(IndexBuildCompletionListener listener) {
listeners.remove(listener);
}
+
+ private void putAndStartTaskIfAbsent(IndexBuildTaskId taskId, IndexBuildTask task) {
+ IndexBuildTask previousTask = indexBuildTaskById.putIfAbsent(taskId, task);
+
+ if (previousTask != null) {
+ // Index building is already in progress.
+ return;
+ }
+
+ try {
+ task.start();
+ } finally {
+ task.getTaskFuture().whenComplete((unused, throwable) -> indexBuildTaskById.remove(taskId));
+ }
+ }
}
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
index 7684ca7..d756c93 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexAvailabilityControllerTest.java
@@ -389,9 +389,12 @@
CompletableFuture<Void> finishBuildIndexFuture = new CompletableFuture<>();
- indexBuilder.listen((indexId1, tableId, partitionId1) -> {
- if (indexId1 == indexId && partitionId1 == partitionId) {
- finishBuildIndexFuture.complete(null);
+ indexBuilder.listen(new IndexBuildCompletionListener() {
+ @Override
+ public void onBuildCompletion(int indexId1, int tableId1, int partitionId1) {
+ if (indexId1 == indexId && partitionId1 == partitionId) {
+ finishBuildIndexFuture.complete(null);
+ }
}
});
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
index 326dae8..8b01a0b 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuildControllerTest.java
@@ -26,6 +26,7 @@
import static org.apache.ignite.internal.index.TestIndexManagementUtils.LOCAL_NODE;
import static org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_ID;
import static org.apache.ignite.internal.index.TestIndexManagementUtils.NODE_NAME;
+import static org.apache.ignite.internal.index.TestIndexManagementUtils.PK_INDEX_NAME;
import static org.apache.ignite.internal.index.TestIndexManagementUtils.TABLE_NAME;
import static org.apache.ignite.internal.index.TestIndexManagementUtils.createTable;
import static org.apache.ignite.internal.table.TableTestUtils.createHashIndex;
@@ -143,6 +144,17 @@
anyLong(),
eq(indexCreationCatalogVersion(INDEX_NAME))
);
+
+ verify(indexBuilder, never()).scheduleBuildIndexAfterDisasterRecovery(
+ eq(tableId()),
+ eq(PARTITION_ID),
+ eq(indexId(INDEX_NAME)),
+ any(),
+ any(),
+ eq(LOCAL_NODE),
+ anyLong(),
+ eq(indexCreationCatalogVersion(INDEX_NAME))
+ );
}
@Test
@@ -165,6 +177,17 @@
anyLong(),
eq(indexCreationCatalogVersion(INDEX_NAME))
);
+
+ verify(indexBuilder, never()).scheduleBuildIndexAfterDisasterRecovery(
+ eq(tableId()),
+ eq(PARTITION_ID),
+ eq(indexId(INDEX_NAME)),
+ any(),
+ any(),
+ eq(LOCAL_NODE),
+ anyLong(),
+ eq(indexCreationCatalogVersion(INDEX_NAME))
+ );
}
@Test
@@ -185,21 +208,16 @@
anyLong(),
eq(indexCreationCatalogVersion(INDEX_NAME))
);
- }
- @Test
- void testNotStartBuildPkIndexesOnPrimaryReplicaElected() {
- setPrimaryReplicaWhichExpiresInOneSecond(PARTITION_ID, NODE_NAME, NODE_ID, clock.now());
-
- verify(indexBuilder, never()).scheduleBuildIndex(
+ verify(indexBuilder).scheduleBuildIndexAfterDisasterRecovery(
eq(tableId()),
eq(PARTITION_ID),
- eq(indexId(pkIndexName(TABLE_NAME))),
+ eq(indexId(PK_INDEX_NAME)),
any(),
any(),
eq(LOCAL_NODE),
anyLong(),
- eq(indexCreationCatalogVersion(pkIndexName(TABLE_NAME)))
+ eq(indexCreationCatalogVersion(PK_INDEX_NAME))
);
}
@@ -212,14 +230,25 @@
createTable(catalogManager, tableName, COLUMN_NAME);
verify(indexBuilder, never()).scheduleBuildIndex(
- eq(tableId()),
+ eq(tableId(tableName)),
eq(PARTITION_ID),
eq(indexId(pkIndexName(tableName))),
any(),
any(),
eq(LOCAL_NODE),
anyLong(),
- eq(indexCreationCatalogVersion(pkIndexName(tableName)))
+ anyInt()
+ );
+
+ verify(indexBuilder, never()).scheduleBuildIndexAfterDisasterRecovery(
+ eq(tableId(tableName)),
+ eq(PARTITION_ID),
+ eq(indexId(pkIndexName(tableName))),
+ any(),
+ any(),
+ eq(LOCAL_NODE),
+ anyLong(),
+ anyInt()
);
}
@@ -264,6 +293,17 @@
anyLong(),
anyInt()
);
+
+ verify(indexBuilder).scheduleBuildIndexAfterDisasterRecovery(
+ eq(tableId()),
+ eq(PARTITION_ID),
+ eq(indexId0),
+ any(),
+ any(),
+ eq(LOCAL_NODE),
+ anyLong(),
+ anyInt()
+ );
}
private void createIndex(String indexName) {
@@ -294,7 +334,11 @@
}
private int tableId() {
- return getTableIdStrict(catalogManager, TABLE_NAME, clock.nowLong());
+ return tableId(TABLE_NAME);
+ }
+
+ private int tableId(String tableName) {
+ return getTableIdStrict(catalogManager, tableName, clock.nowLong());
}
private int indexId(String indexName) {
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
index a499eee..56b8f77 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexBuilderTest.java
@@ -27,6 +27,7 @@
import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -92,7 +93,12 @@
void testStopListenIndexBuildCompletion() {
CompletableFuture<Void> invokeListenerFuture = new CompletableFuture<>();
- IndexBuildCompletionListener listener = (indexId, tableId, partitionId) -> invokeListenerFuture.complete(null);
+ IndexBuildCompletionListener listener = new IndexBuildCompletionListener() {
+ @Override
+ public void onBuildCompletion(int indexId, int tableId, int partitionId) {
+ invokeListenerFuture.complete(null);
+ }
+ };
indexBuilder.listen(listener);
indexBuilder.stopListen(listener);
@@ -149,6 +155,16 @@
verify(replicaService, times(2)).invoke(any(ClusterNode.class), any(BuildIndexReplicaRequest.class));
}
+ @Test
+ void testScheduleBuildIndexAfterDisasterRecovery() {
+ CompletableFuture<Void> listenCompletionIndexBuildingAfterDisasterRecoveryFuture =
+ listenCompletionIndexBuildingAfterDisasterRecovery(INDEX_ID, TABLE_ID, PARTITION_ID);
+
+ scheduleBuildIndexAfterDisasterRecovery(INDEX_ID, TABLE_ID, PARTITION_ID, List.of(rowId(PARTITION_ID)));
+
+ assertThat(listenCompletionIndexBuildingAfterDisasterRecoveryFuture, willCompleteSuccessfully());
+ }
+
private void scheduleBuildIndex(int indexId, int tableId, int partitionId, Collection<RowId> nextRowIdsToBuild) {
indexBuilder.scheduleBuildIndex(
tableId,
@@ -162,12 +178,53 @@
);
}
- private CompletableFuture<Void> listenCompletionIndexBuilding(int indexId, int tableId, int partitionId) {
- CompletableFuture<Void> future = new CompletableFuture<>();
+ private void scheduleBuildIndexAfterDisasterRecovery(int indexId, int tableId, int partitionId, Collection<RowId> nextRowIdsToBuild) {
+ indexBuilder.scheduleBuildIndexAfterDisasterRecovery(
+ tableId,
+ partitionId,
+ indexId,
+ indexStorage(nextRowIdsToBuild),
+ mock(MvPartitionStorage.class),
+ mock(ClusterNode.class),
+ ANY_ENLISTMENT_CONSISTENCY_TOKEN,
+ ANY_INDEX_CREATION_CATALOG_VERSION
+ );
+ }
- indexBuilder.listen((indexId1, tableId1, partitionId1) -> {
- if (indexId1 == indexId && tableId1 == tableId && partitionId1 == partitionId) {
- future.complete(null);
+ private CompletableFuture<Void> listenCompletionIndexBuilding(int indexId, int tableId, int partitionId) {
+ var future = new CompletableFuture<Void>();
+
+ indexBuilder.listen(new IndexBuildCompletionListener() {
+ @Override
+ public void onBuildCompletion(int indexId1, int tableId1, int partitionId1) {
+ if (indexId1 == indexId && tableId1 == tableId && partitionId1 == partitionId) {
+ future.complete(null);
+ }
+ }
+
+ @Override
+ public void onBuildCompletionAfterDisasterRecovery(int indexId, int tableId, int partitionId) {
+ fail(String.format("indexId=%s, tableId=%s, partitionId=%s", indexId, tableId, partitionId));
+ }
+ });
+
+ return future;
+ }
+
+ private CompletableFuture<Void> listenCompletionIndexBuildingAfterDisasterRecovery(int indexId, int tableId, int partitionId) {
+ var future = new CompletableFuture<Void>();
+
+ indexBuilder.listen(new IndexBuildCompletionListener() {
+ @Override
+ public void onBuildCompletionAfterDisasterRecovery(int indexId1, int tableId1, int partitionId1) {
+ if (indexId1 == indexId && tableId1 == tableId && partitionId1 == partitionId) {
+ future.complete(null);
+ }
+ }
+
+ @Override
+ public void onBuildCompletion(int indexId, int tableId, int partitionId) {
+ fail(String.format("indexId=%s, tableId=%s, partitionId=%s", indexId, tableId, partitionId));
}
});