blob: 7b661792838449c60dc56888460d07e54368115e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.storage;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.pkIndexName;
import static org.apache.ignite.internal.catalog.descriptors.CatalogColumnCollation.ASC_NULLS_LAST;
import static org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus.AVAILABLE;
import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow;
import static org.apache.ignite.internal.storage.MvPartitionStorage.REBALANCE_IN_PROGRESS;
import static org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER;
import static org.apache.ignite.internal.storage.util.StorageUtils.initialRowIdToBuild;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
import static org.apache.ignite.sql.ColumnType.INT32;
import static org.apache.ignite.sql.ColumnType.STRING;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assumptions.assumeFalse;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogSortedIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.index.HashIndexStorage;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
import org.apache.ignite.internal.storage.index.IndexStorage;
import org.apache.ignite.internal.storage.index.PeekCursor;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
import org.apache.ignite.internal.storage.index.StorageIndexDescriptor;
import org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
import org.apache.ignite.internal.storage.index.impl.BinaryTupleRowSerializer;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
/**
* Abstract class that contains tests for {@link MvTableStorage} implementations.
*/
@SuppressWarnings("JUnitTestMethodInProductSource")
public abstract class AbstractMvTableStorageTest extends BaseMvStoragesTest {
/** Name of the test table. */
private static final String TABLE_NAME = "FOO";

/** Name of the primary-key index derived from {@link #TABLE_NAME}. */
private static final String PK_INDEX_NAME = pkIndexName(TABLE_NAME);

/** Name of the sorted index used by the tests. */
private static final String SORTED_INDEX_NAME = "SORTED_IDX";

/** Name of the hash index used by the tests. */
private static final String HASH_INDEX_NAME = "HASH_IDX";

/** Default partition ID used by most tests. */
protected static final int PARTITION_ID = 0;

/** Initial row ID for index building in {@link #PARTITION_ID} (see {@code StorageUtils#initialRowIdToBuild}). */
private static final RowId INITIAL_ROW_ID_TO_BUILD = initialRowIdToBuild(PARTITION_ID);

/** Partition id for 0 storage. */
protected static final int PARTITION_ID_0 = 10;

/** Partition id for 1 storage. */
protected static final int PARTITION_ID_1 = 9;

/** Table ID passed as the commit table when adding write intents. */
protected static final int COMMIT_TABLE_ID = 999;

/** Storage under test; created by subclasses, closed after each test. */
protected MvTableStorage tableStorage;

/** Descriptor of the sorted index. */
protected StorageSortedIndexDescriptor sortedIdx;

/** Descriptor of the hash index. */
protected StorageHashIndexDescriptor hashIdx;

/** Descriptor of the primary-key index. */
protected StorageIndexDescriptor pkIdx;

/** Mocked catalog service backing index/table descriptor lookups. */
private final CatalogService catalogService = mock(CatalogService.class);
/**
 * Index descriptor supplier that resolves descriptors through the mocked {@link CatalogService}:
 * it looks up the index and its table in the latest catalog version and converts them into a
 * {@link StorageIndexDescriptor}. Returns {@code null} when the catalog has no such index.
 */
protected final StorageIndexDescriptorSupplier indexDescriptorSupplier = new StorageIndexDescriptorSupplier() {
    @Override
    public @Nullable StorageIndexDescriptor get(int indexId) {
        int catalogVersion = catalogService.latestCatalogVersion();

        CatalogIndexDescriptor indexDescriptor = catalogService.index(indexId, catalogVersion);

        if (indexDescriptor == null) {
            return null;
        }

        CatalogTableDescriptor tableDescriptor = catalogService.table(indexDescriptor.tableId(), catalogVersion);

        // An index must never reference a table that is missing from the same catalog version.
        assertThat(tableDescriptor, is(notNullValue()));

        return StorageIndexDescriptor.create(tableDescriptor, indexDescriptor);
    }
};
/** Row together with its ID and the hybrid timestamp captured at construction time. */
private class TestRow {
    // ID the row is stored under.
    final RowId rowId;

    // Row payload.
    final BinaryRow row;

    // Hybrid time taken from the test clock when this object was created.
    final HybridTimestamp timestamp;

    TestRow(RowId rowId, BinaryRow row) {
        this.rowId = rowId;
        this.row = row;
        this.timestamp = clock.now();
    }
}
/** Closes the table storage (if one was created) after each test. */
@AfterEach
protected void tearDown() throws Exception {
    if (tableStorage != null) {
        tableStorage.close();
    }
}
/** Creates the {@link MvTableStorage} implementation under test. */
protected abstract MvTableStorage createMvTableStorage();
/**
 * Tests that {@link MvTableStorage#getMvPartition(int)} correctly returns an existing partition.
 */
@Test
void testCreatePartition() {
    int partitionId = 0;

    // An out-of-range partition ID must be rejected outright.
    assertThrows(IllegalArgumentException.class, () -> tableStorage.createMvPartition(getPartitionIdOutOfRange()));

    // Before creation the partition is not visible.
    assertThat(tableStorage.getMvPartition(partitionId), is(nullValue()));

    MvPartitionStorage createdStorage = getOrCreateMvPartition(partitionId);

    assertThat(createdStorage, is(notNullValue()));

    // Subsequent lookups must return the very same instance.
    assertThat(tableStorage.getMvPartition(partitionId), is(sameInstance(createdStorage)));

    // Creating the same partition a second time is an error.
    StorageException e = assertThrows(StorageException.class, () -> tableStorage.createMvPartition(partitionId));
    assertThat(e.getMessage(), containsString("Storage already exists"));
}
/**
 * Tests that partition data does not overlap.
 */
@Test
void testPartitionIndependence() {
    MvPartitionStorage partitionStorage0 = getOrCreateMvPartition(PARTITION_ID_0);
    // Using a shifted ID value to test a multibyte scenario.
    MvPartitionStorage partitionStorage1 = getOrCreateMvPartition(PARTITION_ID_1);

    var testData0 = binaryRow(new TestKey(1, "0"), new TestValue(10, "10"));

    UUID txId = UUID.randomUUID();

    RowId rowId0 = new RowId(PARTITION_ID_0);

    partitionStorage0.runConsistently(locker -> {
        locker.lock(rowId0);

        return partitionStorage0.addWrite(rowId0, testData0, txId, COMMIT_TABLE_ID, 0);
    });

    // The write intent is visible in its own partition...
    assertThat(unwrap(partitionStorage0.read(rowId0, HybridTimestamp.MAX_VALUE)), is(equalTo(unwrap(testData0))));
    // ...while reading a row ID that belongs to a foreign partition is rejected.
    assertThrows(IllegalArgumentException.class, () -> partitionStorage1.read(rowId0, HybridTimestamp.MAX_VALUE));

    var testData1 = binaryRow(new TestKey(2, "2"), new TestValue(20, "20"));

    RowId rowId1 = new RowId(PARTITION_ID_1);

    partitionStorage1.runConsistently(locker -> {
        locker.lock(rowId1);

        return partitionStorage1.addWrite(rowId1, testData1, txId, COMMIT_TABLE_ID, 0);
    });

    // Symmetric check for the second partition.
    assertThrows(IllegalArgumentException.class, () -> partitionStorage0.read(rowId1, HybridTimestamp.MAX_VALUE));
    assertThat(unwrap(partitionStorage1.read(rowId1, HybridTimestamp.MAX_VALUE)), is(equalTo(unwrap(testData1))));

    // Full scans must see only the rows of their own partition.
    assertThat(drainToList(partitionStorage0.scan(HybridTimestamp.MAX_VALUE)), contains(unwrap(testData0)));
    assertThat(drainToList(partitionStorage1.scan(HybridTimestamp.MAX_VALUE)), contains(unwrap(testData1)));
}
/**
 * Tests the {@link MvTableStorage#getOrCreateIndex} method.
 */
@Test
public void testCreateIndex() {
    // Indexes cannot be created before their partition exists.
    assertThrows(StorageException.class, () -> tableStorage.getOrCreateIndex(PARTITION_ID, sortedIdx));
    assertThrows(StorageException.class, () -> tableStorage.getOrCreateIndex(PARTITION_ID, hashIdx));

    tableStorage.createMvPartition(PARTITION_ID);

    // Once the partition exists, each descriptor yields a storage of the matching kind.
    var sortedStorage = tableStorage.getOrCreateIndex(PARTITION_ID, sortedIdx);
    var hashStorage = tableStorage.getOrCreateIndex(PARTITION_ID, hashIdx);

    assertThat(sortedStorage, is(instanceOf(SortedIndexStorage.class)));
    assertThat(hashStorage, is(instanceOf(HashIndexStorage.class)));

    // An unknown descriptor type must be rejected.
    assertThrows(StorageException.class, () -> tableStorage.getOrCreateIndex(PARTITION_ID, mock(StorageIndexDescriptor.class)));
}
/**
 * Test creating a Sorted Index.
 */
@Test
public void testCreateSortedIndex() {
    // The partition must exist before its sorted index can be created.
    assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx));

    tableStorage.createMvPartition(PARTITION_ID);

    SortedIndexStorage storage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx);

    assertThat(storage, is(notNullValue()));
}
/**
 * Test creating a Hash Index.
 */
