IGNITE-21988 Don't allow reading from index storage if it is in build state (#3561)
diff --git a/check-rules/spotbugs-excludes.xml b/check-rules/spotbugs-excludes.xml
index acf0541..a548ea0 100644
--- a/check-rules/spotbugs-excludes.xml
+++ b/check-rules/spotbugs-excludes.xml
@@ -217,6 +217,11 @@
<Field name="LEASE_PREFIX"/>
</Or>
</Match>
+ <Match>
+ <Bug pattern="IT_NO_SUCH_ELEMENT"/>
+ <Class name="org.apache.ignite.internal.storage.pagememory.index.sorted.PageMemorySortedIndexStorage$ReadOnlyScanCursor"/>
+ <Method name="next"/>
+ </Match>
<!-- end of false-positive exclusions -->
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index dc045fd..3320d05 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -417,6 +417,12 @@
/** Operation on a destroyed storage. */
public static final int ALREADY_DESTROYED_ERR = STORAGE_ERR_GROUP.registerErrorCode((short) 5);
+
+ /** Error reading from an index that has not yet been built. */
+ public static final int INDEX_NOT_BUILT_ERR = STORAGE_ERR_GROUP.registerErrorCode((short) 6);
+
+ /** Error when detecting an inconsistent index state. */
+ public static final int INCONSISTENT_INDEX_STATE_ERR = STORAGE_ERR_GROUP.registerErrorCode((short) 7);
}
/** Distribution zones error group. */
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h b/modules/platforms/cpp/ignite/common/error_codes.h
index 0db0ddb..39c7778 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -147,6 +147,8 @@
ALREADY_CLOSED = 0x90003,
STORAGE_REBALANCE = 0x90004,
ALREADY_DESTROYED = 0x90005,
+ INDEX_NOT_BUILT = 0x90006,
+ INCONSISTENT_INDEX_STATE = 0x90007,
// DistributionZones group. Group code: 10
ZONE_NOT_FOUND = 0xa0001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index 9afc218..2a7a4f2 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -227,6 +227,8 @@
case error::code::ALREADY_CLOSED:
case error::code::STORAGE_REBALANCE:
case error::code::ALREADY_DESTROYED:
+ case error::code::INDEX_NOT_BUILT:
+ case error::code::INCONSISTENT_INDEX_STATE:
return sql_state::SHY000_GENERAL_ERROR;
// DistributionZones group. Group code: 10
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index 342eea7..9d979cf 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -356,6 +356,12 @@
/// <summary> AlreadyDestroyed error. </summary>
public const int AlreadyDestroyed = (GroupCode << 16) | (5 & 0xFFFF);
+
+ /// <summary> IndexNotBuilt error. </summary>
+ public const int IndexNotBuilt = (GroupCode << 16) | (6 & 0xFFFF);
+
+ /// <summary> InconsistentIndexState error. </summary>
+ public const int InconsistentIndexState = (GroupCode << 16) | (7 & 0xFFFF);
}
/// <summary> DistributionZones errors. </summary>
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/InconsistentIndexStateException.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/InconsistentIndexStateException.java
new file mode 100644
index 0000000..423c215
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/InconsistentIndexStateException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import static org.apache.ignite.lang.ErrorGroups.Storage.INCONSISTENT_INDEX_STATE_ERR;
+
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.storage.StorageException;
+
+/** Exception that occurs when an index is not consistent, for example, when reading from a readable index and it has not yet been built. */
+public class InconsistentIndexStateException extends StorageException {
+ private static final long serialVersionUID = 344560487657914429L;
+
+ /**
+ * Constructor.
+ *
+ * @param messagePattern Error message pattern.
+ * @param params Error message params.
+ * @see IgniteStringFormatter#format(String, Object...)
+ */
+ public InconsistentIndexStateException(String messagePattern, Object... params) {
+ super(INCONSISTENT_INDEX_STATE_ERR, messagePattern, params);
+ }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexNotBuiltException.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexNotBuiltException.java
new file mode 100644
index 0000000..bf87465
--- /dev/null
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexNotBuiltException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.index;
+
+import static org.apache.ignite.lang.ErrorGroups.Storage.INDEX_NOT_BUILT_ERR;
+
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.storage.StorageException;
+
+/** Exception occurring while reading from an index that has not yet been built. */
+public class IndexNotBuiltException extends StorageException {
+ private static final long serialVersionUID = 7512376065062977603L;
+
+ /**
+ * Constructor.
+ *
+ * @param messagePattern Error message pattern.
+ * @param params Error message params.
+ * @see IgniteStringFormatter#format(String, Object...)
+ */
+ public IndexNotBuiltException(String messagePattern, Object... params) {
+ super(INDEX_NOT_BUILT_ERR, messagePattern, params);
+ }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexStorage.java
index 6907b53..24cf73e 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexStorage.java
@@ -33,6 +33,7 @@
* Returns a cursor over {@code RowId}s associated with the given index key.
*
* @throws StorageException If failed to read data.
+ * @throws IndexNotBuiltException If the index has not yet been built.
*/
Cursor<RowId> get(BinaryTuple key) throws StorageException;
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
index 16abcc6..17712c0 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
@@ -19,6 +19,7 @@
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.util.Cursor;
import org.intellij.lang.annotations.MagicConstant;
import org.jetbrains.annotations.Nullable;
@@ -58,6 +59,8 @@
* are {@link #GREATER_OR_EQUAL}, {@link #LESS_OR_EQUAL}.
* @return Cursor with fetched index rows.
* @throws IllegalArgumentException If backwards flag is passed and backwards iteration is not supported by the storage.
+ * @throws StorageException If failed to read data.
+ * @throws IndexNotBuiltException If the index has not yet been built.
*/
PeekCursor<IndexRow> scan(
@Nullable BinaryTuplePrefix lowerBound,
@@ -76,6 +79,8 @@
* are {@link #GREATER_OR_EQUAL}, {@link #LESS_OR_EQUAL}.
* @return Cursor with fetched index rows.
* @throws IllegalArgumentException If backwards flag is passed and backwards iteration is not supported by the storage.
+ * @throws StorageException If failed to read data.
+ * @throws IndexNotBuiltException If the index has not yet been built.
*/
default Cursor<IndexRow> readOnlyScan(
@Nullable BinaryTuplePrefix lowerBound,
@@ -84,4 +89,27 @@
) {
return scan(lowerBound, upperBound, flags);
}
+
+ /**
+ * Returns a range of updatable index values between the lower bound and the upper bound, supporting read-write transactions.
+ *
+ * <p>Unlike method {@link #scan(BinaryTuplePrefix, BinaryTuplePrefix, int)}, it allows you to read from an unbuilt index.</p>
+ *
+ * @param lowerBound Lower bound. Exclusivity is controlled by a {@link #GREATER_OR_EQUAL} or {@link #GREATER} flag.
+ * {@code null} means unbounded.
+ * @param upperBound Upper bound. Exclusivity is controlled by a {@link #LESS} or {@link #LESS_OR_EQUAL} flag.
+ * {@code null} means unbounded.
+ * @param flags Control flags. {@link #GREATER} | {@link #LESS} by default. Other available values
+ * are {@link #GREATER_OR_EQUAL}, {@link #LESS_OR_EQUAL}.
+ * @return Cursor with fetched index rows.
+ * @throws IllegalArgumentException If backwards flag is passed and backwards iteration is not supported by the storage.
+ * @throws StorageException If failed to read data.
+ * @throws InconsistentIndexStateException If the index is in a readable status, but the index is not built.
+ */
+ // TODO: IGNITE-22039 Implement throw an InconsistentIndexStateException if the index is not in a readable status and write tests
+ PeekCursor<IndexRow> tolerantScan(
+ @Nullable BinaryTuplePrefix lowerBound,
+ @Nullable BinaryTuplePrefix upperBound,
+ @MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags
+ );
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/StorageHashIndexDescriptor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/StorageHashIndexDescriptor.java
index c3c1596..c27cf38 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/StorageHashIndexDescriptor.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/StorageHashIndexDescriptor.java
@@ -24,6 +24,7 @@
import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.type.NativeType;
@@ -39,6 +40,7 @@
public static class StorageHashIndexColumnDescriptor implements StorageColumnDescriptor {
private final String name;
+ @IgniteToStringInclude
private final NativeType type;
private final boolean nullable;
@@ -79,6 +81,7 @@
private final int id;
+ @IgniteToStringInclude
private final List<StorageHashIndexColumnDescriptor> columns;
private final boolean pk;
@@ -121,6 +124,11 @@
return pk;
}
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+
private static List<StorageHashIndexColumnDescriptor> extractIndexColumnsConfiguration(
CatalogTableDescriptor table,
CatalogHashIndexDescriptor index
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/StorageSortedIndexDescriptor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/StorageSortedIndexDescriptor.java
index 4ed6ebf..3da5f1e 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/StorageSortedIndexDescriptor.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/StorageSortedIndexDescriptor.java
@@ -27,6 +27,7 @@
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.type.NativeType;
@@ -42,6 +43,7 @@
public static class StorageSortedIndexColumnDescriptor implements StorageColumnDescriptor {
private final String name;
+ @IgniteToStringInclude
private final NativeType type;
private final boolean nullable;
@@ -95,6 +97,7 @@
private final int id;
+ @IgniteToStringInclude
private final List<StorageSortedIndexColumnDescriptor> columns;
private final BinaryTupleSchema binaryTupleSchema;
@@ -148,6 +151,11 @@
return pk;
}
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+
/**
* Returns a {@code BinaryTupleSchema} that corresponds to the index configuration.
*/
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java
index 7b202b6..556579c 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/ThreadAssertingSortedIndexStorage.java
@@ -54,6 +54,15 @@
@Override
public Cursor<IndexRow> readOnlyScan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
+ assertThreadAllowsToRead();
+
return new ThreadAssertingCursor<>(indexStorage.readOnlyScan(lowerBound, upperBound, flags));
}
+
+ @Override
+ public PeekCursor<IndexRow> tolerantScan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
+ assertThreadAllowsToRead();
+
+ return new ThreadAssertingPeekCursor<>(indexStorage.tolerantScan(lowerBound, upperBound, flags));
+ }
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java
index 0a3954b..456b01b 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageUtils.java
@@ -26,6 +26,8 @@
import org.apache.ignite.internal.storage.StorageDestroyedException;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
+import org.apache.ignite.internal.storage.index.IndexNotBuiltException;
+import org.jetbrains.annotations.Nullable;
/**
* Helper class for storages.
@@ -239,4 +241,16 @@
public static RowId initialRowIdToBuild(int partitionId) {
return RowId.lowestRowId(partitionId);
}
+
+ /**
+ * Throws an {@link IndexNotBuiltException} if the index has not yet been built.
+ *
+ * @param nextRowIdToBuild Next row ID to build, {@code null} if the index is built.
+ * @param storageInfoSupplier Storage information supplier, for example in the format "indexId=5, partitionId=1".
+ */
+ public static void throwExceptionIfIndexIsNotBuilt(@Nullable RowId nextRowIdToBuild, Supplier<String> storageInfoSupplier) {
+ if (nextRowIdToBuild != null) {
+ throw new IndexNotBuiltException("Index not built yet: [{}]", storageInfoSupplier.get());
+ }
+ }
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 93eb0fd..c06efb3 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -383,8 +383,8 @@
MvPartitionStorage partitionStorage = getOrCreateMvPartition(PARTITION_ID);
- SortedIndexStorage sortedIndexStorage1 = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIndexDescriptor1);
- SortedIndexStorage sortedIndexStorage2 = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIndexDescriptor2);
+ SortedIndexStorage sortedIndexStorage1 = getOrCreateIndex(PARTITION_ID, sortedIndexDescriptor1);
+ SortedIndexStorage sortedIndexStorage2 = getOrCreateIndex(PARTITION_ID, sortedIndexDescriptor2);
List<TestRow> rows = List.of(
new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, "0"), new TestValue(0, "0"))),
@@ -436,8 +436,8 @@
MvPartitionStorage partitionStorage = getOrCreateMvPartition(PARTITION_ID);
- HashIndexStorage hashIndexStorage1 = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexDescriptor1);
- HashIndexStorage hashIndexStorage2 = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIndexDescriptor2);
+ HashIndexStorage hashIndexStorage1 = getOrCreateIndex(PARTITION_ID, hashIndexDescriptor1);
+ HashIndexStorage hashIndexStorage2 = getOrCreateIndex(PARTITION_ID, hashIndexDescriptor2);
List<TestRow> rows = List.of(
new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, "0"), new TestValue(0, "0"))),
@@ -467,8 +467,8 @@
MvPartitionStorage partitionStorage2 = getOrCreateMvPartition(PARTITION_ID + 1);
- HashIndexStorage storage1 = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);
- HashIndexStorage storage2 = tableStorage.getOrCreateHashIndex(PARTITION_ID + 1, hashIdx);
+ HashIndexStorage storage1 = getOrCreateIndex(PARTITION_ID, hashIdx);
+ HashIndexStorage storage2 = getOrCreateIndex(PARTITION_ID + 1, hashIdx);
assertThat(storage1, is(notNullValue()));
assertThat(storage2, is(notNullValue()));
@@ -517,6 +517,8 @@
checkStorageDestroyed((IndexStorage) storage);
assertThrows(StorageDestroyedException.class, () -> storage.scan(null, null, GREATER));
+ assertThrows(StorageDestroyedException.class, () -> storage.readOnlyScan(null, null, GREATER));
+ assertThrows(StorageDestroyedException.class, () -> storage.tolerantScan(null, null, GREATER));
}
@SuppressWarnings({"resource", "deprecation"})
@@ -585,8 +587,8 @@
@Test
public void testSuccessRebalance() {
MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
- HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);
- SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx);
+ HashIndexStorage hashIndexStorage = getOrCreateIndex(PARTITION_ID, hashIdx);
+ SortedIndexStorage sortedIndexStorage = getOrCreateIndex(PARTITION_ID, sortedIdx);
// Error because rebalance has not yet started for the partition.
assertThrows(
@@ -636,6 +638,8 @@
assertThat(tableStorage.finishRebalancePartition(PARTITION_ID, 10, 20, raftGroupConfig), willCompleteSuccessfully());
+ completeBuiltIndexes(PARTITION_ID, hashIndexStorage, sortedIndexStorage);
+
// Let's check the storages after success finish rebalance.
checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsBeforeRebalanceStart);
checkForPresenceRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsOnRebalance);
@@ -647,8 +651,8 @@
@Test
public void testFailRebalance() throws Exception {
MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
- HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);
- SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx);
+ HashIndexStorage hashIndexStorage = getOrCreateIndex(PARTITION_ID, hashIdx);
+ SortedIndexStorage sortedIndexStorage = getOrCreateIndex(PARTITION_ID, sortedIdx);
// Nothing will happen because rebalancing has not started.
tableStorage.abortRebalancePartition(PARTITION_ID).get(1, SECONDS);
@@ -683,6 +687,8 @@
assertThat(tableStorage.abortRebalancePartition(PARTITION_ID), willCompleteSuccessfully());
+ completeBuiltIndexes(PARTITION_ID, hashIndexStorage, sortedIndexStorage);
+
// Let's check the storages after abort rebalance.
checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsBeforeRebalanceStart);
checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsOnRebalance);
@@ -707,13 +713,15 @@
assertThrows(StorageRebalanceException.class, () -> storage.get(mock(BinaryTuple.class)));
assertThrows(StorageRebalanceException.class, () -> storage.remove(mock(IndexRow.class)));
assertThrows(StorageRebalanceException.class, () -> storage.scan(null, null, GREATER));
+ assertThrows(StorageRebalanceException.class, () -> storage.readOnlyScan(null, null, GREATER));
+ assertThrows(StorageRebalanceException.class, () -> storage.tolerantScan(null, null, GREATER));
}
@Test
public void testRestartStoragesInTheMiddleOfRebalance() throws Exception {
MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
- HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);
- SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx);
+ HashIndexStorage hashIndexStorage = getOrCreateIndex(PARTITION_ID, hashIdx);
+ SortedIndexStorage sortedIndexStorage = getOrCreateIndex(PARTITION_ID, sortedIdx);
List<TestRow> rows = List.of(
new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, "0"), new TestValue(0, "0"))),
@@ -739,8 +747,8 @@
tableStorage = createMvTableStorage();
mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
- hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);
- sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx);
+ hashIndexStorage = getOrCreateIndex(PARTITION_ID, hashIdx);
+ sortedIndexStorage = getOrCreateIndex(PARTITION_ID, sortedIdx);
if (tableStorage.isVolatile()) {
// Let's check the repositories: they should be empty.
@@ -762,8 +770,8 @@
assertThrows(StorageException.class, () -> tableStorage.clearPartition(PARTITION_ID));
MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
- HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);
- SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx);
+ HashIndexStorage hashIndexStorage = getOrCreateIndex(PARTITION_ID, hashIdx);
+ SortedIndexStorage sortedIndexStorage = getOrCreateIndex(PARTITION_ID, sortedIdx);
// Let's check the cleanup for an empty partition.
assertThat(tableStorage.clearPartition(PARTITION_ID), willCompleteSuccessfully());
@@ -795,6 +803,8 @@
checkLastApplied(mvPartitionStorage, 0, 0);
assertNull(mvPartitionStorage.committedGroupConfiguration());
+ completeBuiltIndexes(PARTITION_ID, hashIndexStorage, sortedIndexStorage);
+
checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
}
@@ -1189,8 +1199,8 @@
assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getPartitionIdOutOfRange()));
MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
- HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);
- SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx);
+ HashIndexStorage hashIndexStorage = getOrCreateIndex(PARTITION_ID, hashIdx);
+ SortedIndexStorage sortedIndexStorage = getOrCreateIndex(PARTITION_ID, sortedIdx);
RowId rowId = new RowId(PARTITION_ID);
@@ -1224,6 +1234,8 @@
Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(hashIndexRow.indexColumns());
Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, GREATER);
+ Cursor<IndexRow> readOnlyScanFromSortedIndexCursor = sortedIndexStorage.readOnlyScan(null, null, GREATER);
+ Cursor<IndexRow> tolerantScanFromSortedIndexCursor = sortedIndexStorage.tolerantScan(null, null, GREATER);
CompletableFuture<Void> destroyFuture = tableStorage.destroyPartition(PARTITION_ID);
if (waitForDestroyFuture) {
@@ -1248,6 +1260,8 @@
assertThrows(StorageDestroyedException.class, () -> getAll(getFromSortedIndexCursor));
assertThrows(StorageDestroyedException.class, () -> getAll(scanFromSortedIndexCursor));
+ assertThrows(StorageDestroyedException.class, () -> getAll(readOnlyScanFromSortedIndexCursor));
+ assertThrows(StorageDestroyedException.class, () -> getAll(tolerantScanFromSortedIndexCursor));
// What happens if there is no partition?
assertThrows(StorageException.class, () -> tableStorage.destroyPartition(PARTITION_ID));
@@ -1258,8 +1272,8 @@
@ValueSource(booleans = {false, true})
public void testDestroyTableStorage(boolean waitForDestroyFuture) {
MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
- HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);
- SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx);
+ HashIndexStorage hashIndexStorage = getOrCreateIndex(PARTITION_ID, hashIdx);
+ SortedIndexStorage sortedIndexStorage = getOrCreateIndex(PARTITION_ID, sortedIdx);
List<TestRow> rows = List.of(
new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, "0"), new TestValue(0, "0"))),
@@ -1277,6 +1291,8 @@
Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(sortedIndexRow.indexColumns());
Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, GREATER);
+ Cursor<IndexRow> readOnlyScanFromSortedIndexCursor = sortedIndexStorage.readOnlyScan(null, null, GREATER);
+ Cursor<IndexRow> tolerantScanFromSortedIndexCursor = sortedIndexStorage.tolerantScan(null, null, GREATER);
CompletableFuture<Void> destroyFuture = tableStorage.destroy();
@@ -1296,6 +1312,8 @@
assertThrows(StorageDestroyedException.class, () -> getAll(getFromSortedIndexCursor));
assertThrows(StorageDestroyedException.class, () -> getAll(scanFromSortedIndexCursor));
+ assertThrows(StorageDestroyedException.class, () -> getAll(readOnlyScanFromSortedIndexCursor));
+ assertThrows(StorageDestroyedException.class, () -> getAll(tolerantScanFromSortedIndexCursor));
// Let's check that nothing will happen if we try to destroy it again.
assertThat(tableStorage.destroy(), willCompleteSuccessfully());
@@ -1309,8 +1327,8 @@
tableStorage = createMvTableStorage();
mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
- hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);
- sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx);
+ hashIndexStorage = getOrCreateIndex(PARTITION_ID, hashIdx);
+ sortedIndexStorage = getOrCreateIndex(PARTITION_ID, sortedIdx);
checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
}
@@ -1336,6 +1354,8 @@
Cursor<?> sortedIndexStorageGetCursor = sortedIndexStorage.get(sortedIndexRow.indexColumns());
Cursor<?> sortedIndexStorageScanCursor = sortedIndexStorage.scan(null, null, GREATER);
+ Cursor<?> sortedIndexStorageReadOnlyScanCursor = sortedIndexStorage.readOnlyScan(null, null, GREATER);
+ Cursor<?> sortedIndexStorageTolerantScanCursor = sortedIndexStorage.tolerantScan(null, null, GREATER);
// Partition is out of configuration range.
assertThrows(IllegalArgumentException.class, () -> tableStorage.startRebalancePartition(getPartitionIdOutOfRange()));
@@ -1359,6 +1379,8 @@
checkCursorAfterStartRebalance(sortedIndexStorageGetCursor);
checkCursorAfterStartRebalance(sortedIndexStorageScanCursor);
+ checkCursorAfterStartRebalance(sortedIndexStorageReadOnlyScanCursor);
+ checkCursorAfterStartRebalance(sortedIndexStorageTolerantScanCursor);
}
@SuppressWarnings({"resource", "deprecation"})
@@ -1435,9 +1457,9 @@
void testNextRowIdToBuildAfterOperation(Operation operation) throws Exception {
MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
- IndexStorage hashIndexStorage = tableStorage.getOrCreateIndex(PARTITION_ID, hashIdx);
- IndexStorage sortedIndexStorage = tableStorage.getOrCreateIndex(PARTITION_ID, sortedIdx);
- IndexStorage pkIndexStorage = tableStorage.getOrCreateIndex(PARTITION_ID, pkIdx);
+ IndexStorage hashIndexStorage = getOrCreateIndex(PARTITION_ID, hashIdx, false);
+ IndexStorage sortedIndexStorage = getOrCreateIndex(PARTITION_ID, sortedIdx, false);
+ IndexStorage pkIndexStorage = getOrCreateIndex(PARTITION_ID, pkIdx, false);
RowId rowId0 = new RowId(PARTITION_ID);
RowId rowId1 = new RowId(PARTITION_ID);
@@ -1458,9 +1480,9 @@
mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
- IndexStorage recreatedHashIndexStorage = tableStorage.getOrCreateIndex(PARTITION_ID, hashIdx);
- IndexStorage recreatedSortedIndexStorage = tableStorage.getOrCreateIndex(PARTITION_ID, sortedIdx);
- IndexStorage recreatedPkIndexStorage = tableStorage.getOrCreateIndex(PARTITION_ID, pkIdx);
+ IndexStorage recreatedHashIndexStorage = getOrCreateIndex(PARTITION_ID, hashIdx, false);
+ IndexStorage recreatedSortedIndexStorage = getOrCreateIndex(PARTITION_ID, sortedIdx, false);
+ IndexStorage recreatedPkIndexStorage = getOrCreateIndex(PARTITION_ID, pkIdx, false);
assertThat(recreatedHashIndexStorage.getNextRowIdToBuild(), is(equalTo(INITIAL_ROW_ID_TO_BUILD)));
assertThat(recreatedSortedIndexStorage.getNextRowIdToBuild(), is(equalTo(INITIAL_ROW_ID_TO_BUILD)));
@@ -1483,4 +1505,51 @@
private interface Operation {
void doOperation() throws Exception;
}
+
+ /**
+ * Returns an already created index or creates a new one.
+ *
+ * @param partitionId Partition ID.
+ * @param indexDescriptor Storage index descriptor.
+ * @param built {@code True} if index building needs to be completed.
+ * @see #completeBuiltIndexes(int, IndexStorage...)
+ */
+ protected <T extends IndexStorage> T getOrCreateIndex(
+ int partitionId,
+ StorageIndexDescriptor indexDescriptor,
+ boolean built
+ ) {
+ IndexStorage indexStorage = tableStorage.getOrCreateIndex(partitionId, indexDescriptor);
+
+ assertNotNull(indexStorage, "index=" + indexDescriptor);
+
+ if (indexStorage.getNextRowIdToBuild() != null && built) {
+ completeBuiltIndexes(partitionId, indexStorage);
+ }
+
+ return (T) indexStorage;
+ }
+
+ /**
+ * Returns an already created index or creates a new one with the completion of building.
+ *
+ * @param partitionId Partition ID.
+ * @param indexDescriptor Storage index descriptor.
+ * @see #completeBuiltIndexes(int, IndexStorage...)
+ */
+ protected <T extends IndexStorage> T getOrCreateIndex(
+ int partitionId,
+ StorageIndexDescriptor indexDescriptor
+ ) {
+ return getOrCreateIndex(partitionId, indexDescriptor, true);
+ }
+
+ /** Completes the building of indexes. */
+ protected void completeBuiltIndexes(int partitionId, IndexStorage... indexStorages) {
+ MvPartitionStorage partitionStorage = getOrCreateMvPartition(partitionId);
+
+ assertNotNull(partitionStorage, "partitionId=" + partitionId);
+
+ TestStorageUtils.completeBuiltIndexes(partitionStorage, indexStorages);
+ }
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/TestStorageUtils.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/TestStorageUtils.java
new file mode 100644
index 0000000..0b7c698
--- /dev/null
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/TestStorageUtils.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage;
+
+import org.apache.ignite.internal.storage.index.IndexStorage;
+
+/** Auxiliary class for tests that contains useful methods, constants, etc. */
+public class TestStorageUtils {
+ /** Completes the building of indexes. */
+ public static void completeBuiltIndexes(MvPartitionStorage partitionStorage, IndexStorage... indexStorages) {
+ partitionStorage.runConsistently(locker -> {
+ for (IndexStorage indexStorage : indexStorages) {
+ indexStorage.setNextRowIdToBuild(null);
+ }
+
+ return null;
+ });
+ }
+}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java
index cc2d8d5..7a0b604 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractHashIndexStorageTest.java
@@ -40,7 +40,7 @@
*/
public abstract class AbstractHashIndexStorageTest extends AbstractIndexStorageTest<HashIndexStorage, StorageHashIndexDescriptor> {
@Override
- protected HashIndexStorage createIndexStorage(String name, ColumnType... columnTypes) {
+ protected HashIndexStorage createIndexStorage(String name, boolean built, ColumnType... columnTypes) {
CatalogTableDescriptor tableDescriptor = catalogService.table(TABLE_NAME, clock.nowLong());
int tableId = tableDescriptor.id();
@@ -48,10 +48,16 @@
CatalogHashIndexDescriptor indexDescriptor = createCatalogIndexDescriptor(tableId, indexId, name, columnTypes);
- return tableStorage.getOrCreateHashIndex(
+ HashIndexStorage indexStorage = tableStorage.getOrCreateHashIndex(
TEST_PARTITION,
new StorageHashIndexDescriptor(tableDescriptor, indexDescriptor)
);
+
+ if (built) {
+ completeBuildIndex(indexStorage);
+ }
+
+ return indexStorage;
}
@Override
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
index a808c99..868c2b5 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
@@ -60,6 +60,7 @@
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TestStorageUtils;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.index.impl.BinaryTupleRowSerializer;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -150,9 +151,7 @@
createTestTable();
}
- /**
- * Configures a test table with columns of all supported types.
- */
+ /** Configures a test table with columns of all supported types. */
private void createTestTable() {
ColumnParams pkColumn = ColumnParams.builder().name("pk").type(ColumnType.INT32).nullable(false).build();
@@ -168,7 +167,7 @@
TABLE_NAME,
zoneId,
Stream.concat(Stream.of(pkColumn), ALL_TYPES_COLUMN_PARAMS.stream()).map(CatalogUtils::fromParams).collect(toList()),
- List.of("pk"),
+ List.of(pkColumn.name()),
null,
DEFAULT_STORAGE_PROFILE
);
@@ -182,9 +181,25 @@
/**
* Creates an IndexStorage instance using the given columns.
*
+ * @param name Index name.
+ * @param built {@code True} to create a built index, {@code false} if you need to build it later.
+ * @param columnTypes Column types.
* @see #columnName(ColumnType)
+ * @see #completeBuildIndex(IndexStorage)
*/
- protected abstract S createIndexStorage(String name, ColumnType... columnTypes);
+ protected abstract S createIndexStorage(String name, boolean built, ColumnType... columnTypes);
+
+ /**
+ * Creates a built IndexStorage instance using the given columns.
+ *
+ * @param name Index name.
+ * @param columnTypes Column types.
+ * @see #columnName(ColumnType)
+ * @see #completeBuildIndex(IndexStorage)
+ */
+ protected S createIndexStorage(String name, ColumnType... columnTypes) {
+ return createIndexStorage(name, true, columnTypes);
+ }
/**
* Provides safe access to the index descriptor of the storage.
@@ -364,7 +379,7 @@
assertNull(indexStorage.getNextRowIdToBuild());
} else {
- indexStorage = createIndexStorage(INDEX_NAME, ColumnType.INT32);
+ indexStorage = createIndexStorage(INDEX_NAME, false, ColumnType.INT32);
assertEquals(initialRowIdToBuild(TEST_PARTITION), indexStorage.getNextRowIdToBuild());
}
@@ -380,6 +395,30 @@
assertEquals(newNextRowIdToBuild, newNextRowIdToBuild);
}
+ @Test
+ void testGetFromPkIndex() {
+ S pkIndex = createPkIndexStorage();
+ var serializer = new BinaryTupleRowSerializer(indexDescriptor(pkIndex));
+
+ IndexRow indexRow = createIndexRow(serializer, 1);
+
+ assertDoesNotThrow(() -> getAll(pkIndex, indexRow));
+ }
+
+ @Test
+ void testGetAfterBuiltIndex() {
+ S index = createIndexStorage(INDEX_NAME, false, ColumnType.INT32);
+ var serializer = new BinaryTupleRowSerializer(indexDescriptor(index));
+
+ IndexRow indexRow = createIndexRow(serializer, 1);
+
+ assertThrows(IndexNotBuiltException.class, () -> getAll(index, indexRow));
+
+ completeBuildIndex(index);
+
+ assertDoesNotThrow(() -> getAll(index, indexRow));
+ }
+
protected static Collection<RowId> getAll(IndexStorage index, IndexRow row) {
try (Cursor<RowId> cursor = index.get(row.indexColumns())) {
return cursor.stream().collect(toList());
@@ -432,14 +471,23 @@
when(catalogService.index(eq(indexDescriptor.id()), anyInt())).thenReturn(indexDescriptor);
}
- IndexStorage createPkIndexStorage() {
+ S createPkIndexStorage() {
CatalogTableDescriptor tableDescriptor = catalogService.table(TABLE_NAME, clock.nowLong());
CatalogIndexDescriptor pkIndexDescriptor = catalogService.aliveIndex(PK_INDEX_NAME, clock.nowLong());
- return tableStorage.getOrCreateIndex(
+ return (S) tableStorage.getOrCreateIndex(
TEST_PARTITION,
StorageIndexDescriptor.create(tableDescriptor, pkIndexDescriptor)
);
}
+
+ /** Completes the building of the index and makes read operations available from it. */
+ void completeBuildIndex(IndexStorage indexStorage) {
+ TestStorageUtils.completeBuiltIndexes(partitionStorage, indexStorage);
+ }
+
+ static IndexRow createIndexRow(BinaryTupleRowSerializer serializer, Object... values) {
+ return serializer.serializeRow(values, new RowId(TEST_PARTITION));
+ }
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
index be854f4..99b9fe0 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
@@ -38,6 +38,7 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -88,17 +89,23 @@
private static final IgniteLogger LOG = Loggers.forClass(AbstractSortedIndexStorageTest.class);
@Override
- protected SortedIndexStorage createIndexStorage(String name, ColumnType... columnTypes) {
- return createIndexStorage(name, toCatalogIndexColumnDescriptors(columnTypes));
+ protected SortedIndexStorage createIndexStorage(String name, boolean built, ColumnType... columnTypes) {
+ return createIndexStorage(name, built, toCatalogIndexColumnDescriptors(columnTypes));
}
/**
* Creates a Sorted Index using the given columns.
+ *
+ * @param name Index name.
+ * @param built {@code True} to create a built index, {@code false} if you need to build it later.
+ * @param columns Columns.
+ * @see #completeBuildIndex(IndexStorage)
*/
- protected SortedIndexStorage createIndexStorage(String name, List<ColumnParams> indexSchema) {
+ protected SortedIndexStorage createIndexStorage(String name, boolean built, List<ColumnParams> columns) {
return createIndexStorage(
name,
- indexSchema.stream()
+ built,
+ columns.stream()
.map(ColumnParams::name)
.map(columnName -> new CatalogIndexColumnDescriptor(
columnName,
@@ -109,9 +116,25 @@
}
/**
- * Creates a Sorted Index using the given index definition.
+ * Creates a built Sorted Index using the given columns.
+ *
+ * @param name Index name.
+ * @param columns Columns.
+ * @see #completeBuildIndex(IndexStorage)
*/
- protected SortedIndexStorage createIndexStorage(String name, CatalogIndexColumnDescriptor... columns) {
+ protected SortedIndexStorage createIndexStorage(String name, List<ColumnParams> columns) {
+ return createIndexStorage(name, true, columns);
+ }
+
+ /**
+ * Creates a Sorted Index using the given index definition.
+ *
+ * @param name Index name.
+ * @param built {@code True} to create a built index, {@code false} if you need to build it later.
+ * @param columns Columns.
+ * @see #completeBuildIndex(IndexStorage)
+ */
+ protected SortedIndexStorage createIndexStorage(String name, boolean built, CatalogIndexColumnDescriptor... columns) {
CatalogTableDescriptor tableDescriptor = catalogService.table(TABLE_NAME, clock.nowLong());
int tableId = tableDescriptor.id();
@@ -119,10 +142,27 @@
CatalogSortedIndexDescriptor indexDescriptor = createCatalogIndexDescriptor(tableId, indexId, name, columns);
- return tableStorage.getOrCreateSortedIndex(
+ SortedIndexStorage indexStorage = tableStorage.getOrCreateSortedIndex(
TEST_PARTITION,
new StorageSortedIndexDescriptor(tableDescriptor, indexDescriptor)
);
+
+ if (built) {
+ completeBuildIndex(indexStorage);
+ }
+
+ return indexStorage;
+ }
+
+ /**
+ * Creates a built Sorted Index using the given index definition.
+ *
+ * @param name Index name.
+ * @param columns Columns.
+ * @see #completeBuildIndex(IndexStorage)
+ */
+ protected SortedIndexStorage createIndexStorage(String name, CatalogIndexColumnDescriptor... columns) {
+ return createIndexStorage(name, true, columns);
}
@Override
@@ -1452,6 +1492,30 @@
assertThat(getAll(index, row3), is(empty()));
}
+ @Test
+ void testScanFromPkIndex() {
+ SortedIndexStorage pkIndex = createPkIndexStorage();
+
+ assertDoesNotThrow(() -> scan(pkIndex, index -> index.readOnlyScan(null, null, 0)));
+ assertDoesNotThrow(() -> scan(pkIndex, index -> index.scan(null, null, 0)));
+ assertDoesNotThrow(() -> scan(pkIndex, index -> index.tolerantScan(null, null, 0)));
+ }
+
+ @Test
+ void testScanAfterBuiltIndex() {
+ SortedIndexStorage index = createIndexStorage(INDEX_NAME, false, ColumnType.INT32);
+
+ assertThrows(IndexNotBuiltException.class, () -> scan(index, i -> i.readOnlyScan(null, null, 0)));
+ assertThrows(IndexNotBuiltException.class, () -> scan(index, i -> i.scan(null, null, 0)));
+ assertDoesNotThrow(() -> scan(index, i -> i.tolerantScan(null, null, 0)));
+
+ completeBuildIndex(index);
+
+ assertDoesNotThrow(() -> scan(index, i -> i.readOnlyScan(null, null, 0)));
+ assertDoesNotThrow(() -> scan(index, i -> i.scan(null, null, 0)));
+ assertDoesNotThrow(() -> scan(index, i -> i.tolerantScan(null, null, 0)));
+ }
+
private List<ColumnParams> shuffledRandomColumnParams() {
return shuffledColumnParams(d -> random.nextBoolean());
}
@@ -1550,6 +1614,16 @@
}
}
+ private static List<Object[]> scan(SortedIndexStorage index, Function<SortedIndexStorage, Cursor<IndexRow>> cursorFunction) {
+ var serializer = new BinaryTupleRowSerializer(index.indexDescriptor());
+
+ try (Cursor<IndexRow> cursor = cursorFunction.apply(index)) {
+ return cursor.stream()
+ .map(serializer::deserializeColumns)
+ .collect(toUnmodifiableList());
+ }
+ }
+
private static Cursor<IndexRow> getIndexRowCursor(
SortedIndexStorage indexStorage,
BinaryTuplePrefix lowerBound,
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/AbstractTestIndexStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/AbstractTestIndexStorage.java
index 6a38d3d..175b77f 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/AbstractTestIndexStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/AbstractTestIndexStorage.java
@@ -21,11 +21,14 @@
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageDestroyedException;
import org.apache.ignite.internal.storage.StorageRebalanceException;
import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.storage.index.StorageIndexDescriptor;
+import org.apache.ignite.internal.storage.util.StorageUtils;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
@@ -39,16 +42,20 @@
private volatile @Nullable RowId nextRowIdToBuild;
- private final int partitionId;
+ protected final int partitionId;
private final boolean pk;
+ private final int indexId;
+
/** Amount of cursors that opened and still do not close. */
protected final AtomicInteger pendingCursors = new AtomicInteger();
- AbstractTestIndexStorage(int partitionId, boolean pk) {
+ AbstractTestIndexStorage(int partitionId, StorageIndexDescriptor descriptor) {
this.partitionId = partitionId;
- this.pk = pk;
+ this.pk = descriptor.isPk();
+ this.indexId = descriptor.id();
+
nextRowIdToBuild = pk ? null : initialRowIdToBuild(partitionId);
}
@@ -65,6 +72,8 @@
public Cursor<RowId> get(BinaryTuple key) {
checkStorageClosedOrInProcessOfRebalance(true);
+ throwExceptionIfIndexIsNotBuilt();
+
Iterator<RowId> iterator = getRowIdIteratorForGetByBinaryTuple(key);
pendingCursors.incrementAndGet();
@@ -180,4 +189,12 @@
throw new StorageRebalanceException("Storage in the process of rebalancing");
}
}
+
+ private String createStorageInfo() {
+ return IgniteStringFormatter.format("indexId={}, partitionId={}", indexId, partitionId);
+ }
+
+ void throwExceptionIfIndexIsNotBuilt() {
+ StorageUtils.throwExceptionIfIndexIsNotBuilt(nextRowIdToBuild, this::createStorageInfo);
+ }
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
index 9dfa546..7b503ab 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestHashIndexStorage.java
@@ -42,11 +42,9 @@
private final StorageHashIndexDescriptor descriptor;
- /**
- * Constructor.
- */
+ /** Constructor. */
public TestHashIndexStorage(int partitionId, StorageHashIndexDescriptor descriptor) {
- super(partitionId, descriptor.isPk());
+ super(partitionId, descriptor);
this.descriptor = descriptor;
}
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
index e316f79..e0dc737 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
@@ -46,21 +46,16 @@
* Test implementation of MV sorted index storage.
*/
public class TestSortedIndexStorage extends AbstractTestIndexStorage implements SortedIndexStorage {
- private final int partitionId;
-
private final NavigableSet<IndexRow> index;
private final StorageSortedIndexDescriptor descriptor;
- /**
- * Constructor.
- */
+ /** Constructor. */
public TestSortedIndexStorage(int partitionId, StorageSortedIndexDescriptor descriptor) {
- super(partitionId, descriptor.isPk());
+ super(partitionId, descriptor);
BinaryTupleComparator binaryTupleComparator = new BinaryTupleComparator(descriptor.columns());
- this.partitionId = partitionId;
this.descriptor = descriptor;
this.index = new ConcurrentSkipListSet<>(
comparing((IndexRow indexRow) -> indexRow.indexColumns().byteBuffer(), binaryTupleComparator)
@@ -99,49 +94,13 @@
}
@Override
- public PeekCursor<IndexRow> scan(
- @Nullable BinaryTuplePrefix lowerBound,
- @Nullable BinaryTuplePrefix upperBound,
- int flags
- ) {
- checkStorageClosedOrInProcessOfRebalance(true);
+ public PeekCursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
+ return scanInternal(lowerBound, upperBound, flags, true);
+ }
- boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
- boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
-
- if (!includeLower && lowerBound != null) {
- setEqualityFlag(lowerBound);
- }
-
- if (includeUpper && upperBound != null) {
- setEqualityFlag(upperBound);
- }
-
- NavigableSet<IndexRow> navigableSet;
-
- if (lowerBound == null && upperBound == null) {
- navigableSet = index;
- } else if (lowerBound == null) {
- navigableSet = index.headSet(prefixToIndexRow(upperBound, highestRowId(partitionId)), true);
- } else if (upperBound == null) {
- navigableSet = index.tailSet(prefixToIndexRow(lowerBound, lowestRowId(partitionId)), true);
- } else {
- try {
- navigableSet = index.subSet(
- prefixToIndexRow(lowerBound, lowestRowId(partitionId)),
- true,
- prefixToIndexRow(upperBound, highestRowId(partitionId)),
- true
- );
- } catch (IllegalArgumentException e) {
- // Upper bound is below the lower bound.
- navigableSet = emptyNavigableSet();
- }
- }
-
- pendingCursors.incrementAndGet();
-
- return new ScanCursor(navigableSet);
+ @Override
+ public PeekCursor<IndexRow> tolerantScan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
+ return scanInternal(lowerBound, upperBound, flags, false);
}
private IndexRowImpl prefixToIndexRow(BinaryTuplePrefix prefix, RowId rowId) {
@@ -240,4 +199,54 @@
public Set<RowId> allRowsIds() {
return index.stream().map(IndexRow::rowId).collect(Collectors.toSet());
}
+
+ private PeekCursor<IndexRow> scanInternal(
+ @Nullable BinaryTuplePrefix lowerBound,
+ @Nullable BinaryTuplePrefix upperBound,
+ int flags,
+ boolean onlyBuiltIndex
+ ) {
+ checkStorageClosedOrInProcessOfRebalance(true);
+
+ if (onlyBuiltIndex) {
+ throwExceptionIfIndexIsNotBuilt();
+ }
+
+ boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
+ boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
+
+ if (!includeLower && lowerBound != null) {
+ setEqualityFlag(lowerBound);
+ }
+
+ if (includeUpper && upperBound != null) {
+ setEqualityFlag(upperBound);
+ }
+
+ NavigableSet<IndexRow> navigableSet;
+
+ if (lowerBound == null && upperBound == null) {
+ navigableSet = index;
+ } else if (lowerBound == null) {
+ navigableSet = index.headSet(prefixToIndexRow(upperBound, highestRowId(partitionId)), true);
+ } else if (upperBound == null) {
+ navigableSet = index.tailSet(prefixToIndexRow(lowerBound, lowestRowId(partitionId)), true);
+ } else {
+ try {
+ navigableSet = index.subSet(
+ prefixToIndexRow(lowerBound, lowestRowId(partitionId)),
+ true,
+ prefixToIndexRow(upperBound, highestRowId(partitionId)),
+ true
+ );
+ } catch (IllegalArgumentException e) {
+ // Upper bound is below the lower bound.
+ navigableSet = emptyNavigableSet();
+ }
+ }
+
+ pendingCursors.incrementAndGet();
+
+ return new ScanCursor(navigableSet);
+ }
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java
index 5bd1436..76b55e8 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java
@@ -455,4 +455,8 @@
return hasNext;
}
}
+
+ protected void throwExceptionIfIndexIsNotBuilt() {
+ StorageUtils.throwExceptionIfIndexIsNotBuilt(nextRowIdToBuild, this::createStorageInfo);
+ }
}
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
index 68a1618..c3fdc11 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
@@ -84,6 +84,8 @@
return busyDataRead(() -> {
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+ throwExceptionIfIndexIsNotBuilt();
+
IndexColumns indexColumns = new IndexColumns(partitionId, key.byteBuffer());
HashIndexRow lowerBound = new HashIndexRow(indexColumns, lowestRowId);
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
index 912da53..c79b505 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
@@ -41,7 +41,6 @@
import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMeta;
import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.internal.util.CursorUtils;
import org.jetbrains.annotations.Nullable;
/**
@@ -91,6 +90,8 @@
return busyDataRead(() -> {
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+ throwExceptionIfIndexIsNotBuilt();
+
SortedIndexRowKey lowerBound = toSortedIndexRow(key, lowestRowId);
return new ScanCursor<RowId>(lowerBound) {
@@ -150,33 +151,7 @@
@Override
public PeekCursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
- return busyDataRead(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
-
- boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
- boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
-
- SortedIndexRowKey lower = createBound(lowerBound, !includeLower);
-
- SortedIndexRowKey upper = createBound(upperBound, includeUpper);
-
- return new ScanCursor<IndexRow>(lower) {
- private final BinaryTupleComparator comparator = localTree.getBinaryTupleComparator();
-
- @Override
- public IndexRow map(SortedIndexRow value) {
- return toIndexRowImpl(value);
- }
-
- @Override
- protected boolean exceedsUpperBound(SortedIndexRow value) {
- return upper != null && 0 <= comparator.compare(
- value.indexColumns().valueBuffer(),
- upper.indexColumns().valueBuffer()
- );
- }
- };
- });
+ return scanInternal(lowerBound, upperBound, flags, true);
}
@Override
@@ -184,6 +159,8 @@
return busyDataRead(() -> {
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+ throwExceptionIfIndexIsNotBuilt();
+
boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
@@ -193,13 +170,18 @@
try {
Cursor<SortedIndexRow> cursor = indexTree.find(lower, upper);
- return CursorUtils.map(cursor, this::toIndexRowImpl);
+ return new ReadOnlyScanCursor(cursor);
} catch (IgniteInternalCheckedException e) {
throw new StorageException("Couldn't get index tree cursor", e);
}
});
}
+ @Override
+ public PeekCursor<IndexRow> tolerantScan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
+ return scanInternal(lowerBound, upperBound, flags, false);
+ }
+
private @Nullable SortedIndexRowKey createBound(@Nullable BinaryTuplePrefix bound, boolean setEqualityFlag) {
if (bound == null) {
return null;
@@ -247,4 +229,76 @@
indexRow.indexColumns().link(PageIdUtils.NULL_LINK);
}
}
+
+ private PeekCursor<IndexRow> scanInternal(
+ @Nullable BinaryTuplePrefix lowerBound,
+ @Nullable BinaryTuplePrefix upperBound,
+ int flags,
+ boolean onlyBuiltIndex
+ ) {
+ return busyDataRead(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
+ if (onlyBuiltIndex) {
+ throwExceptionIfIndexIsNotBuilt();
+ }
+
+ boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
+ boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
+
+ SortedIndexRowKey lower = createBound(lowerBound, !includeLower);
+
+ SortedIndexRowKey upper = createBound(upperBound, includeUpper);
+
+ return new ScanCursor<IndexRow>(lower) {
+ private final BinaryTupleComparator comparator = localTree.getBinaryTupleComparator();
+
+ @Override
+ public IndexRow map(SortedIndexRow value) {
+ return toIndexRowImpl(value);
+ }
+
+ @Override
+ protected boolean exceedsUpperBound(SortedIndexRow value) {
+ return upper != null && 0 <= comparator.compare(
+ value.indexColumns().valueBuffer(),
+ upper.indexColumns().valueBuffer()
+ );
+ }
+ };
+ });
+ }
+
+ private class ReadOnlyScanCursor implements Cursor<IndexRow> {
+ private final Cursor<SortedIndexRow> treeCursor;
+
+ private ReadOnlyScanCursor(Cursor<SortedIndexRow> treeCursor) {
+ this.treeCursor = treeCursor;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return busyDataRead(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), PageMemorySortedIndexStorage.this::createStorageInfo);
+
+ return treeCursor.hasNext();
+ });
+ }
+
+ @Override
+ public IndexRow next() {
+ return busyDataRead(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), PageMemorySortedIndexStorage.this::createStorageInfo);
+
+ SortedIndexRow next = treeCursor.next();
+
+ return toIndexRowImpl(next);
+ });
+ }
+
+ @Override
+ public void close() {
+ treeCursor.close();
+ }
+ }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
index 55046dc..169c381 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
@@ -45,6 +45,7 @@
import org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper;
import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
import org.apache.ignite.internal.storage.util.StorageState;
+import org.apache.ignite.internal.storage.util.StorageUtils;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.Nullable;
@@ -393,4 +394,8 @@
}
}
}
+
+ protected void throwExceptionIfIndexNotBuilt() {
+ StorageUtils.throwExceptionIfIndexIsNotBuilt(nextRowIdToBuild, this::createStorageInfo);
+ }
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
index 179f88f..89c9ed6 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
@@ -105,6 +105,8 @@
return busyDataRead(() -> {
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+ throwExceptionIfIndexNotBuilt();
+
byte[] rangeStart = rocksPrefix(key);
byte[] rangeEnd = incrementPrefix(rangeStart);
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index 9c6db55..d78ba99 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -112,6 +112,8 @@
return busyDataRead(() -> {
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+ throwExceptionIfIndexNotBuilt();
+
BinaryTuplePrefix keyPrefix = BinaryTuplePrefix.fromBinaryTuple(key);
return scan(keyPrefix, keyPrefix, true, true, this::decodeRowId);
@@ -152,14 +154,7 @@
@Override
public PeekCursor<IndexRow> scan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
- return busyDataRead(() -> {
- throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
-
- boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
- boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
-
- return scan(lowerBound, upperBound, includeLower, includeUpper, this::decodeRow);
- });
+ return scanInternal(lowerBound, upperBound, flags, true);
}
protected <T> PeekCursor<T> scan(
@@ -186,6 +181,8 @@
return busyDataRead(() -> {
throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+ throwExceptionIfIndexNotBuilt();
+
boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
@@ -254,6 +251,11 @@
});
}
+ @Override
+ public PeekCursor<IndexRow> tolerantScan(@Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, int flags) {
+ return scanInternal(lowerBound, upperBound, flags, false);
+ }
+
private byte[] getBound(@Nullable BinaryTuplePrefix bound, byte[] partitionPrefix, boolean changeBoundIncluded) {
byte[] boundBytes;
@@ -328,4 +330,24 @@
public void clearIndex(WriteBatch writeBatch) throws RocksDBException {
writeBatch.deleteRange(indexCf.handle(), partitionStartPrefix, partitionEndPrefix);
}
+
+ private PeekCursor<IndexRow> scanInternal(
+ @Nullable BinaryTuplePrefix lowerBound,
+ @Nullable BinaryTuplePrefix upperBound,
+ int flags,
+ boolean onlyBuiltIndex
+ ) {
+ return busyDataRead(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo);
+
+ if (onlyBuiltIndex) {
+ throwExceptionIfIndexNotBuilt();
+ }
+
+ boolean includeLower = (flags & GREATER_OR_EQUAL) != 0;
+ boolean includeUpper = (flags & LESS_OR_EQUAL) != 0;
+
+ return scan(lowerBound, upperBound, includeLower, includeUpper, this::decodeRow);
+ });
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java
index bd51fdb..b4403bc 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java
@@ -160,7 +160,7 @@
IndexRow nextRow = null;
// Find next key.
- try (Cursor<IndexRow> cursor = storage.scan(prefix, null, SortedIndexStorage.GREATER)) {
+ try (Cursor<IndexRow> cursor = storage.tolerantScan(prefix, null, SortedIndexStorage.GREATER)) {
if (cursor.hasNext()) {
nextRow = cursor.next();
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
index 556f076..385f344 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -36,6 +36,7 @@
import org.apache.ignite.internal.storage.BaseMvStoragesTest;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TestStorageUtils;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.StorageHashIndexColumnDescriptor;
@@ -180,6 +181,8 @@
indexUpdateHandler,
storageUpdateConfiguration
);
+
+ TestStorageUtils.completeBuiltIndexes(storage, hashInnerStorage, sortedInnerStorage);
}
List<ReadResult> getRowVersions(RowId rowId) {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 781b019..556a376 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -133,8 +133,10 @@
import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.TestStorageUtils;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
+import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.StorageHashIndexColumnDescriptor;
@@ -515,6 +517,8 @@
columnsExtractor
);
+ completeBuiltIndexes(sortedIndexStorage.storage(), hashIndexStorage.storage());
+
IndexLocker pkLocker = new HashIndexLocker(pkIndexId, true, lockManager, row2Tuple);
IndexLocker sortedIndexLocker = new SortedIndexLocker(sortedIndexId, PART_ID, lockManager, indexStorage, row2Tuple);
IndexLocker hashIndexLocker = new HashIndexLocker(hashIndexId, false, lockManager, row2Tuple);
@@ -658,6 +662,8 @@
((TestSortedIndexStorage) sortedIndexStorage.storage()).clear();
testMvPartitionStorage.clear();
pendingRows.clear();
+
+ completeBuiltIndexes(hashIndexStorage.storage(), sortedIndexStorage.storage());
}
@Test
@@ -3035,4 +3041,8 @@
return partitionReplicaListener.invoke(request, localNode.id());
}
+
+ private void completeBuiltIndexes(IndexStorage... indexStorages) {
+ TestStorageUtils.completeBuiltIndexes(testMvPartitionStorage, indexStorages);
+ }
}
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index a1834db..3aeb093 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -594,6 +594,7 @@
ColumnsExtractor row2Tuple = BinaryRowConverter.keyExtractor(schemaDescriptor);
StorageHashIndexDescriptor pkIndexDescriptor = mock(StorageHashIndexDescriptor.class);
+ when(pkIndexDescriptor.isPk()).thenReturn(true);
when(pkIndexDescriptor.columns()).then(invocation -> Collections.nCopies(
schemaDescriptor.keyColumns().size(),
diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 131e4b6..52eea74 100644
--- a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -340,6 +340,7 @@
ColumnsExtractor row2Tuple = BinaryRowConverter.keyExtractor(schema);
StorageHashIndexDescriptor pkIndexDescriptor = mock(StorageHashIndexDescriptor.class);
+ when(pkIndexDescriptor.isPk()).thenReturn(true);
when(pkIndexDescriptor.columns()).then(
invocation -> Collections.nCopies(schema.keyColumns().size(), mock(StorageHashIndexColumnDescriptor.class))