blob: 0d6357b4fbbdf21e80022550bace6ec788ff30a3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.tx.storage.state;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.tx.storage.state.TxStateStorage.REBALANCE_IN_PROGRESS;
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_REBALANCE_ERR;
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_STATE_STORAGE_STOPPED_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.IntStream;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.util.Cursor;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
/**
* Abstract tx storage test.
*/
public abstract class AbstractTxStateStorageTest extends BaseIgniteAbstractTest {
protected static final int TABLE_ID = 1;
protected TxStateTableStorage tableStorage;
/**
* Creates {@link TxStateStorage} to test.
*/
protected abstract TxStateTableStorage createTableStorage();
@BeforeEach
protected void beforeTest() {
tableStorage = createTableStorage();
tableStorage.start();
}
@AfterEach
protected void afterTest() throws Exception {
tableStorage.close();
}
@Test
public void testPutGetRemove() {
TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(0);
List<UUID> txIds = new ArrayList<>();
for (int i = 0; i < 100; i++) {
UUID txId = UUID.randomUUID();
txIds.add(txId);
storage.put(txId, new TxMeta(TxState.COMMITTED, generateEnlistedPartitions(i), generateTimestamp(txId)));
}
for (int i = 0; i < 100; i++) {
TxMeta txMeta = storage.get(txIds.get(i));
TxMeta txMetaExpected = new TxMeta(TxState.COMMITTED, generateEnlistedPartitions(i), generateTimestamp(txIds.get(i)));
assertEquals(txMetaExpected, txMeta);
}
for (int i = 0; i < 100; i++) {
if (i % 2 == 0) {
storage.remove(txIds.get(i), i, 1);
}
}
for (int i = 0; i < 100; i++) {
if (i % 2 == 0) {
TxMeta txMeta = storage.get(txIds.get(i));
assertNull(txMeta);
} else {
TxMeta txMeta = storage.get(txIds.get(i));
TxMeta txMetaExpected = new TxMeta(TxState.COMMITTED, generateEnlistedPartitions(i), generateTimestamp(txIds.get(i)));
assertEquals(txMetaExpected, txMeta);
}
}
}
private List<TablePartitionId> generateEnlistedPartitions(int c) {
return IntStream.range(0, c)
.mapToObj(partitionNumber -> new TablePartitionId(TABLE_ID, partitionNumber))
.collect(toList());
}
private HybridTimestamp generateTimestamp(UUID uuid) {
long time = Math.abs(uuid.getMostSignificantBits());
if (time <= 0) {
time = 1;
}
return hybridTimestamp(time);
}
@Test
public void testCas() {
TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(0);
UUID txId = UUID.randomUUID();
TxMeta txMeta1 = new TxMeta(TxState.COMMITTED, new ArrayList<>(), generateTimestamp(txId));
TxMeta txMeta2 = new TxMeta(TxState.COMMITTED, new ArrayList<>(), generateTimestamp(UUID.randomUUID()));
assertTrue(storage.compareAndSet(txId, null, txMeta1, 1, 1));
// Checking idempotency.
assertTrue(storage.compareAndSet(txId, null, txMeta1, 1, 1));
assertTrue(storage.compareAndSet(txId, TxState.ABORTED, txMeta1, 1, 1));
TxMeta txMetaWrongTimestamp0 =
new TxMeta(txMeta1.txState(), txMeta1.enlistedPartitions(), generateTimestamp(UUID.randomUUID()));
assertFalse(storage.compareAndSet(txId, TxState.ABORTED, txMetaWrongTimestamp0, 1, 1));
TxMeta txMetaNullTimestamp0 = new TxMeta(txMeta1.txState(), txMeta1.enlistedPartitions(), null);
assertFalse(storage.compareAndSet(txId, TxState.ABORTED, txMetaNullTimestamp0, 3, 2));
assertEquals(storage.get(txId), txMeta1);
assertTrue(storage.compareAndSet(txId, txMeta1.txState(), txMeta2, 3, 2));
// Checking idempotency.
assertTrue(storage.compareAndSet(txId, txMeta1.txState(), txMeta2, 3, 2));
assertTrue(storage.compareAndSet(txId, TxState.ABORTED, txMeta2, 3, 2));
TxMeta txMetaNullTimestamp2 = new TxMeta(txMeta2.txState(), txMeta2.enlistedPartitions(), null);
assertFalse(storage.compareAndSet(txId, TxState.ABORTED, txMetaNullTimestamp2, 3, 2));
assertEquals(storage.get(txId), txMeta2);
}
@Test
public void testScan() {
TxStateStorage storage0 = tableStorage.getOrCreateTxStateStorage(0);
TxStateStorage storage1 = tableStorage.getOrCreateTxStateStorage(1);
TxStateStorage storage2 = tableStorage.getOrCreateTxStateStorage(2);
Map<UUID, TxMeta> txs = new HashMap<>();
putRandomTxMetaWithCommandIndex(storage0, 1, 0);
putRandomTxMetaWithCommandIndex(storage2, 1, 0);
for (int i = 0; i < 100; i++) {
IgniteBiTuple<UUID, TxMeta> txData = putRandomTxMetaWithCommandIndex(storage1, i, i);
txs.put(txData.get1(), txData.get2());
}
try (Cursor<IgniteBiTuple<UUID, TxMeta>> scanCursor = storage1.scan()) {
assertTrue(scanCursor.hasNext());
while (scanCursor.hasNext()) {
IgniteBiTuple<UUID, TxMeta> txData = scanCursor.next();
TxMeta txMeta = txs.remove(txData.getKey());
assertNotNull(txMeta);
assertNotNull(txData);
assertNotNull(txData.getValue());
assertEquals(txMeta, txData.getValue());
}
assertTrue(txs.isEmpty());
}
}
@Test
public void testDestroy() {
TxStateStorage storage0 = tableStorage.getOrCreateTxStateStorage(0);
TxStateStorage storage1 = tableStorage.getOrCreateTxStateStorage(1);
UUID txId0 = UUID.randomUUID();
storage0.put(txId0, new TxMeta(TxState.COMMITTED, generateEnlistedPartitions(1), generateTimestamp(txId0)));
UUID txId1 = UUID.randomUUID();
storage1.put(txId1, new TxMeta(TxState.COMMITTED, generateEnlistedPartitions(1), generateTimestamp(txId1)));
storage0.destroy();
assertNotNull(storage1.get(txId1));
}
@Test
public void scansInOrderDefinedByTxIds() {
TxStateStorage partitionStorage = tableStorage.getOrCreateTxStateStorage(0);
for (int i = 0; i < 100; i++) {
putRandomTxMetaWithCommandIndex(partitionStorage, i, i);
}
try (Cursor<IgniteBiTuple<UUID, TxMeta>> scanCursor = partitionStorage.scan()) {
List<UUID> txIds = new ArrayList<>();
while (scanCursor.hasNext()) {
IgniteBiTuple<UUID, TxMeta> txData = scanCursor.next();
txIds.add(txData.getKey());
}
assertThat(txIds, equalTo(txIds.stream().sorted(new UnsignedUuidComparator()).collect(toList())));
}
}
@Test
public void scanOnlySeesDataExistingAtTheMomentOfCreation() {
TxStateStorage partitionStorage = tableStorage.getOrCreateTxStateStorage(0);
UUID existingBeforeScan = new UUID(2, 0);
partitionStorage.put(existingBeforeScan, randomTxMeta(1, existingBeforeScan));
try (Cursor<IgniteBiTuple<UUID, TxMeta>> cursor = partitionStorage.scan()) {
UUID prependedDuringScan = new UUID(1, 0);
partitionStorage.put(prependedDuringScan, randomTxMeta(1, prependedDuringScan));
UUID appendedDuringScan = new UUID(3, 0);
partitionStorage.put(appendedDuringScan, randomTxMeta(1, appendedDuringScan));
List<UUID> txIdsReturnedByScan = cursor.stream()
.map(IgniteBiTuple::getKey)
.collect(toList());
assertThat(txIdsReturnedByScan, is(List.of(existingBeforeScan)));
}
}
@Test
void lastAppliedIndexGetterIsConsistentWithSetter() {
TxStateStorage partitionStorage = tableStorage.getOrCreateTxStateStorage(0);
partitionStorage.lastApplied(10, 2);
assertThat(partitionStorage.lastAppliedIndex(), is(10L));
}
@Test
void lastAppliedTermGetterIsConsistentWithSetter() {
TxStateStorage partitionStorage = tableStorage.getOrCreateTxStateStorage(0);
partitionStorage.lastApplied(10, 2);
assertThat(partitionStorage.lastAppliedTerm(), is(2L));
}
@Test
void compareAndSetMakesLastAppliedChangeVisible() {
TxStateStorage partitionStorage = tableStorage.getOrCreateTxStateStorage(0);
UUID txId = UUID.randomUUID();
partitionStorage.compareAndSet(txId, null, randomTxMeta(1, txId), 10, 2);
assertThat(partitionStorage.lastAppliedIndex(), is(10L));
assertThat(partitionStorage.lastAppliedTerm(), is(2L));
}
@Test
public void testSuccessRebalance() {
TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(0);
// We can't finish rebalance that we haven't started.
assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, () -> storage.finishRebalance(100, 500));
List<IgniteBiTuple<UUID, TxMeta>> rowsBeforeStartRebalance = List.of(
randomTxMetaTuple(1, UUID.randomUUID()),
randomTxMetaTuple(1, UUID.randomUUID())
);
startRebalanceWithChecks(storage, rowsBeforeStartRebalance);
List<IgniteBiTuple<UUID, TxMeta>> rowsOnRebalance = List.of(
randomTxMetaTuple(1, UUID.randomUUID()),
randomTxMetaTuple(1, UUID.randomUUID())
);
// Let's fill it with new data.
fillStorage(storage, rowsOnRebalance);
checkLastApplied(storage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
// Let's complete rebalancing.
assertThat(storage.finishRebalance(30, 50), willCompleteSuccessfully());
// Let's check the storage.
checkLastApplied(storage, 30, 30, 50);
checkStorageContainsRows(storage, rowsOnRebalance);
}
@Test
public void testFailRebalance() {
TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(0);
// Nothing will happen because rebalance has not started.
assertThat(storage.abortRebalance(), willCompleteSuccessfully());
List<IgniteBiTuple<UUID, TxMeta>> rowsBeforeStartRebalance = List.of(
randomTxMetaTuple(1, UUID.randomUUID()),
randomTxMetaTuple(1, UUID.randomUUID())
);
startRebalanceWithChecks(storage, rowsBeforeStartRebalance);
List<IgniteBiTuple<UUID, TxMeta>> rowsOnRebalance = List.of(
randomTxMetaTuple(1, UUID.randomUUID()),
randomTxMetaTuple(1, UUID.randomUUID())
);
// Let's fill it with new data.
fillStorage(storage, rowsOnRebalance);
checkLastApplied(storage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
// Let's abort rebalancing.
assertThat(storage.abortRebalance(), willCompleteSuccessfully());
// Let's check the storage.
checkLastApplied(storage, 0, 0, 0);
checkStorageIsEmpty(storage);
}
@Test
public void testStartRebalanceForClosedOrDestroyedPartition() {
TxStateStorage storage0 = tableStorage.getOrCreateTxStateStorage(0);
TxStateStorage storage1 = tableStorage.getOrCreateTxStateStorage(1);
storage0.close();
storage1.destroy();
assertThrowsIgniteInternalException(TX_STATE_STORAGE_STOPPED_ERR, storage0::startRebalance);
assertThrowsIgniteInternalException(TX_STATE_STORAGE_STOPPED_ERR, storage1::startRebalance);
}
@Test
void testClear() {
TxStateStorage storage = tableStorage.getOrCreateTxStateStorage(0);
// Cleaning up on empty storage should not generate errors.
assertThat(storage.clear(), willCompleteSuccessfully());
checkLastApplied(storage, 0, 0, 0);
checkStorageIsEmpty(storage);
List<IgniteBiTuple<UUID, TxMeta>> rows = List.of(
randomTxMetaTuple(1, UUID.randomUUID()),
randomTxMetaTuple(1, UUID.randomUUID())
);
fillStorage(storage, rows);
// Cleanup the non-empty storage.
assertThat(storage.clear(), willCompleteSuccessfully());
checkLastApplied(storage, 0, 0, 0);
checkStorageIsEmpty(storage);
}
@Test
void testCleanOnClosedDestroyedAndRebalancedStorages() {
TxStateStorage storage0 = tableStorage.getOrCreateTxStateStorage(0);
TxStateStorage storage1 = tableStorage.getOrCreateTxStateStorage(1);
TxStateStorage storage2 = tableStorage.getOrCreateTxStateStorage(2);
storage0.close();
storage1.destroy();
assertThat(storage2.startRebalance(), willCompleteSuccessfully());
try {
assertThrowsIgniteInternalException(TX_STATE_STORAGE_STOPPED_ERR, storage0::clear);
assertThrowsIgniteInternalException(TX_STATE_STORAGE_STOPPED_ERR, storage1::clear);
assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, storage2::clear);
} finally {
assertThat(storage2.abortRebalance(), willCompleteSuccessfully());
}
}
private static void checkStorageIsEmpty(TxStateStorage storage) {
try (Cursor<IgniteBiTuple<UUID, TxMeta>> scan = storage.scan()) {
assertThat(scan.stream().collect(toList()), is(empty()));
}
}
protected static void checkStorageContainsRows(TxStateStorage storage, List<IgniteBiTuple<UUID, TxMeta>> expRows) {
try (Cursor<IgniteBiTuple<UUID, TxMeta>> scan = storage.scan()) {
assertThat(
scan.stream().collect(toList()),
containsInAnyOrder(expRows.toArray(new IgniteBiTuple[0]))
);
}
}
private void startRebalanceWithChecks(TxStateStorage storage, List<IgniteBiTuple<UUID, TxMeta>> rows) {
fillStorage(storage, rows);
Cursor<IgniteBiTuple<UUID, TxMeta>> scanCursorBeforeStartRebalance = storage.scan();
assertThat(storage.startRebalance(), willCompleteSuccessfully());
checkTxStateStorageMethodsWhenRebalanceInProgress(storage);
assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, scanCursorBeforeStartRebalance::hasNext);
assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, scanCursorBeforeStartRebalance::next);
// We cannot start a new rebalance until the current one has ended.
assertThrows(IgniteInternalException.class, storage::startRebalance);
}
private static void checkTxStateStorageMethodsWhenRebalanceInProgress(TxStateStorage storage) {
checkLastApplied(storage, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS, REBALANCE_IN_PROGRESS);
assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, () -> storage.lastApplied(100, 500));
assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, () -> storage.get(UUID.randomUUID()));
assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, () -> storage.remove(UUID.randomUUID(), 1, 1));
assertThrowsIgniteInternalException(TX_STATE_STORAGE_REBALANCE_ERR, storage::scan);
}
private IgniteBiTuple<UUID, TxMeta> putRandomTxMetaWithCommandIndex(TxStateStorage storage, int enlistedPartsCount, long commandIndex) {
UUID txId = UUID.randomUUID();
TxMeta txMeta = randomTxMeta(enlistedPartsCount, txId);
storage.compareAndSet(txId, null, txMeta, commandIndex, 1);
return new IgniteBiTuple<>(txId, txMeta);
}
private TxMeta randomTxMeta(int enlistedPartsCount, UUID txId) {
return new TxMeta(TxState.COMMITTED, generateEnlistedPartitions(enlistedPartsCount), generateTimestamp(txId));
}
/**
* Creates random tx meta.
*
* @param enlistedPartsCount Count of enlisted partitions.
* @param txId Tx ID.
*/
protected IgniteBiTuple<UUID, TxMeta> randomTxMetaTuple(int enlistedPartsCount, UUID txId) {
return new IgniteBiTuple<>(
txId,
new TxMeta(TxState.COMMITTED, generateEnlistedPartitions(enlistedPartsCount), generateTimestamp(txId))
);
}
private static void assertThrowsIgniteInternalException(int expFullErrorCode, Executable executable) {
IgniteInternalException exception = assertThrows(IgniteInternalException.class, executable);
assertEquals(expFullErrorCode, exception.code());
}
/**
* Fills storage.
*
* @param storage Storage.
* @param rows Rows.
*/
protected static void fillStorage(TxStateStorage storage, List<IgniteBiTuple<UUID, TxMeta>> rows) {
assertThat(rows, hasSize(greaterThanOrEqualTo(2)));
for (int i = 0; i < rows.size(); i++) {
IgniteBiTuple<UUID, TxMeta> row = rows.get(i);
if ((i % 2) == 0) {
assertTrue(storage.compareAndSet(row.get1(), null, row.get2(), i * 10L, i * 10L));
} else {
storage.put(row.get1(), row.get2());
}
}
}
/**
* Checks last applied index and term.
*
* @param storage Storage.
* @param expLastAppliedIndex Expected last applied index.
* @param expLastAppliedTerm Expected last applied term.
* @param expPersistentIndex Expected persistent last applied index.
*/
protected static void checkLastApplied(
TxStateStorage storage,
long expLastAppliedIndex,
long expPersistentIndex,
long expLastAppliedTerm
) {
assertEquals(expLastAppliedIndex, storage.lastAppliedIndex());
assertEquals(expLastAppliedTerm, storage.lastAppliedTerm());
}
}