@Test
public void testCreateHashIndex() {
    // The partition must exist before its hash index can be created.
    assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx));

    tableStorage.createMvPartition(PARTITION_ID);

    HashIndexStorage storage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);

    assertThat(storage, is(notNullValue()));
}
/**
 * Tests destroying an index.
 *
 * @param waitForDestroyFuture whether the destroy futures are awaited before the destroyed-state checks
 *         (both orders must behave identically).
 */
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testDestroyIndex(boolean waitForDestroyFuture) throws Exception {
    MvPartitionStorage partitionStorage = getOrCreateMvPartition(PARTITION_ID);

    SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx);
    assertThat(sortedIndexStorage, is(notNullValue()));

    HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);
    assertThat(hashIndexStorage, is(notNullValue()));

    CompletableFuture<Void> destroySortedIndexFuture = tableStorage.destroyIndex(sortedIdx.id());
    CompletableFuture<Void> destroyHashIndexFuture = tableStorage.destroyIndex(hashIdx.id());

    Runnable waitForDestroy = () -> {
        // Flush first so the destruction is persisted before the futures are awaited.
        assertThat(partitionStorage.flush(), willCompleteSuccessfully());
        assertThat(destroySortedIndexFuture, willCompleteSuccessfully());
        assertThat(destroyHashIndexFuture, willCompleteSuccessfully());
    };

    if (waitForDestroyFuture) {
        waitForDestroy.run();
    }

    // Destroyed index storages must reject all operations, whether or not the futures completed yet.
    checkStorageDestroyed(sortedIndexStorage);
    checkStorageDestroyed(hashIndexStorage);

    // Make sure the destroy finishes before we recreate the storage.
    if (!waitForDestroyFuture) {
        waitForDestroy.run();
    }

    // After a restart the destroyed indexes must not reappear.
    tableStorage.close();

    tableStorage = createMvTableStorage();

    getOrCreateMvPartition(PARTITION_ID);

    assertThat(tableStorage.getIndex(PARTITION_ID, sortedIdx.id()), is(nullValue()));
    assertThat(tableStorage.getIndex(PARTITION_ID, hashIdx.id()), is(nullValue()));
}
/**
 * Tests that an attempt to destroy an index in a table storage that is already destroyed does not
 * cause an exception.
 *
 * @param waitForDestroyFuture whether to await the table destruction before destroying the indexes.
 */
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void indexDestructionDoesNotFailIfTableStorageIsDestroyed(boolean waitForDestroyFuture) throws Exception {
    MvPartitionStorage partitionStorage = getOrCreateMvPartition(PARTITION_ID);

    SortedIndexStorage sortedIndexStorage = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx);
    assertThat(sortedIndexStorage, is(notNullValue()));

    HashIndexStorage hashIndexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);
    assertThat(hashIndexStorage, is(notNullValue()));

    assertThat(partitionStorage.flush(), willCompleteSuccessfully());

    CompletableFuture<Void> destroyTableStorageFuture = tableStorage.destroy();

    if (waitForDestroyFuture) {
        assertThat(destroyTableStorageFuture, willCompleteSuccessfully());
    }

    // Index destruction on a destroyed (possibly still being destroyed) table must complete cleanly.
    assertDoesNotThrow(() -> tableStorage.destroyIndex(sortedIdx.id()).get(10, SECONDS));
    assertDoesNotThrow(() -> tableStorage.destroyIndex(hashIdx.id()).get(10, SECONDS));
}
/**
 * Tests that removing one Sorted Index does not affect the data in the other.
 */
@Test
public void testDestroySortedIndexIndependence() {
    CatalogTableDescriptor catalogTableDescriptor = catalogService.table(TABLE_NAME, clock.nowLong());

    // Two sorted indexes over the same column, differing only in ID and name.
    var catalogSortedIndex1 = new CatalogSortedIndexDescriptor(
            200,
            "TEST_INDEX_1",
            catalogTableDescriptor.id(),
            false,
            AVAILABLE,
            catalogService.latestCatalogVersion(),
            List.of(new CatalogIndexColumnDescriptor("STRKEY", ASC_NULLS_LAST))
    );

    var catalogSortedIndex2 = new CatalogSortedIndexDescriptor(
            201,
            "TEST_INDEX_2",
            catalogTableDescriptor.id(),
            false,
            AVAILABLE,
            catalogService.latestCatalogVersion(),
            List.of(new CatalogIndexColumnDescriptor("STRKEY", ASC_NULLS_LAST))
    );

    var sortedIndexDescriptor1 = new StorageSortedIndexDescriptor(catalogTableDescriptor, catalogSortedIndex1);
    var sortedIndexDescriptor2 = new StorageSortedIndexDescriptor(catalogTableDescriptor, catalogSortedIndex2);

    MvPartitionStorage partitionStorage = getOrCreateMvPartition(PARTITION_ID);

    SortedIndexStorage sortedIndexStorage1 = getOrCreateIndex(PARTITION_ID, sortedIndexDescriptor1);
    SortedIndexStorage sortedIndexStorage2 = getOrCreateIndex(PARTITION_ID, sortedIndexDescriptor2);

    List<TestRow> rows = List.of(
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, "0"), new TestValue(0, "0"))),
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1, "1"), new TestValue(1, "1")))
    );

    // Both indexes are filled with the same rows.
    fillStorages(partitionStorage, null, sortedIndexStorage1, rows);
    fillStorages(partitionStorage, null, sortedIndexStorage2, rows);

    checkForPresenceRows(null, null, sortedIndexStorage1, rows);
    checkForPresenceRows(null, null, sortedIndexStorage2, rows);

    // Destroying the first index must leave the second one intact.
    assertThat(tableStorage.destroyIndex(sortedIndexDescriptor1.id()), willCompleteSuccessfully());

    assertThat(tableStorage.getIndex(PARTITION_ID, sortedIndexDescriptor1.id()), is(nullValue()));

    checkForPresenceRows(null, null, sortedIndexStorage2, rows);
}
/**
 * Tests that removing one Hash Index does not affect the data in the other.
 */
