| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.storage.index; |
| |
| import static java.util.stream.Collectors.toList; |
| import static java.util.stream.Collectors.toUnmodifiableList; |
| import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE; |
| import static org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName; |
| import static org.apache.ignite.internal.storage.BaseMvStoragesTest.getOrCreateMvPartition; |
| import static org.apache.ignite.internal.storage.util.StorageUtils.initialRowIdToBuild; |
| import static org.hamcrest.CoreMatchers.equalTo; |
| import static org.hamcrest.CoreMatchers.is; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.Matchers.anyOf; |
| import static org.hamcrest.Matchers.contains; |
| import static org.hamcrest.Matchers.containsInAnyOrder; |
| import static org.hamcrest.Matchers.empty; |
| import static org.hamcrest.Matchers.hasSize; |
| import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; |
| import static org.junit.jupiter.api.Assertions.assertEquals; |
| import static org.junit.jupiter.api.Assertions.assertFalse; |
| import static org.junit.jupiter.api.Assertions.assertNotNull; |
| import static org.junit.jupiter.api.Assertions.assertNull; |
| import static org.junit.jupiter.api.Assertions.assertThrows; |
| import static org.junit.jupiter.api.Assertions.assertTrue; |
| import static org.mockito.ArgumentMatchers.anyInt; |
| import static org.mockito.ArgumentMatchers.anyLong; |
| import static org.mockito.ArgumentMatchers.eq; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.NoSuchElementException; |
| import java.util.Random; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.stream.Stream; |
| import org.apache.ignite.internal.catalog.CatalogService; |
| import org.apache.ignite.internal.catalog.commands.CatalogUtils; |
| import org.apache.ignite.internal.catalog.commands.ColumnParams; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor; |
| import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor; |
| import org.apache.ignite.internal.hlc.HybridClock; |
| import org.apache.ignite.internal.hlc.HybridClockImpl; |
| import org.apache.ignite.internal.schema.BinaryTuple; |
| import org.apache.ignite.internal.storage.MvPartitionStorage; |
| import org.apache.ignite.internal.storage.RowId; |
| import org.apache.ignite.internal.storage.TestStorageUtils; |
| import org.apache.ignite.internal.storage.engine.MvTableStorage; |
| import org.apache.ignite.internal.storage.index.impl.BinaryTupleRowSerializer; |
| import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; |
| import org.apache.ignite.internal.util.Cursor; |
| import org.apache.ignite.sql.ColumnType; |
| import org.jetbrains.annotations.Nullable; |
| import org.junit.jupiter.api.BeforeEach; |
| import org.junit.jupiter.api.Test; |
| import org.junit.jupiter.params.ParameterizedTest; |
| import org.junit.jupiter.params.provider.ValueSource; |
| |
| /** |
| * Base class for index storage tests. Covers common methods, such as {@link IndexStorage#get(BinaryTuple)} or |
| * {@link IndexStorage#put(IndexRow)}. |
| * |
| * @param <S> Type of specific index implementation. |
| * @param <D> Type of index descriptor for that specific implementation. |
| */ |
| public abstract class AbstractIndexStorageTest<S extends IndexStorage, D extends StorageIndexDescriptor> extends BaseIgniteAbstractTest { |
| /** Definitions of all supported column types. */ |
| @SuppressWarnings("WeakerAccess") // May be used in "@VariableSource", that's why it's public. |
| public static final List<ColumnParams> ALL_TYPES_COLUMN_PARAMS = allTypesColumnParams(); |
| |
| protected static final int TEST_PARTITION = 12; |
| |
| protected static final String TABLE_NAME = "FOO"; |
| |
| protected static final String PK_INDEX_NAME = pkIndexName(TABLE_NAME); |
| |
| protected static final String INDEX_NAME = "TEST_IDX"; |
| |
| private static List<ColumnParams> allTypesColumnParams() { |
| return List.of( |
| columnParamsBuilder(ColumnType.INT8).nullable(true).build(), |
| columnParamsBuilder(ColumnType.INT16).nullable(true).build(), |
| columnParamsBuilder(ColumnType.INT32).nullable(true).build(), |
| columnParamsBuilder(ColumnType.INT64).nullable(true).build(), |
| columnParamsBuilder(ColumnType.FLOAT).nullable(true).build(), |
| columnParamsBuilder(ColumnType.DOUBLE).nullable(true).build(), |
| columnParamsBuilder(ColumnType.UUID).nullable(true).build(), |
| columnParamsBuilder(ColumnType.DATE).nullable(true).build(), |
| columnParamsBuilder(ColumnType.BITMASK).length(100).nullable(true).build(), |
| columnParamsBuilder(ColumnType.STRING).length(100).nullable(true).build(), |
| columnParamsBuilder(ColumnType.BYTE_ARRAY).length(100).nullable(true).build(), |
| columnParamsBuilder(ColumnType.NUMBER).precision(10).nullable(true).build(), |
| columnParamsBuilder(ColumnType.DECIMAL).precision(19).scale(3).nullable(true).build(), |
| columnParamsBuilder(ColumnType.TIME).precision(0).nullable(true).build(), |
| columnParamsBuilder(ColumnType.DATETIME).precision(6).nullable(true).build(), |
| columnParamsBuilder(ColumnType.TIMESTAMP).precision(6).nullable(true).build() |
| ); |
| } |
| |
| private final long seed = System.currentTimeMillis(); |
| |
| protected final Random random = new Random(seed); |
| |
| protected MvTableStorage tableStorage; |
| |
| protected MvPartitionStorage partitionStorage; |
| |
| protected final AtomicInteger catalogId = new AtomicInteger(); |
| |
| protected final CatalogService catalogService = mock(CatalogService.class); |
| |
| protected final HybridClock clock = new HybridClockImpl(); |
| |
| @BeforeEach |
| void setUp() { |
| log.info("Using random seed: " + seed); |
| } |
| |
| /** |
| * Returns a name of the column with given type. |
| */ |
| protected static String columnName(ColumnType type) { |
| return type.name(); |
| } |
| |
| /** |
| * Initializes the internal structures needed for tests. |
| * |
| * <p>This method *MUST* always be called in either subclass' constructor or setUp method. |
| */ |
| protected final void initialize(MvTableStorage tableStorage) { |
| this.tableStorage = tableStorage; |
| this.partitionStorage = getOrCreateMvPartition(tableStorage, TEST_PARTITION); |
| |
| createTestTable(); |
| } |
| |
| /** Configures a test table with columns of all supported types. */ |
| private void createTestTable() { |
| ColumnParams pkColumn = ColumnParams.builder().name("pk").type(ColumnType.INT32).nullable(false).build(); |
| |
| int schemaId = catalogId.getAndIncrement(); |
| int tableId = catalogId.getAndIncrement(); |
| int zoneId = catalogId.getAndIncrement(); |
| int pkIndexId = catalogId.getAndIncrement(); |
| |
| CatalogTableDescriptor tableDescriptor = new CatalogTableDescriptor( |
| tableId, |
| schemaId, |
| pkIndexId, |
| TABLE_NAME, |
| zoneId, |
| Stream.concat(Stream.of(pkColumn), ALL_TYPES_COLUMN_PARAMS.stream()).map(CatalogUtils::fromParams).collect(toList()), |
| List.of(pkColumn.name()), |
| null, |
| DEFAULT_STORAGE_PROFILE |
| ); |
| |
| when(catalogService.table(eq(TABLE_NAME), anyLong())).thenReturn(tableDescriptor); |
| when(catalogService.table(eq(tableId), anyInt())).thenReturn(tableDescriptor); |
| |
| createCatalogIndexDescriptor(tableId, pkIndexId, PK_INDEX_NAME, pkColumn.type()); |
| } |
| |
| /** |
| * Creates an IndexStorage instance using the given columns. |
| * |
| * @param name Index name. |
| * @param built {@code True} to create a built index, {@code false} if you need to build it later. |
| * @param columnTypes Column types. |
| * @see #columnName(ColumnType) |
| * @see #completeBuildIndex(IndexStorage) |
| */ |
| protected abstract S createIndexStorage(String name, boolean built, ColumnType... columnTypes); |
| |
| /** |
| * Creates a built IndexStorage instance using the given columns. |
| * |
| * @param name Index name. |
| * @param columnTypes Column types. |
| * @see #columnName(ColumnType) |
| * @see #completeBuildIndex(IndexStorage) |
| */ |
| protected S createIndexStorage(String name, ColumnType... columnTypes) { |
| return createIndexStorage(name, true, columnTypes); |
| } |
| |
| /** |
| * Provides safe access to the index descriptor of the storage. |
| */ |
| protected abstract D indexDescriptor(S index); |
| |
| abstract CatalogIndexDescriptor createCatalogIndexDescriptor(int tableId, int indexId, String indexName, ColumnType... columnTypes); |
| |
| /** |
| * Tests the {@link IndexStorage#get} method. |
| */ |
| @Test |
| public void testGet() { |
| S index = createIndexStorage(INDEX_NAME, ColumnType.INT32, ColumnType.STRING); |
| var serializer = new BinaryTupleRowSerializer(indexDescriptor(index)); |
| |
| // First two rows have the same index key, but different row IDs. |
| IndexRow row1 = serializer.serializeRow(new Object[]{ 1, "foo" }, new RowId(TEST_PARTITION)); |
| IndexRow row2 = serializer.serializeRow(new Object[]{ 1, "foo" }, new RowId(TEST_PARTITION)); |
| IndexRow row3 = serializer.serializeRow(new Object[]{ 2, "bar" }, new RowId(TEST_PARTITION)); |
| IndexRow row4 = serializer.serializeRow(new Object[]{ 3, "baz" }, new RowId(TEST_PARTITION)); |
| |
| assertThat(getAll(index, row1), is(empty())); |
| assertThat(getAll(index, row2), is(empty())); |
| assertThat(getAll(index, row3), is(empty())); |
| |
| put(index, row1); |
| put(index, row2); |
| put(index, row3); |
| |
| assertThat(getAll(index, row1), containsInAnyOrder(row1.rowId(), row2.rowId())); |
| assertThat(getAll(index, row2), containsInAnyOrder(row1.rowId(), row2.rowId())); |
| assertThat(getAll(index, row3), contains(row3.rowId())); |
| assertThat(getAll(index, row4), is(empty())); |
| } |
| |
| @Test |
| public void testGetConcurrentPut() { |
| S index = createIndexStorage(INDEX_NAME, ColumnType.INT32, ColumnType.STRING); |
| var serializer = new BinaryTupleRowSerializer(indexDescriptor(index)); |
| |
| Object[] columnValues = { 1, "foo" }; |
| IndexRow row1 = serializer.serializeRow(columnValues, new RowId(TEST_PARTITION, 1, 1)); |
| IndexRow row2 = serializer.serializeRow(columnValues, new RowId(TEST_PARTITION, 2, 2)); |
| |
| try (Cursor<RowId> cursor = index.get(row1.indexColumns())) { |
| put(index, row1); |
| |
| assertTrue(cursor.hasNext()); |
| assertEquals(row1.rowId(), cursor.next()); |
| |
| put(index, row2); |
| |
| assertTrue(cursor.hasNext()); |
| assertEquals(row2.rowId(), cursor.next()); |
| |
| assertFalse(cursor.hasNext()); |
| assertThrows(NoSuchElementException.class, cursor::next); |
| } |
| } |
| |
| @Test |
| public void testGetConcurrentReplace() { |
| S index = createIndexStorage(INDEX_NAME, ColumnType.INT32, ColumnType.STRING); |
| var serializer = new BinaryTupleRowSerializer(indexDescriptor(index)); |
| |
| Object[] columnValues = { 1, "foo" }; |
| IndexRow row1 = serializer.serializeRow(columnValues, new RowId(TEST_PARTITION, 1, 1)); |
| IndexRow row2 = serializer.serializeRow(columnValues, new RowId(TEST_PARTITION, 2, 2)); |
| |
| put(index, row1); |
| |
| try (Cursor<RowId> cursor = index.get(row1.indexColumns())) { |
| assertTrue(cursor.hasNext()); |
| assertEquals(row1.rowId(), cursor.next()); |
| |
| remove(index, row1); |
| put(index, row2); |
| |
| assertTrue(cursor.hasNext()); |
| assertEquals(row2.rowId(), cursor.next()); |
| |
| assertFalse(cursor.hasNext()); |
| assertThrows(NoSuchElementException.class, cursor::next); |
| } |
| } |
| |
| /** |
| * Tests that {@link IndexStorage#put} does not create row ID duplicates. |
| */ |
| @Test |
| public void testPutIdempotence() { |
| S index = createIndexStorage(INDEX_NAME, ColumnType.INT32, ColumnType.STRING); |
| var serializer = new BinaryTupleRowSerializer(indexDescriptor(index)); |
| |
| IndexRow row = serializer.serializeRow(new Object[]{ 1, "foo" }, new RowId(TEST_PARTITION)); |
| |
| put(index, row); |
| put(index, row); |
| |
| IndexRow actualRow = getSingle(index, row.indexColumns()); |
| |
| assertNotNull(actualRow); |
| |
| assertThat(actualRow.rowId(), is(equalTo(row.rowId()))); |
| |
| assertThat(getAll(index, row), contains(row.rowId())); |
| } |
| |
| /** |
| * Tests the {@link IndexStorage#remove} method. |
| */ |
| @Test |
| public void testRemove() { |
| S index = createIndexStorage(INDEX_NAME, ColumnType.INT32, ColumnType.STRING); |
| var serializer = new BinaryTupleRowSerializer(indexDescriptor(index)); |
| |
| IndexRow row1 = serializer.serializeRow(new Object[]{ 1, "foo" }, new RowId(TEST_PARTITION)); |
| IndexRow row2 = serializer.serializeRow(new Object[]{ 1, "foo" }, new RowId(TEST_PARTITION)); |
| IndexRow row3 = serializer.serializeRow(new Object[]{ 2, "bar" }, new RowId(TEST_PARTITION)); |
| |
| put(index, row1); |
| put(index, row2); |
| put(index, row3); |
| |
| assertThat(getAll(index, row1), containsInAnyOrder(row1.rowId(), row2.rowId())); |
| assertThat(getAll(index, row2), containsInAnyOrder(row1.rowId(), row2.rowId())); |
| assertThat(getAll(index, row3), contains(row3.rowId())); |
| |
| remove(index, row1); |
| |
| assertThat(getAll(index, row1), contains(row2.rowId())); |
| assertThat(getAll(index, row2), contains(row2.rowId())); |
| assertThat(getAll(index, row3), contains(row3.rowId())); |
| |
| remove(index, row2); |
| |
| assertThat(getAll(index, row1), is(empty())); |
| assertThat(getAll(index, row2), is(empty())); |
| assertThat(getAll(index, row3), contains(row3.rowId())); |
| |
| remove(index, row3); |
| |
| assertThat(getAll(index, row1), is(empty())); |
| assertThat(getAll(index, row2), is(empty())); |
| assertThat(getAll(index, row3), is(empty())); |
| } |
| |
| /** |
| * Tests that {@link IndexStorage#remove} works normally when removing a non-existent row. |
| */ |
| @Test |
| public void testRemoveIdempotence() { |
| S index = createIndexStorage(INDEX_NAME, ColumnType.INT32, ColumnType.STRING); |
| var serializer = new BinaryTupleRowSerializer(indexDescriptor(index)); |
| |
| IndexRow row = serializer.serializeRow(new Object[]{ 1, "foo" }, new RowId(TEST_PARTITION)); |
| |
| assertDoesNotThrow(() -> remove(index, row)); |
| |
| put(index, row); |
| |
| remove(index, row); |
| |
| assertThat(getAll(index, row), is(empty())); |
| |
| assertDoesNotThrow(() -> remove(index, row)); |
| } |
| |
| @ParameterizedTest |
| @ValueSource(booleans = {true, false}) |
| void testNextRowIdToBuild(boolean pk) { |
| IndexStorage indexStorage; |
| |
| if (pk) { |
| indexStorage = createPkIndexStorage(); |
| |
| assertNull(indexStorage.getNextRowIdToBuild()); |
| } else { |
| indexStorage = createIndexStorage(INDEX_NAME, false, ColumnType.INT32); |
| |
| assertEquals(initialRowIdToBuild(TEST_PARTITION), indexStorage.getNextRowIdToBuild()); |
| } |
| |
| var newNextRowIdToBuild = new RowId(TEST_PARTITION); |
| |
| partitionStorage.runConsistently(locker -> { |
| indexStorage.setNextRowIdToBuild(newNextRowIdToBuild); |
| |
| return null; |
| }); |
| |
| assertEquals(newNextRowIdToBuild, newNextRowIdToBuild); |
| } |
| |
| @Test |
| void testGetFromPkIndex() { |
| S pkIndex = createPkIndexStorage(); |
| var serializer = new BinaryTupleRowSerializer(indexDescriptor(pkIndex)); |
| |
| IndexRow indexRow = createIndexRow(serializer, 1); |
| |
| assertDoesNotThrow(() -> getAll(pkIndex, indexRow)); |
| } |
| |
| @Test |
| void testGetAfterBuiltIndex() { |
| S index = createIndexStorage(INDEX_NAME, false, ColumnType.INT32); |
| var serializer = new BinaryTupleRowSerializer(indexDescriptor(index)); |
| |
| IndexRow indexRow = createIndexRow(serializer, 1); |
| |
| assertThrows(IndexNotBuiltException.class, () -> getAll(index, indexRow)); |
| |
| completeBuildIndex(index); |
| |
| assertDoesNotThrow(() -> getAll(index, indexRow)); |
| } |
| |
| protected static Collection<RowId> getAll(IndexStorage index, IndexRow row) { |
| try (Cursor<RowId> cursor = index.get(row.indexColumns())) { |
| return cursor.stream().collect(toList()); |
| } |
| } |
| |
| /** |
| * Extracts a single value by a given key or {@code null} if it does not exist. |
| */ |
| protected static @Nullable IndexRow getSingle(IndexStorage indexStorage, BinaryTuple fullPrefix) { |
| List<RowId> rowIds = get(indexStorage, fullPrefix); |
| |
| assertThat(rowIds, anyOf(empty(), hasSize(1))); |
| |
| return rowIds.isEmpty() ? null : new IndexRowImpl(fullPrefix, rowIds.get(0)); |
| } |
| |
| protected static List<RowId> get(IndexStorage index, BinaryTuple key) { |
| try (Cursor<RowId> cursor = index.get(key)) { |
| return cursor.stream().collect(toUnmodifiableList()); |
| } |
| } |
| |
| protected final void put(S indexStorage, IndexRow row) { |
| partitionStorage.runConsistently(locker -> { |
| locker.lock(row.rowId()); |
| |
| indexStorage.put(row); |
| |
| return null; |
| }); |
| } |
| |
| protected final void remove(S indexStorage, IndexRow row) { |
| partitionStorage.runConsistently(locker -> { |
| locker.lock(row.rowId()); |
| |
| indexStorage.remove(row); |
| |
| return null; |
| }); |
| } |
| |
| private static ColumnParams.Builder columnParamsBuilder(ColumnType columnType) { |
| return ColumnParams.builder().name(columnName(columnType)).type(columnType); |
| } |
| |
| void addToCatalog(CatalogIndexDescriptor indexDescriptor) { |
| when(catalogService.aliveIndex(eq(indexDescriptor.name()), anyLong())).thenReturn(indexDescriptor); |
| when(catalogService.index(eq(indexDescriptor.id()), anyInt())).thenReturn(indexDescriptor); |
| } |
| |
| S createPkIndexStorage() { |
| CatalogTableDescriptor tableDescriptor = catalogService.table(TABLE_NAME, clock.nowLong()); |
| |
| CatalogIndexDescriptor pkIndexDescriptor = catalogService.aliveIndex(PK_INDEX_NAME, clock.nowLong()); |
| |
| return (S) tableStorage.getOrCreateIndex( |
| TEST_PARTITION, |
| StorageIndexDescriptor.create(tableDescriptor, pkIndexDescriptor) |
| ); |
| } |
| |
| /** Completes the building of the index and makes read operations available from it. */ |
| void completeBuildIndex(IndexStorage indexStorage) { |
| TestStorageUtils.completeBuiltIndexes(partitionStorage, indexStorage); |
| } |
| |
| static IndexRow createIndexRow(BinaryTupleRowSerializer serializer, Object... values) { |
| return serializer.serializeRow(values, new RowId(TEST_PARTITION)); |
| } |
| } |