@Test
public void testDestroyHashIndexIndependence() {
    CatalogTableDescriptor catalogTableDescriptor = catalogService.table(TABLE_NAME, clock.nowLong());

    // Two hash indexes over the same column, differing only in ID and name.
    var catalogHashIndex1 = new CatalogHashIndexDescriptor(
            200,
            "TEST_INDEX_1",
            catalogTableDescriptor.id(),
            true,
            AVAILABLE,
            catalogService.latestCatalogVersion(),
            List.of("STRKEY")
    );

    var catalogHashIndex2 = new CatalogHashIndexDescriptor(
            201,
            "TEST_INDEX_2",
            catalogTableDescriptor.id(),
            true,
            AVAILABLE,
            catalogService.latestCatalogVersion(),
            List.of("STRKEY")
    );

    var hashIndexDescriptor1 = new StorageHashIndexDescriptor(catalogTableDescriptor, catalogHashIndex1);
    var hashIndexDescriptor2 = new StorageHashIndexDescriptor(catalogTableDescriptor, catalogHashIndex2);

    MvPartitionStorage partitionStorage = getOrCreateMvPartition(PARTITION_ID);

    HashIndexStorage hashIndexStorage1 = getOrCreateIndex(PARTITION_ID, hashIndexDescriptor1);
    HashIndexStorage hashIndexStorage2 = getOrCreateIndex(PARTITION_ID, hashIndexDescriptor2);

    List<TestRow> rows = List.of(
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, "0"), new TestValue(0, "0"))),
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1, "1"), new TestValue(1, "1")))
    );

    // Both indexes are filled with the same rows.
    fillStorages(partitionStorage, hashIndexStorage1, null, rows);
    fillStorages(partitionStorage, hashIndexStorage2, null, rows);

    checkForPresenceRows(null, hashIndexStorage1, null, rows);
    checkForPresenceRows(null, hashIndexStorage2, null, rows);

    // Destroying the first index must leave the second one intact.
    assertThat(tableStorage.destroyIndex(hashIndexDescriptor1.id()), willCompleteSuccessfully());

    assertThat(tableStorage.getIndex(PARTITION_ID, hashIndexDescriptor1.id()), is(nullValue()));

    checkForPresenceRows(null, hashIndexStorage2, null, rows);
}
/** Tests that the same hash index in two different partitions keeps the partitions' entries separate. */
@Test
public void testHashIndexIndependence() {
    MvPartitionStorage partitionStorage1 = getOrCreateMvPartition(PARTITION_ID);

    assertThat(tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx), is(notNullValue()));
    // The second partition does not exist yet, so creating its index must fail.
    assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID + 1, hashIdx));

    MvPartitionStorage partitionStorage2 = getOrCreateMvPartition(PARTITION_ID + 1);

    HashIndexStorage storage1 = getOrCreateIndex(PARTITION_ID, hashIdx);
    HashIndexStorage storage2 = getOrCreateIndex(PARTITION_ID + 1, hashIdx);

    assertThat(storage1, is(notNullValue()));
    assertThat(storage2, is(notNullValue()));

    var rowId1 = new RowId(PARTITION_ID);
    var rowId2 = new RowId(PARTITION_ID + 1);

    // One shared two-column INT32 key is inserted into both partitions' indexes.
    BinaryTupleSchema schema = BinaryTupleSchema.create(new Element[]{
            new Element(NativeTypes.INT32, false),
            new Element(NativeTypes.INT32, false)
    });

    ByteBuffer buffer = new BinaryTupleBuilder(schema.elementCount())
            .appendInt(1)
            .appendInt(2)
            .build();

    BinaryTuple tuple = new BinaryTuple(schema.elementCount(), buffer);

    partitionStorage1.runConsistently(locker -> {
        storage1.put(new IndexRowImpl(tuple, rowId1));

        return null;
    });

    partitionStorage2.runConsistently(locker -> {
        storage2.put(new IndexRowImpl(tuple, rowId2));

        return null;
    });

    // The same key must resolve to each partition's own row ID only.
    assertThat(getAll(storage1.get(tuple)), contains(rowId1));
    assertThat(getAll(storage2.get(tuple)), contains(rowId2));
}
/** Tests inserting an index entry and a version row whose payload exceeds the inline size. */
@Test
public void testPutIndexAndVersionRowToMemory() {
    MvPartitionStorage partitionStorage = getOrCreateMvPartition(PARTITION_ID);

    HashIndexStorage indexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);

    // Should be large enough to exceed inline size.
    byte[] bytes = new byte[100];

    new Random().nextBytes(bytes);

    // NOTE(review): decodes random bytes with the platform default charset, so the exact string length
    // varies - only its "large enough" size matters here.
    String str = new String(bytes);

    BinaryRow binaryRow = binaryRow(new TestKey(10, "foo"), new TestValue(20, str));

    var serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor());

    IndexRow indexRow = serializer.serializeRow(new Object[]{str}, new RowId(PARTITION_ID));

    partitionStorage.runConsistently(locker -> {
        indexStorage.put(indexRow);

        return null;
    });

    partitionStorage.runConsistently(locker -> {
        RowId rowId = new RowId(PARTITION_ID);

        locker.tryLock(rowId);

        partitionStorage.addWrite(rowId, binaryRow, UUID.randomUUID(), COMMIT_TABLE_ID, PARTITION_ID);

        return null;
    });
}
/** Tests inserting an index entry and a version row whose payload exceeds a memory page (fragmented storage). */
@Test
public void testPutIndexAndVersionRowToMemoryFragmented() {
    MvPartitionStorage partitionStorage = getOrCreateMvPartition(PARTITION_ID);

    HashIndexStorage indexStorage = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);

    // Should be large enough to exceed memory page size.
    byte[] bytes = new byte[16385];

    new Random().nextBytes(bytes);

    // NOTE(review): decodes random bytes with the platform default charset, so the exact string length
    // varies - only its "larger than a page" size matters here.
    String str = new String(bytes);

    BinaryRow binaryRow = binaryRow(new TestKey(10, "foo"), new TestValue(20, str));

    var serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor());

    IndexRow indexRow = serializer.serializeRow(new Object[]{str}, new RowId(PARTITION_ID));

    partitionStorage.runConsistently(locker -> {
        indexStorage.put(indexRow);

        return null;
    });

    partitionStorage.runConsistently(locker -> {
        RowId rowId = new RowId(PARTITION_ID);

        locker.tryLock(rowId);

        partitionStorage.addWrite(rowId, binaryRow, UUID.randomUUID(), COMMIT_TABLE_ID, PARTITION_ID);

        return null;
    });
}
/** Verifies that the basic operations of a destroyed index storage throw {@link StorageDestroyedException}. */
private static void checkStorageDestroyed(IndexStorage storage) {
    BinaryTuple key = mock(BinaryTuple.class);
    IndexRow row = mock(IndexRow.class);

    assertThrows(StorageDestroyedException.class, () -> storage.get(key));
    assertThrows(StorageDestroyedException.class, () -> storage.put(row));
    assertThrows(StorageDestroyedException.class, () -> storage.remove(row));
}
/** Verifies that a destroyed sorted index storage rejects both the basic operations and all scan flavors. */
@SuppressWarnings("resource")
private static void checkStorageDestroyed(SortedIndexStorage storage) {
    // Basic index operations must already fail.
    checkStorageDestroyed((IndexStorage) storage);

    // Every scan flavor must fail as well.
    assertThrows(StorageDestroyedException.class, () -> storage.scan(null, null, GREATER));
    assertThrows(StorageDestroyedException.class, () -> storage.readOnlyScan(null, null, GREATER));
    assertThrows(StorageDestroyedException.class, () -> storage.tolerantScan(null, null, GREATER));
}
/**
 * Verifies that every read/write operation of a destroyed MV partition storage throws
 * {@link StorageDestroyedException}.
 */
@SuppressWarnings({"resource", "deprecation"})
private void checkStorageDestroyed(MvPartitionStorage storage) {
    int partId = PARTITION_ID;

    assertThrows(StorageDestroyedException.class, () -> storage.runConsistently(locker -> null));

    assertThrows(StorageDestroyedException.class, storage::flush);

    assertThrows(StorageDestroyedException.class, storage::lastAppliedIndex);
    assertThrows(StorageDestroyedException.class, storage::lastAppliedTerm);
    assertThrows(StorageDestroyedException.class, storage::committedGroupConfiguration);

    RowId rowId = new RowId(partId);

    HybridTimestamp timestamp = clock.now();

    // Fix: reuse the local rowId instead of allocating another "new RowId(PARTITION_ID)",
    // keeping this check consistent with the rest of the method.
    assertThrows(StorageDestroyedException.class, () -> storage.read(rowId, timestamp));

    BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));

    assertThrows(StorageDestroyedException.class, () -> storage.addWrite(rowId, binaryRow, UUID.randomUUID(), COMMIT_TABLE_ID, partId));
    assertThrows(StorageDestroyedException.class, () -> storage.commitWrite(rowId, timestamp));
    assertThrows(StorageDestroyedException.class, () -> storage.abortWrite(rowId));
    assertThrows(StorageDestroyedException.class, () -> storage.addWriteCommitted(rowId, binaryRow, timestamp));

    assertThrows(StorageDestroyedException.class, () -> storage.scan(timestamp));
    // Fix: the scanVersions assertion was duplicated; a single check is sufficient.
    assertThrows(StorageDestroyedException.class, () -> storage.scanVersions(rowId));
    assertThrows(StorageDestroyedException.class, () -> storage.closestRowId(rowId));

    assertThrows(StorageDestroyedException.class, storage::rowsCount);
}
/** Tests that a partition re-created after destruction contains no versions from its previous incarnation. */
@Test
public void testReCreatePartition() throws Exception {
    MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);

    RowId rowId = new RowId(PARTITION_ID);

    BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));

    mvPartitionStorage.runConsistently(locker -> {
        locker.lock(rowId);

        mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());

        return null;
    });

    tableStorage.destroyPartition(PARTITION_ID).get(1, SECONDS);

    // Re-create the partition and make sure the old row left no versions behind.
    MvPartitionStorage newMvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);

    List<ReadResult> allVersions = newMvPartitionStorage.runConsistently(locker -> {
        locker.lock(rowId);

        return getAll(newMvPartitionStorage.scanVersions(rowId));
    });

    assertThat(allVersions, empty());
}
/**
 * Tests a successful rebalance cycle: rows present before the rebalance are dropped, rows written during
 * the rebalance survive, and lastApplied/group configuration come from the finish call.
 */
@Test
public void testSuccessRebalance() {
    MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
    HashIndexStorage hashIndexStorage = getOrCreateIndex(PARTITION_ID, hashIdx);
    SortedIndexStorage sortedIndexStorage = getOrCreateIndex(PARTITION_ID, sortedIdx);

    // Error because rebalance has not yet started for the partition.
    assertThrows(
            StorageRebalanceException.class,
            () -> tableStorage.finishRebalancePartition(PARTITION_ID, 100, 500, BYTE_EMPTY_ARRAY)
    );

    List<TestRow> rowsBeforeRebalanceStart = List.of(
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, "0"), new TestValue(0, "0"))),
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1, "1"), new TestValue(1, "1")))
    );

    startRebalanceWithChecks(
            PARTITION_ID,
            mvPartitionStorage,
            hashIndexStorage,
            sortedIndexStorage,
            rowsBeforeRebalanceStart
    );

    // Let's fill the storages with fresh data on rebalance.
    List<TestRow> rowsOnRebalance = List.of(
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(2, "2"), new TestValue(2, "2"))),
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(3, "3"), new TestValue(3, "3")))
    );

    fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsOnRebalance);

    // While the rebalance is running, lastApplied* report the REBALANCE_IN_PROGRESS marker.
    checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);

    assertNull(mvPartitionStorage.committedGroupConfiguration());

    // Let's finish rebalancing.

    // Partition is out of configuration range.
    assertThrows(
            IllegalArgumentException.class,
            () -> tableStorage.finishRebalancePartition(getPartitionIdOutOfRange(), 100, 500, BYTE_EMPTY_ARRAY)
    );

    // Partition does not exist.
    assertThrows(
            StorageRebalanceException.class,
            () -> tableStorage.finishRebalancePartition(1, 100, 500, BYTE_EMPTY_ARRAY)
    );

    byte[] raftGroupConfig = createRandomRaftGroupConfiguration();

    assertThat(tableStorage.finishRebalancePartition(PARTITION_ID, 10, 20, raftGroupConfig), willCompleteSuccessfully());

    completeBuiltIndexes(PARTITION_ID, hashIndexStorage, sortedIndexStorage);

    // Let's check the storages after success finish rebalance.
    checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsBeforeRebalanceStart);

    checkForPresenceRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsOnRebalance);

    // lastApplied and the group configuration must match the values passed to finishRebalancePartition.
    checkLastApplied(mvPartitionStorage, 10, 20);

    checkRaftGroupConfigs(raftGroupConfig, mvPartitionStorage.committedGroupConfiguration());
}
/**
 * Tests an aborted rebalance: all rows (both pre-existing and written during the rebalance) are dropped
 * and lastApplied/group configuration are reset.
 */
@Test
public void testFailRebalance() throws Exception {
    MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
    HashIndexStorage hashIndexStorage = getOrCreateIndex(PARTITION_ID, hashIdx);
    SortedIndexStorage sortedIndexStorage = getOrCreateIndex(PARTITION_ID, sortedIdx);

    // Nothing will happen because rebalancing has not started.
    tableStorage.abortRebalancePartition(PARTITION_ID).get(1, SECONDS);

    List<TestRow> rowsBeforeRebalanceStart = List.of(
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, "0"), new TestValue(0, "0"))),
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1, "1"), new TestValue(1, "1")))
    );

    startRebalanceWithChecks(
            PARTITION_ID,
            mvPartitionStorage,
            hashIndexStorage,
            sortedIndexStorage,
            rowsBeforeRebalanceStart
    );

    // Let's fill the storages with fresh data on rebalance.
    List<TestRow> rowsOnRebalance = List.of(
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(2, "2"), new TestValue(2, "2"))),
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(3, "3"), new TestValue(3, "3")))
    );

    fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsOnRebalance);

    checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);

    // Let's abort rebalancing.

    // Partition is out of configuration range.
    assertThrows(IllegalArgumentException.class, () -> tableStorage.abortRebalancePartition(getPartitionIdOutOfRange()));

    assertThat(tableStorage.abortRebalancePartition(PARTITION_ID), willCompleteSuccessfully());

    completeBuiltIndexes(PARTITION_ID, hashIndexStorage, sortedIndexStorage);

    // Let's check the storages after abort rebalance: nothing survives and the state is reset.
    checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsBeforeRebalanceStart);

    checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsOnRebalance);

    checkLastApplied(mvPartitionStorage, 0, 0);

    assertNull(mvPartitionStorage.committedGroupConfiguration());
}
/** Tests that starting a rebalance on a closed partition fails with {@link StorageRebalanceException}. */
@Test
public void testStartRebalanceForClosedPartition() {
    MvPartitionStorage storage = getOrCreateMvPartition(PARTITION_ID);

    storage.close();

    CompletableFuture<Void> startFuture = tableStorage.startRebalancePartition(PARTITION_ID);

    assertThat(startFuture, willThrowFast(StorageRebalanceException.class));
}
/** Verifies the sorted index storage contract while its partition is being rebalanced. */
@SuppressWarnings("resource")
private static void checkSortedIndexStorageMethodsAfterStartRebalance(SortedIndexStorage storage) {
    // Reading the descriptor stays legal during rebalance.
    assertDoesNotThrow(storage::indexDescriptor);

    // Everything that touches the data must fail.
    assertThrows(StorageRebalanceException.class, () -> storage.get(mock(BinaryTuple.class)));
    assertThrows(StorageRebalanceException.class, () -> storage.remove(mock(IndexRow.class)));
    assertThrows(StorageRebalanceException.class, () -> storage.scan(null, null, GREATER));
    assertThrows(StorageRebalanceException.class, () -> storage.readOnlyScan(null, null, GREATER));
    assertThrows(StorageRebalanceException.class, () -> storage.tolerantScan(null, null, GREATER));
}
/**
 * Tests restarting storages while the rebalance marker is set: volatile storages come back empty,
 * persistent ones keep both the data and the REBALANCE_IN_PROGRESS lastApplied marker.
 */
@Test
public void testRestartStoragesInTheMiddleOfRebalance() throws Exception {
    MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
    HashIndexStorage hashIndexStorage = getOrCreateIndex(PARTITION_ID, hashIdx);
    SortedIndexStorage sortedIndexStorage = getOrCreateIndex(PARTITION_ID, sortedIdx);

    List<TestRow> rows = List.of(
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, "0"), new TestValue(0, "0"))),
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1, "1"), new TestValue(1, "1")))
    );

    fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);

    // Since it is not possible to close storages in middle of rebalance, we will shorten path a bit by updating only lastApplied*.
    // Effectively final copy for use inside the closure below.
    MvPartitionStorage finalMvPartitionStorage = mvPartitionStorage;

    mvPartitionStorage.runConsistently(locker -> {
        finalMvPartitionStorage.lastApplied(REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);

        return null;
    });

    assertThat(mvPartitionStorage.flush(), willCompleteSuccessfully());

    // Restart storages.
    tableStorage.close();

    tableStorage = createMvTableStorage();

    mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
    hashIndexStorage = getOrCreateIndex(PARTITION_ID, hashIdx);
    sortedIndexStorage = getOrCreateIndex(PARTITION_ID, sortedIdx);

    if (tableStorage.isVolatile()) {
        // Let's check the repositories: they should be empty.
        checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);

        checkLastApplied(mvPartitionStorage, 0, 0);
    } else {
        // Persistent storage keeps the data and the in-progress marker across restarts.
        checkForPresenceRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);

        checkLastApplied(mvPartitionStorage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
    }
}
/** Tests partition clearing: argument validation, clearing an empty partition and clearing a filled one. */
@Test
void testClear() {
    assertThrows(IllegalArgumentException.class, () -> tableStorage.clearPartition(getPartitionIdOutOfRange()));

    // Let's check that there will be an error for a non-existent partition.
    assertThrows(StorageException.class, () -> tableStorage.clearPartition(PARTITION_ID));

    MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
    HashIndexStorage hashIndexStorage = getOrCreateIndex(PARTITION_ID, hashIdx);
    SortedIndexStorage sortedIndexStorage = getOrCreateIndex(PARTITION_ID, sortedIdx);

    // Let's check the cleanup for an empty partition.
    assertThat(tableStorage.clearPartition(PARTITION_ID), willCompleteSuccessfully());

    checkLastApplied(mvPartitionStorage, 0, 0);
    assertNull(mvPartitionStorage.committedGroupConfiguration());

    // Let's fill the storages and clean them.
    List<TestRow> rows = List.of(
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, "0"), new TestValue(0, "0"))),
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1, "1"), new TestValue(1, "1")))
    );

    byte[] raftGroupConfig = createRandomRaftGroupConfiguration();

    fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);

    mvPartitionStorage.runConsistently(locker -> {
        mvPartitionStorage.lastApplied(100, 500);

        mvPartitionStorage.committedGroupConfiguration(raftGroupConfig);

        return null;
    });

    // Let's clear the storages and check them out: data, lastApplied and the configuration must be reset.
    assertThat(tableStorage.clearPartition(PARTITION_ID), willCompleteSuccessfully());

    checkLastApplied(mvPartitionStorage, 0, 0);
    assertNull(mvPartitionStorage.committedGroupConfiguration());

    completeBuiltIndexes(PARTITION_ID, hashIndexStorage, sortedIndexStorage);

    checkForMissingRows(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rows);
}
@Test
void testClearForCanceledOrRebalancedPartition() {
    int rebalancingPartitionId = PARTITION_ID + 1;

    MvPartitionStorage closedPartition = getOrCreateMvPartition(PARTITION_ID);
    getOrCreateMvPartition(rebalancingPartitionId);

    closedPartition.close();
    assertThat(tableStorage.startRebalancePartition(rebalancingPartitionId), willCompleteSuccessfully());

    try {
        // A closed partition cannot be cleared...
        assertThat(tableStorage.clearPartition(PARTITION_ID), willThrowFast(StorageClosedException.class));
        // ...and neither can one that is mid-rebalance.
        assertThat(tableStorage.clearPartition(rebalancingPartitionId), willThrowFast(StorageRebalanceException.class));
    } finally {
        // Always roll the rebalance back so other tests see a clean state.
        assertThat(tableStorage.abortRebalancePartition(rebalancingPartitionId), willCompleteSuccessfully());
    }
}
@Test
void testCloseStartedRebalance() {
    MvPartitionStorage partition = getOrCreateMvPartition(PARTITION_ID);

    assertThat(tableStorage.startRebalancePartition(PARTITION_ID), willCompleteSuccessfully());

    // Closing a storage in the middle of a rebalance must not throw.
    assertDoesNotThrow(partition::close);
}
@Test
void testDestroyStartedRebalance() {
    getOrCreateMvPartition(PARTITION_ID);
    assertThat(tableStorage.startRebalancePartition(PARTITION_ID), willCompleteSuccessfully());
    // Destroying a partition while its rebalance is in progress must complete cleanly.
    assertThat(tableStorage.destroyPartition(PARTITION_ID), willCompleteSuccessfully());
}
@Test
void testNextRowIdToBuildAfterRestart() throws Exception {
    MvPartitionStorage partition = getOrCreateMvPartition(PARTITION_ID);
    IndexStorage hashIndex = tableStorage.getOrCreateIndex(PARTITION_ID, hashIdx);
    IndexStorage sortedIndex = tableStorage.getOrCreateIndex(PARTITION_ID, sortedIdx);
    IndexStorage pkIndex = tableStorage.getOrCreateIndex(PARTITION_ID, pkIdx);

    RowId hashRowId = new RowId(PARTITION_ID);
    RowId sortedRowId = new RowId(PARTITION_ID);
    RowId pkRowId = new RowId(PARTITION_ID);

    // Record distinct build progress in each index and flush it to disk.
    partition.runConsistently(locker -> {
        hashIndex.setNextRowIdToBuild(hashRowId);
        sortedIndex.setNextRowIdToBuild(sortedRowId);
        pkIndex.setNextRowIdToBuild(pkRowId);

        return null;
    });

    assertThat(partition.flush(), willCompleteSuccessfully());

    // Restart storages.
    tableStorage.close();
    tableStorage = createMvTableStorage();
    getOrCreateMvPartition(PARTITION_ID);

    IndexStorage hashIndexAfterRestart = tableStorage.getOrCreateIndex(PARTITION_ID, hashIdx);
    IndexStorage sortedIndexAfterRestart = tableStorage.getOrCreateIndex(PARTITION_ID, sortedIdx);
    IndexStorage pkIndexAfterRestart = tableStorage.getOrCreateIndex(PARTITION_ID, pkIdx);

    if (tableStorage.isVolatile()) {
        // Volatile storages lose their state on restart, so build progress is reset.
        assertThat(hashIndexAfterRestart.getNextRowIdToBuild(), equalTo(INITIAL_ROW_ID_TO_BUILD));
        assertThat(sortedIndexAfterRestart.getNextRowIdToBuild(), equalTo(INITIAL_ROW_ID_TO_BUILD));
        assertThat(pkIndexAfterRestart.getNextRowIdToBuild(), nullValue());
    } else {
        // Persistent storages must restore the exact build progress that was flushed.
        assertThat(hashIndexAfterRestart.getNextRowIdToBuild(), equalTo(hashRowId));
        assertThat(sortedIndexAfterRestart.getNextRowIdToBuild(), equalTo(sortedRowId));
        assertThat(pkIndexAfterRestart.getNextRowIdToBuild(), equalTo(pkRowId));
    }
}
@Test
void testNextRowIdToBuildAfterRebalance() throws Exception {
    // A full rebalance cycle (start + finish) must reset the indexes' build progress.
    testNextRowIdToBuildAfterOperation(() -> {
        assertThat(tableStorage.startRebalancePartition(PARTITION_ID), willCompleteSuccessfully());
        assertThat(tableStorage.finishRebalancePartition(PARTITION_ID, 100, 100, BYTE_EMPTY_ARRAY), willCompleteSuccessfully());
    });
}
@Test
void testNextRowIdToBuildAfterClearPartition() throws Exception {
    // Clearing a partition must reset the indexes' build progress.
    testNextRowIdToBuildAfterOperation(() -> assertThat(tableStorage.clearPartition(PARTITION_ID), willCompleteSuccessfully()));
}
@Test
void testNextRowIdToBuildAfterDestroyPartition() throws Exception {
    // Destroying and recreating a partition must reset the indexes' build progress.
    testNextRowIdToBuildAfterOperation(() -> {
        assertThat(tableStorage.destroyPartition(PARTITION_ID), willCompleteSuccessfully());
        // Recreate the partition so the shared checks in testNextRowIdToBuildAfterOperation can run.
        getOrCreateMvPartition(PARTITION_ID);
    });
}
@Test
void testNextRowIdToBuildAfterDestroyTable() throws Exception {
    // Destroying the whole table storage must reset the indexes' build progress.
    testNextRowIdToBuildAfterOperation(() -> {
        assertThat(tableStorage.destroy(), willCompleteSuccessfully());
        // Recreate the table storage and the partition so the shared checks can run.
        tableStorage.close();
        tableStorage = createMvTableStorage();
        getOrCreateMvPartition(PARTITION_ID);
    });
}
@Test
public void testIndexDestructionOnRecovery() throws Exception {
    assumeFalse(tableStorage.isVolatile(), "Volatile storages do not support index recovery");

    MvPartitionStorage partition = getOrCreateMvPartition(PARTITION_ID);
    HashIndexStorage hashIndex = tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx);
    SortedIndexStorage sortedIndex = tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx);

    List<TestRow> testRows = List.of(
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, "0"), new TestValue(0, "0"))),
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1, "1"), new TestValue(1, "1")))
    );

    fillStorages(partition, hashIndex, sortedIndex, testRows);

    assertThat(partition.flush(), willCompleteSuccessfully());

    // Restart storages.
    tableStorage.close();

    // Emulate a situation when indexes have been removed from the catalog. We then expect them to be removed upon startup.
    when(catalogService.index(eq(hashIdx.id()), anyInt())).thenReturn(null);
    when(catalogService.index(eq(sortedIdx.id()), anyInt())).thenReturn(null);

    tableStorage = createMvTableStorage();
    partition = getOrCreateMvPartition(PARTITION_ID);
    hashIndex = (HashIndexStorage) tableStorage.getIndex(PARTITION_ID, hashIdx.id());
    sortedIndex = (SortedIndexStorage) tableStorage.getIndex(PARTITION_ID, sortedIdx.id());

    // Data should remain in the partition storages, but the indexes must be cleaned up.
    checkForPresenceRows(partition, null, null, testRows);
    assertThat(hashIndex, is(nullValue()));
    assertThat(sortedIndex, is(nullValue()));
}
/** Registers the test table and its three indexes (sorted, hash, PK) in the mocked catalog service. */
private static void createTestTableAndIndexes(CatalogService catalogService) {
    // Distinct, stable IDs for every catalog object used by the tests.
    int schemaId = 0;
    int tableId = 1;
    int zoneId = 2;
    int sortedIndexId = 3;
    int hashIndexId = 4;
    int pkIndexId = 5;

    String pkColumnName = "INTKEY";

    CatalogTableDescriptor table = new CatalogTableDescriptor(
            tableId,
            schemaId,
            pkIndexId,
            TABLE_NAME,
            zoneId,
            List.of(
                    CatalogUtils.fromParams(ColumnParams.builder().name(pkColumnName).type(INT32).build()),
                    CatalogUtils.fromParams(ColumnParams.builder().name("STRKEY").length(100).type(STRING).build()),
                    CatalogUtils.fromParams(ColumnParams.builder().name("INTVAL").type(INT32).build()),
                    CatalogUtils.fromParams(ColumnParams.builder().name("STRVAL").length(100).type(STRING).build())
            ),
            List.of(pkColumnName),
            null,
            DEFAULT_STORAGE_PROFILE
    );

    CatalogSortedIndexDescriptor sortedIndex = new CatalogSortedIndexDescriptor(
            sortedIndexId,
            SORTED_INDEX_NAME,
            tableId,
            false,
            AVAILABLE,
            catalogService.latestCatalogVersion(),
            List.of(new CatalogIndexColumnDescriptor("STRKEY", ASC_NULLS_LAST))
    );

    CatalogHashIndexDescriptor hashIndex = new CatalogHashIndexDescriptor(
            hashIndexId,
            HASH_INDEX_NAME,
            tableId,
            true,
            AVAILABLE,
            catalogService.latestCatalogVersion(),
            List.of("STRKEY")
    );

    CatalogIndexDescriptor pkIndex = new CatalogHashIndexDescriptor(
            pkIndexId,
            PK_INDEX_NAME,
            tableId,
            true,
            AVAILABLE,
            catalogService.latestCatalogVersion(),
            List.of(pkColumnName)
    );

    // Wire the mock so lookups by name and by ID both resolve to the descriptors above.
    when(catalogService.table(eq(TABLE_NAME), anyLong())).thenReturn(table);
    when(catalogService.aliveIndex(eq(SORTED_INDEX_NAME), anyLong())).thenReturn(sortedIndex);
    when(catalogService.aliveIndex(eq(HASH_INDEX_NAME), anyLong())).thenReturn(hashIndex);
    when(catalogService.aliveIndex(eq(PK_INDEX_NAME), anyLong())).thenReturn(pkIndex);

    when(catalogService.table(eq(tableId), anyInt())).thenReturn(table);
    when(catalogService.index(eq(sortedIndexId), anyInt())).thenReturn(sortedIndex);
    when(catalogService.index(eq(hashIndexId), anyInt())).thenReturn(hashIndex);
    when(catalogService.index(eq(pkIndexId), anyInt())).thenReturn(pkIndex);
}
/** Drains the cursor into a list, always closing it. */
private static <T> List<T> getAll(Cursor<T> cursor) {
    try (Cursor<T> c = cursor) {
        return c.stream().collect(toList());
    }
}
/** Verifies the contract of a cursor that was opened before a rebalance started. */
private static void checkCursorAfterStartRebalance(Cursor<?> cursor) {
    // Closing a pre-rebalance cursor must be safe...
    assertDoesNotThrow(cursor::close);
    // ...but reading from it must fail while the partition is being rebuilt.
    assertThrows(StorageRebalanceException.class, cursor::hasNext);
    assertThrows(StorageRebalanceException.class, cursor::next);
    if (cursor instanceof PeekCursor) {
        assertThrows(StorageRebalanceException.class, ((PeekCursor<?>) cursor)::peek);
    }
}
/**
 * Writes every given row into the partition storage and into the non-null index storages.
 *
 * <p>Even-indexed rows go through the write-intent + commit path, odd-indexed rows are added as
 * already-committed writes, so both write paths get coverage.
 */
private static void fillStorages(
        MvPartitionStorage mvPartitionStorage,
        @Nullable HashIndexStorage hashIndexStorage,
        @Nullable SortedIndexStorage sortedIndexStorage,
        List<TestRow> rows
) {
    // At least two rows are needed so that both write paths are exercised.
    assertThat(rows, hasSize(greaterThanOrEqualTo(2)));

    for (int i = 0; i < rows.size(); i++) {
        TestRow testRow = rows.get(i);

        RowId rowId = testRow.rowId;
        BinaryRow binaryRow = testRow.row;
        HybridTimestamp timestamp = testRow.timestamp;

        assertNotNull(rowId);
        assertNotNull(binaryRow);
        assertNotNull(timestamp);

        boolean useWriteIntent = (i % 2) == 0;

        mvPartitionStorage.runConsistently(locker -> {
            locker.lock(rowId);

            if (useWriteIntent) {
                // Write intent followed by an explicit commit.
                mvPartitionStorage.addWrite(rowId, binaryRow, UUID.randomUUID(), COMMIT_TABLE_ID, rowId.partitionId());
                mvPartitionStorage.commitWrite(rowId, timestamp);
            } else {
                // Directly committed write.
                mvPartitionStorage.addWriteCommitted(rowId, binaryRow, timestamp);
            }

            if (hashIndexStorage != null) {
                hashIndexStorage.put(indexRow(hashIndexStorage.indexDescriptor(), binaryRow, rowId));
            }

            if (sortedIndexStorage != null) {
                sortedIndexStorage.put(indexRow(sortedIndexStorage.indexDescriptor(), binaryRow, rowId));
            }

            return null;
        });
    }
}
/** Asserts that none of the given rows are present in the non-null storages. */
private static void checkForMissingRows(
        @Nullable MvPartitionStorage mvPartitionStorage,
        @Nullable HashIndexStorage hashIndexStorage,
        @Nullable SortedIndexStorage sortedIndexStorage,
        List<TestRow> rows
) {
    for (TestRow testRow : rows) {
        if (mvPartitionStorage != null) {
            List<ReadResult> versions = mvPartitionStorage.runConsistently(locker -> {
                locker.lock(testRow.rowId);

                return getAll(mvPartitionStorage.scanVersions(testRow.rowId));
            });

            assertThat(versions, is(empty()));
        }

        if (hashIndexStorage != null) {
            IndexRow indexRow = indexRow(hashIndexStorage.indexDescriptor(), testRow.row, testRow.rowId);

            assertThat(getAll(hashIndexStorage.get(indexRow.indexColumns())), is(empty()));
        }

        if (sortedIndexStorage != null) {
            IndexRow indexRow = indexRow(sortedIndexStorage.indexDescriptor(), testRow.row, testRow.rowId);

            assertThat(getAll(sortedIndexStorage.get(indexRow.indexColumns())), is(empty()));
        }
    }
}
/** Returns a partition ID that is guaranteed to be out of range for this table. */
private int getPartitionIdOutOfRange() {
    // Valid partition IDs are [0, partitions), so the partition count itself is out of range.
    int partitionCount = tableStorage.getTableDescriptor().getPartitions();

    return partitionCount;
}
/** Asserts that every given row is present in the non-null storages, with exactly one version. */
private static void checkForPresenceRows(
        @Nullable MvPartitionStorage mvPartitionStorage,
        @Nullable HashIndexStorage hashIndexStorage,
        @Nullable SortedIndexStorage sortedIndexStorage,
        List<TestRow> rows
) {
    for (TestRow testRow : rows) {
        if (mvPartitionStorage != null) {
            List<BinaryRow> versions = mvPartitionStorage.runConsistently(locker -> {
                locker.lock(testRow.rowId);

                return toListOfBinaryRows(mvPartitionStorage.scanVersions(testRow.rowId));
            });

            // Exactly one version is expected: the row that fillStorages inserted.
            assertThat(versions, contains(equalToRow(testRow.row)));
        }

        if (hashIndexStorage != null) {
            IndexRow indexRow = indexRow(hashIndexStorage.indexDescriptor(), testRow.row, testRow.rowId);

            assertThat(getAll(hashIndexStorage.get(indexRow.indexColumns())), contains(testRow.rowId));
        }

        if (sortedIndexStorage != null) {
            IndexRow indexRow = indexRow(sortedIndexStorage.indexDescriptor(), testRow.row, testRow.rowId);

            assertThat(getAll(sortedIndexStorage.get(indexRow.indexColumns())), contains(testRow.rowId));
        }
    }
}
/**
 * Initializes the internal structures needed for tests.
 *
 * <p>This method *MUST* always be called in either subclass' constructor or setUp method.
 */
protected final void initialize() {
    createTestTableAndIndexes(catalogService);

    this.tableStorage = createMvTableStorage();

    CatalogTableDescriptor tableDescriptor = catalogService.table(TABLE_NAME, clock.nowLong());
    assertNotNull(tableDescriptor);

    // Resolve the catalog descriptors registered by createTestTableAndIndexes.
    CatalogIndexDescriptor sortedIndexDescriptor = catalogService.aliveIndex(SORTED_INDEX_NAME, clock.nowLong());
    CatalogIndexDescriptor hashIndexDescriptor = catalogService.aliveIndex(HASH_INDEX_NAME, clock.nowLong());
    CatalogIndexDescriptor pkIndexDescriptor = catalogService.aliveIndex(PK_INDEX_NAME, clock.nowLong());

    assertNotNull(sortedIndexDescriptor);
    assertNotNull(hashIndexDescriptor);
    assertNotNull(pkIndexDescriptor);

    // Build the storage-level descriptors the tests use everywhere.
    sortedIdx = new StorageSortedIndexDescriptor(tableDescriptor, (CatalogSortedIndexDescriptor) sortedIndexDescriptor);
    hashIdx = new StorageHashIndexDescriptor(tableDescriptor, (CatalogHashIndexDescriptor) hashIndexDescriptor);
    pkIdx = StorageIndexDescriptor.create(tableDescriptor, pkIndexDescriptor);
}
/** Verifies that a hash index rejects data access while its partition is being rebalanced. */
private static void checkHashIndexStorageMethodsAfterStartRebalance(HashIndexStorage storage) {
    // The descriptor stays readable during rebalance...
    assertDoesNotThrow(storage::indexDescriptor);
    // ...but reads and removals must fail.
    assertThrows(StorageRebalanceException.class, () -> storage.get(mock(BinaryTuple.class)));
    assertThrows(StorageRebalanceException.class, () -> storage.remove(mock(IndexRow.class)));
}
@SuppressWarnings("resource")
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testDestroyPartition(boolean waitForDestroyFuture) {
    // Out-of-range partition IDs must be rejected.
    assertThrows(IllegalArgumentException.class, () -> tableStorage.destroyPartition(getPartitionIdOutOfRange()));

    MvPartitionStorage mvPartitionStorage = getOrCreateMvPartition(PARTITION_ID);
    HashIndexStorage hashIndexStorage = getOrCreateIndex(PARTITION_ID, hashIdx);
    SortedIndexStorage sortedIndexStorage = getOrCreateIndex(PARTITION_ID, sortedIdx);

    RowId rowId = new RowId(PARTITION_ID);

    BinaryRow binaryRow = binaryRow(new TestKey(0, "0"), new TestValue(1, "1"));

    IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(), binaryRow, rowId);
    IndexRow sortedIndexRow = indexRow(sortedIndexStorage.indexDescriptor(), binaryRow, rowId);

    // Write one committed row into the partition and both indexes.
    mvPartitionStorage.runConsistently(locker -> {
        locker.lock(rowId);

        mvPartitionStorage.addWriteCommitted(rowId, binaryRow, clock.now());

        hashIndexStorage.put(hashIndexRow);

        sortedIndexStorage.put(sortedIndexRow);

        return null;
    });

    // Open cursors of every flavor before destruction so we can verify they are invalidated.
    PartitionTimestampCursor scanAtTimestampCursor = mvPartitionStorage.scan(clock.now());
    PartitionTimestampCursor scanLatestCursor = mvPartitionStorage.scan(HybridTimestamp.MAX_VALUE);

    Cursor<ReadResult> scanVersionsCursor = mvPartitionStorage.runConsistently(locker -> {
        locker.lock(rowId);

        return mvPartitionStorage.scanVersions(rowId);
    });

    Cursor<RowId> getFromHashIndexCursor = hashIndexStorage.get(hashIndexRow.indexColumns());

    // Probe the sorted index with its own index row (was mistakenly using hashIndexRow,
    // inconsistent with testDestroyTableStorage).
    Cursor<RowId> getFromSortedIndexCursor = sortedIndexStorage.get(sortedIndexRow.indexColumns());
    Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndexStorage.scan(null, null, GREATER);
    Cursor<IndexRow> readOnlyScanFromSortedIndexCursor = sortedIndexStorage.readOnlyScan(null, null, GREATER);
    Cursor<IndexRow> tolerantScanFromSortedIndexCursor = sortedIndexStorage.tolerantScan(null, null, GREATER);

    CompletableFuture<Void> destroyFuture = tableStorage.destroyPartition(PARTITION_ID);

    if (waitForDestroyFuture) {
        assertThat(destroyFuture, willCompleteSuccessfully());
    }

    // Let's check that we won't get destroyed storages.
    assertNull(tableStorage.getMvPartition(PARTITION_ID));
    assertThrows(StorageException.class, () -> tableStorage.getOrCreateHashIndex(PARTITION_ID, hashIdx));
    assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex(PARTITION_ID, sortedIdx));

    checkStorageDestroyed(mvPartitionStorage);
    checkStorageDestroyed(hashIndexStorage);
    checkStorageDestroyed(sortedIndexStorage);

    // Cursors opened before the destruction must fail once the partition is gone.
    assertThrows(StorageDestroyedException.class, () -> getAll(scanAtTimestampCursor));
    assertThrows(StorageDestroyedException.class, () -> getAll(scanLatestCursor));
    assertThrows(StorageDestroyedException.class, () -> getAll(scanVersionsCursor));
    assertThrows(StorageDestroyedException.class, () -> getAll(getFromHashIndexCursor));
    assertThrows(StorageDestroyedException.class, () -> getAll(getFromSortedIndexCursor));
    assertThrows(StorageDestroyedException.class, () -> getAll(scanFromSortedIndexCursor));
    assertThrows(StorageDestroyedException.class, () -> getAll(readOnlyScanFromSortedIndexCursor));
    assertThrows(StorageDestroyedException.class, () -> getAll(tolerantScanFromSortedIndexCursor));

    // What happens if there is no partition?
    assertThrows(StorageException.class, () -> tableStorage.destroyPartition(PARTITION_ID));
}
@SuppressWarnings("resource")
@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testDestroyTableStorage(boolean waitForDestroyFuture) {
    MvPartitionStorage partition = getOrCreateMvPartition(PARTITION_ID);
    HashIndexStorage hashIndex = getOrCreateIndex(PARTITION_ID, hashIdx);
    SortedIndexStorage sortedIndex = getOrCreateIndex(PARTITION_ID, sortedIdx);

    List<TestRow> testRows = List.of(
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(0, "0"), new TestValue(0, "0"))),
            new TestRow(new RowId(PARTITION_ID), binaryRow(new TestKey(1, "1"), new TestValue(1, "1")))
    );

    fillStorages(partition, hashIndex, sortedIndex, testRows);

    // Open cursors of every flavor before destruction so we can verify they are invalidated.
    PartitionTimestampCursor scanTimestampCursor = partition.scan(clock.now());

    IndexRow hashIndexRow = indexRow(hashIndex.indexDescriptor(), testRows.get(0).row, testRows.get(0).rowId);
    IndexRow sortedIndexRow = indexRow(sortedIndex.indexDescriptor(), testRows.get(0).row, testRows.get(0).rowId);

    Cursor<RowId> getFromHashIndexCursor = hashIndex.get(hashIndexRow.indexColumns());

    Cursor<RowId> getFromSortedIndexCursor = sortedIndex.get(sortedIndexRow.indexColumns());
    Cursor<IndexRow> scanFromSortedIndexCursor = sortedIndex.scan(null, null, GREATER);
    Cursor<IndexRow> readOnlyScanFromSortedIndexCursor = sortedIndex.readOnlyScan(null, null, GREATER);
    Cursor<IndexRow> tolerantScanFromSortedIndexCursor = sortedIndex.tolerantScan(null, null, GREATER);

    CompletableFuture<Void> destroyFuture = tableStorage.destroy();

    Runnable waitForDestroy = () -> assertThat(destroyFuture, willCompleteSuccessfully());

    if (waitForDestroyFuture) {
        waitForDestroy.run();
    }

    checkStorageDestroyed(partition);
    checkStorageDestroyed(hashIndex);
    checkStorageDestroyed(sortedIndex);

    assertThrows(StorageDestroyedException.class, () -> getAll(scanTimestampCursor));

    assertThrows(StorageDestroyedException.class, () -> getAll(getFromHashIndexCursor));

    assertThrows(StorageDestroyedException.class, () -> getAll(getFromSortedIndexCursor));
    assertThrows(StorageDestroyedException.class, () -> getAll(scanFromSortedIndexCursor));
    assertThrows(StorageDestroyedException.class, () -> getAll(readOnlyScanFromSortedIndexCursor));
    assertThrows(StorageDestroyedException.class, () -> getAll(tolerantScanFromSortedIndexCursor));

    // Let's check that nothing will happen if we try to destroy it again.
    assertThat(tableStorage.destroy(), willCompleteSuccessfully());

    // Make sure the destroy finishes before we recreate the storage.
    if (!waitForDestroyFuture) {
        waitForDestroy.run();
    }

    // Let's check that after restarting the table we will have an empty partition.
    tableStorage = createMvTableStorage();

    partition = getOrCreateMvPartition(PARTITION_ID);
    hashIndex = getOrCreateIndex(PARTITION_ID, hashIdx);
    sortedIndex = getOrCreateIndex(PARTITION_ID, sortedIdx);

    checkForMissingRows(partition, hashIndex, sortedIndex, testRows);
}
/**
 * Fills the storages, opens cursors of every flavor, starts a rebalance of {@code partitionId},
 * and verifies that the storages and the pre-opened cursors behave per the rebalance contract.
 */
private void startRebalanceWithChecks(
        int partitionId,
        MvPartitionStorage mvPartitionStorage,
        HashIndexStorage hashIndexStorage,
        SortedIndexStorage sortedIndexStorage,
        List<TestRow> rowsBeforeRebalanceStart
) {
    fillStorages(mvPartitionStorage, hashIndexStorage, sortedIndexStorage, rowsBeforeRebalanceStart);

    // Open cursors of every flavor before the rebalance starts.
    TestRow probeRow = rowsBeforeRebalanceStart.get(0);

    Cursor<?> mvPartitionStorageScanCursor = mvPartitionStorage.scan(probeRow.timestamp);

    IndexRow hashIndexRow = indexRow(hashIndexStorage.indexDescriptor(), probeRow.row, probeRow.rowId);
    IndexRow sortedIndexRow = indexRow(sortedIndexStorage.indexDescriptor(), probeRow.row, probeRow.rowId);

    Cursor<?> hashIndexStorageGetCursor = hashIndexStorage.get(hashIndexRow.indexColumns());

    Cursor<?> sortedIndexStorageGetCursor = sortedIndexStorage.get(sortedIndexRow.indexColumns());
    Cursor<?> sortedIndexStorageScanCursor = sortedIndexStorage.scan(null, null, GREATER);
    Cursor<?> sortedIndexStorageReadOnlyScanCursor = sortedIndexStorage.readOnlyScan(null, null, GREATER);
    Cursor<?> sortedIndexStorageTolerantScanCursor = sortedIndexStorage.tolerantScan(null, null, GREATER);

    // Partition is out of configuration range.
    assertThrows(IllegalArgumentException.class, () -> tableStorage.startRebalancePartition(getPartitionIdOutOfRange()));

    // Partition does not exist.
    assertThrows(StorageRebalanceException.class, () -> tableStorage.startRebalancePartition(partitionId + 1));

    // Let's start rebalancing of the partition.
    assertThat(tableStorage.startRebalancePartition(partitionId), willCompleteSuccessfully());

    // Once again, rebalancing of the partition cannot be started.
    assertThrows(StorageRebalanceException.class, () -> tableStorage.startRebalancePartition(partitionId));

    checkMvPartitionStorageMethodsAfterStartRebalance(mvPartitionStorage);
    checkHashIndexStorageMethodsAfterStartRebalance(hashIndexStorage);
    checkSortedIndexStorageMethodsAfterStartRebalance(sortedIndexStorage);

    checkCursorAfterStartRebalance(mvPartitionStorageScanCursor);
    checkCursorAfterStartRebalance(hashIndexStorageGetCursor);
    checkCursorAfterStartRebalance(sortedIndexStorageGetCursor);
    checkCursorAfterStartRebalance(sortedIndexStorageScanCursor);
    checkCursorAfterStartRebalance(sortedIndexStorageReadOnlyScanCursor);
    checkCursorAfterStartRebalance(sortedIndexStorageTolerantScanCursor);
}
/** Verifies that a partition storage rejects reads, writes and raft-state mutations during rebalance. */
@SuppressWarnings({"resource", "deprecation"})
private void checkMvPartitionStorageMethodsAfterStartRebalance(MvPartitionStorage storage) {
    // Rebalance resets the applied index/term to the sentinel value.
    checkLastApplied(storage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);

    // The group configuration is dropped; reading it is allowed and simply returns null.
    assertNull(storage.committedGroupConfiguration());
    assertDoesNotThrow(() -> storage.committedGroupConfiguration());

    storage.runConsistently(locker -> {
        // Mutating the raft-related state is forbidden while a rebalance is in progress.
        // (A duplicated committedGroupConfiguration assertion was removed here.)
        assertThrows(StorageRebalanceException.class, () -> storage.lastApplied(100, 500));
        assertThrows(StorageRebalanceException.class, () -> storage.committedGroupConfiguration(BYTE_EMPTY_ARRAY));

        RowId rowId = new RowId(PARTITION_ID);

        locker.lock(rowId);

        // All read/write operations must be rejected as well.
        assertThrows(StorageRebalanceException.class, () -> storage.read(rowId, clock.now()));
        assertThrows(StorageRebalanceException.class, () -> storage.abortWrite(rowId));
        assertThrows(StorageRebalanceException.class, () -> storage.scanVersions(rowId));
        assertThrows(StorageRebalanceException.class, () -> storage.scan(clock.now()));
        assertThrows(StorageRebalanceException.class, () -> storage.closestRowId(rowId));
        assertThrows(StorageRebalanceException.class, storage::rowsCount);

        return null;
    });
}
/** Asserts that the storage reports the expected last-applied raft index and term. */
private static void checkLastApplied(
        MvPartitionStorage storage,
        long expLastAppliedIndex,
        long expLastAppliedTerm
) {
    assertEquals(expLastAppliedIndex, storage.lastAppliedIndex());
    assertEquals(expLastAppliedTerm, storage.lastAppliedTerm());
}
/** Drains the cursor into the binary rows of its results, always closing it. */
private static List<BinaryRow> toListOfBinaryRows(Cursor<ReadResult> cursor) {
    try (Cursor<ReadResult> c = cursor) {
        return c.stream().map(ReadResult::binaryRow).collect(toList());
    }
}
/**
 * Returns a 100-byte pseudo-random raft group configuration.
 *
 * <p>Uses {@code new Random()} (which self-seeds uniquely) instead of seeding with
 * {@code System.currentTimeMillis()}: two calls within the same millisecond would otherwise
 * produce identical "random" configurations.
 */
private static byte[] createRandomRaftGroupConfiguration() {
    Random random = new Random();

    byte[] bytes = new byte[100];

    random.nextBytes(bytes);

    return bytes;
}
/** Asserts that both raft group configurations are present and byte-for-byte identical. */
private static void checkRaftGroupConfigs(byte @Nullable [] expected, byte @Nullable [] actual) {
    assertNotNull(expected);
    assertNotNull(actual);

    assertArrayEquals(expected, actual);
}
/**
 * Retrieves or creates a multi-versioned partition storage.
 *
 * @param partitionId Partition ID.
 * @return Partition storage for the given partition of this test's table storage.
 */
protected MvPartitionStorage getOrCreateMvPartition(int partitionId) {
    // Delegates to the static helper using this test's table storage.
    return getOrCreateMvPartition(tableStorage, partitionId);
}
/**
 * Sets build progress in all three indexes, runs {@code operation}, and verifies the progress
 * was reverted to its default value and can be set again afterwards.
 */
void testNextRowIdToBuildAfterOperation(Operation operation) throws Exception {
    MvPartitionStorage partition = getOrCreateMvPartition(PARTITION_ID);
    IndexStorage hashIndex = getOrCreateIndex(PARTITION_ID, hashIdx, false);
    IndexStorage sortedIndex = getOrCreateIndex(PARTITION_ID, sortedIdx, false);
    IndexStorage pkIndex = getOrCreateIndex(PARTITION_ID, pkIdx, false);

    RowId hashRowId = new RowId(PARTITION_ID);
    RowId sortedRowId = new RowId(PARTITION_ID);
    RowId pkRowId = new RowId(PARTITION_ID);

    partition.runConsistently(locker -> {
        hashIndex.setNextRowIdToBuild(hashRowId);
        sortedIndex.setNextRowIdToBuild(sortedRowId);
        pkIndex.setNextRowIdToBuild(pkRowId);

        return null;
    });

    assertThat(partition.flush(), willCompleteSuccessfully());

    // We expect that nextRowIdToBuild will be reverted to its default value after the operation.
    operation.doOperation();

    partition = getOrCreateMvPartition(PARTITION_ID);
    IndexStorage recreatedHashIndex = getOrCreateIndex(PARTITION_ID, hashIdx, false);
    IndexStorage recreatedSortedIndex = getOrCreateIndex(PARTITION_ID, sortedIdx, false);
    IndexStorage recreatedPkIndex = getOrCreateIndex(PARTITION_ID, pkIdx, false);

    // NOTE(review): the PK index apparently reverts to null rather than the initial row id.
    assertThat(recreatedHashIndex.getNextRowIdToBuild(), is(equalTo(INITIAL_ROW_ID_TO_BUILD)));
    assertThat(recreatedSortedIndex.getNextRowIdToBuild(), is(equalTo(INITIAL_ROW_ID_TO_BUILD)));
    assertThat(recreatedPkIndex.getNextRowIdToBuild(), nullValue());

    // Check that rowId can be set successfully.
    partition.runConsistently(locker -> {
        recreatedHashIndex.setNextRowIdToBuild(hashRowId);
        recreatedSortedIndex.setNextRowIdToBuild(sortedRowId);
        recreatedPkIndex.setNextRowIdToBuild(pkRowId);

        return null;
    });

    assertThat(recreatedHashIndex.getNextRowIdToBuild(), is(equalTo(hashRowId)));
    assertThat(recreatedSortedIndex.getNextRowIdToBuild(), is(equalTo(sortedRowId)));
    assertThat(recreatedPkIndex.getNextRowIdToBuild(), is(equalTo(pkRowId)));
}
/** Table-storage operation expected to reset the indexes' build progress (used as a lambda target). */
@FunctionalInterface
private interface Operation {
    /** Performs the operation; any thrown exception fails the test. */
    void doOperation() throws Exception;
}
/**
 * Returns an already created index or creates a new one.
 *
 * @param partitionId Partition ID.
 * @param indexDescriptor Storage index descriptor.
 * @param built {@code True} if index building needs to be completed.
 * @return The index storage, cast to the caller-expected type.
 * @see #completeBuiltIndexes(int, IndexStorage...)
 */
@SuppressWarnings("unchecked") // Cast target is chosen by the caller; safe for this test fixture.
protected <T extends IndexStorage> T getOrCreateIndex(
        int partitionId,
        StorageIndexDescriptor indexDescriptor,
        boolean built
) {
    IndexStorage indexStorage = tableStorage.getOrCreateIndex(partitionId, indexDescriptor);

    assertNotNull(indexStorage, "index=" + indexDescriptor);

    // Finish building only when requested and not already finished
    // (a null "next row id to build" means building has been completed).
    if (built && indexStorage.getNextRowIdToBuild() != null) {
        completeBuiltIndexes(partitionId, indexStorage);
    }

    return (T) indexStorage;
}
/**
 * Returns an already created index or creates a new one with the completion of building.
 *
 * @param partitionId Partition ID.
 * @param indexDescriptor Storage index descriptor.
 * @return The index storage, cast to the caller-expected type.
 * @see #completeBuiltIndexes(int, IndexStorage...)
 */
protected <T extends IndexStorage> T getOrCreateIndex(
        int partitionId,
        StorageIndexDescriptor indexDescriptor
) {
    // Delegates with built=true so index building is finished before the index is used.
    return getOrCreateIndex(partitionId, indexDescriptor, true);
}
/** Completes the building of indexes. */
protected void completeBuiltIndexes(int partitionId, IndexStorage... indexStorages) {
    MvPartitionStorage partition = getOrCreateMvPartition(partitionId);

    assertNotNull(partition, "partitionId=" + partitionId);

    TestStorageUtils.completeBuiltIndexes(partition, indexStorages);
}
}