blob: dc554fac03f5fef3f6c562bef58d7ad40ccdb787 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.storage;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.util.Cursor;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
/**
* Base test for MV partition storages.
*/
public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStorageTest {
/** Transaction id created once per test instance and reused by tests that need a single fixed transaction. */
protected final UUID txId = newTransactionId();
/** Key from which all shared test rows below are built. */
protected final TestKey key = new TestKey(10, "foo");
/** Value of the first shared row. */
private final TestValue value = new TestValue(20, "bar");
/** First shared row: {@link #key} paired with {@link #value}. */
protected final BinaryRow binaryRow = binaryRow(key, value);
/** Value of the second shared row. */
private final TestValue value2 = new TestValue(21, "bar2");
/** Second shared row: the same key paired with {@link #value2}. */
private final BinaryRow binaryRow2 = binaryRow(key, value2);
/** Third shared row: the same key paired with yet another value. */
private final BinaryRow binaryRow3 = binaryRow(key, new TestValue(22, "bar3"));
/**
 * Checks that reading a freshly generated row id from a storage with no writes yields nothing.
 */
@Test
public void testReadsFromEmpty() {
    RowId freshRowId = new RowId(PARTITION_ID);

    // The generated id must belong to the partition it was created for.
    assertEquals(PARTITION_ID, freshRowId.partitionId());

    BinaryRow found = read(freshRowId, clock.now());
    assertNull(found);
}
/**
 * Checks that scanning a storage with no writes returns no values, for every scan timestamp kind.
 */
@ParameterizedTest
@EnumSource
public void testScanOverEmpty(ScanTimestampProvider tsProvider) {
    HybridTimestamp scanTs = tsProvider.scanTimestamp(clock);

    List<TestValue> scanned = convert(scan(scanTs));

    assertEquals(List.of(), scanned);
}
/**
 * Exercises the basic contract of {@link MvPartitionStorage#addWrite(RowId, BinaryRow, UUID, int, int)}: a foreign
 * transaction is rejected, the owning transaction may rewrite, and writing {@code null} hides the row.
 */
@Test
public void testAddWrite() {
    RowId id = insert(binaryRow, txId);

    // A different transaction must not be able to touch the pending write.
    UUID foreignTxId = newTransactionId();
    assertThrows(TxIdMismatchException.class, () -> addWrite(id, binaryRow, foreignTxId));

    // The owning transaction is allowed to write again.
    addWrite(id, binaryRow, txId);

    // A read by timestamp observes the write intent.
    BinaryRow visible = read(id, clock.now());
    assertThat(visible, is(equalToRow(binaryRow)));

    // Writing null removes the row; repeating the removal changes nothing.
    for (int attempt = 0; attempt < 2; attempt++) {
        addWrite(id, null, txId);

        assertNull(read(id, HybridTimestamp.MAX_VALUE));
    }
}
/**
 * Exercises the basic contract of {@link MvPartitionStorage#abortWrite(RowId)}: an aborted insert leaves nothing to read.
 */
@Test
public void testAbortWrite() {
    BinaryRow pendingRow = binaryRow(key, value);
    RowId id = insert(pendingRow, txId);

    abortWrite(id);

    // Nothing must be readable once the only write has been aborted.
    BinaryRow afterAbort = read(id, HybridTimestamp.MAX_VALUE);
    assertNull(afterAbort);
}
/**
 * Walks a single row through insert, commit, overwrite, commit, removal and commit of the removal, checking after
 * every step what {@link MvPartitionStorage#commitWrite(RowId, HybridTimestamp)} makes visible at the captured timestamps.
 */
@Test
public void testCommitWrite() {
    RowId id = insert(binaryRow, txId);

    HybridTimestamp beforeCommit = clock.now();
    HybridTimestamp firstCommit = clock.now();
    commitWrite(id, firstCommit);
    HybridTimestamp afterCommit = clock.now();

    // Nothing is visible before the commit timestamp.
    assertNull(read(id, beforeCommit));

    // The committed row is visible at the commit timestamp and afterwards.
    assertThat(read(id, firstCommit), is(equalToRow(binaryRow)));
    assertThat(read(id, afterCommit), is(equalToRow(binaryRow)));

    // Add a second, still uncommitted version from another transaction.
    UUID overwriteTxId = newTransactionId();
    BinaryRow overwrite = binaryRow(key, new TestValue(30, "duh"));
    addWrite(id, overwrite, overwriteTxId);

    // Same checks as above, now with a committed version and a write intent.
    assertNull(read(id, beforeCommit));
    assertThat(read(id, HybridTimestamp.MAX_VALUE), is(equalToRow(overwrite)));
    assertThat(read(id, firstCommit), is(equalToRow(binaryRow)));
    assertThat(read(id, afterCommit), is(equalToRow(overwrite)));
    assertThat(read(id, clock.now()), is(equalToRow(overwrite)));

    // After committing the overwrite only the latest-time behaviour changes.
    HybridTimestamp overwriteCommit = clock.now();
    commitWrite(id, overwriteCommit);

    assertThat(read(id, HybridTimestamp.MAX_VALUE), is(equalToRow(overwrite)));
    assertThat(read(id, firstCommit), is(equalToRow(binaryRow)));
    assertThat(read(id, afterCommit), is(equalToRow(binaryRow)));
    assertThat(read(id, clock.now()), is(equalToRow(overwrite)));

    // Uncommitted removal.
    UUID removalTxId = newTransactionId();
    addWrite(id, null, removalTxId);

    assertNull(read(id, beforeCommit));
    assertNull(read(id, HybridTimestamp.MAX_VALUE));
    assertThat(read(id, firstCommit), is(equalToRow(binaryRow)));
    assertThat(read(id, afterCommit), is(equalToRow(binaryRow)));
    assertThat(read(id, overwriteCommit), is(equalToRow(overwrite)));
    assertNull(read(id, clock.now()));

    // Committed removal.
    HybridTimestamp removalCommit = clock.now();
    commitWrite(id, removalCommit);

    assertNull(read(id, beforeCommit));
    assertNull(read(id, HybridTimestamp.MAX_VALUE));
    assertNull(read(id, removalCommit));
    assertNull(read(id, clock.now()));
    assertThat(read(id, firstCommit), is(equalToRow(binaryRow)));
    assertThat(read(id, afterCommit), is(equalToRow(binaryRow)));
}
/**
 * Exercises the basic contract of {@link MvPartitionStorage#scan(HybridTimestamp)}: two rows are committed at different
 * timestamps and a full scan is checked at timestamps before, at and after each commit.
 */
@Test
public void testScan() {
    TestValue firstValue = new TestValue(10, "xxx");
    TestValue secondValue = new TestValue(20, "yyy");

    BinaryRow firstRow = binaryRow(new TestKey(1, "1"), firstValue);
    BinaryRow secondRow = binaryRow(new TestKey(2, "2"), secondValue);

    UUID scanTxId = newTransactionId();

    RowId firstId = insert(firstRow, scanTxId);
    RowId secondId = insert(secondRow, scanTxId);

    HybridTimestamp beforeAnyCommit = clock.now();

    HybridTimestamp firstCommit = clock.now();
    commitWrite(firstId, firstCommit);

    HybridTimestamp betweenCommits = clock.now();

    HybridTimestamp secondCommit = clock.now();
    commitWrite(secondId, secondCommit);

    HybridTimestamp afterAllCommits = clock.now();

    List<TestValue> onlyFirst = List.of(firstValue);
    List<TestValue> both = List.of(firstValue, secondValue);

    // Full scan at each of the captured timestamps.
    assertEquals(List.of(), convert(scan(beforeAnyCommit)));
    assertEquals(onlyFirst, convert(scan(firstCommit)));
    assertEquals(onlyFirst, convert(scan(betweenCommits)));
    assertEquals(both, convert(scan(secondCommit)));
    assertEquals(both, convert(scan(afterAllCommits)));
    assertEquals(both, convert(scan(HybridTimestamp.MAX_VALUE)));
}
/**
 * Checks the iteration invariants of a scan cursor over two committed rows: repeated {@code hasNext()} calls are stable,
 * and {@code next()} fails once the cursor is exhausted.
 */
@SuppressWarnings("ConstantConditions")
@Test
public void testTransactionScanCursorInvariants() {
    TestValue firstValue = new TestValue(10, "xxx");
    TestValue secondValue = new TestValue(20, "yyy");

    RowId firstId = insert(binaryRow(new TestKey(1, "1"), firstValue), txId);
    commitWrite(firstId, clock.now());

    RowId secondId = insert(binaryRow(new TestKey(2, "2"), secondValue), txId);
    commitWrite(secondId, clock.now());

    try (PartitionTimestampCursor cursor = scan(HybridTimestamp.MAX_VALUE)) {
        List<TestValue> seen = new ArrayList<>();

        // Two rows are expected; hasNext() is called twice before each next() on purpose.
        for (int i = 0; i < 2; i++) {
            assertTrue(cursor.hasNext());
            assertTrue(cursor.hasNext());

            seen.add(value(cursor.next().binaryRow()));
        }

        assertFalse(cursor.hasNext());
        assertFalse(cursor.hasNext());

        assertThrows(NoSuchElementException.class, cursor::next);

        assertThat(seen, containsInAnyOrder(firstValue, secondValue));
    }
}
/**
 * Checks the invariants of a timestamp scan cursor over two rows that each have a committed version and a newer write
 * intent: every element is a write intent, {@code committed(ts)} returns the committed version of the current row, and
 * {@code committed(ts)} fails when the cursor is not positioned on an element.
 */
@SuppressWarnings("ConstantConditions")
@Test
public void testTimestampScanCursorInvariants() {
    RowId firstRowId = new RowId(PARTITION_ID, 10, 10);
    RowId secondRowId = new RowId(PARTITION_ID, 10, 20);

    TestKey firstKey = new TestKey(1, "1");
    BinaryRow firstCommitted = binaryRow(firstKey, new TestValue(10, "xxx"));
    BinaryRow firstIntent = binaryRow(firstKey, new TestValue(11, "xxx"));

    addWrite(firstRowId, firstCommitted, txId);
    HybridTimestamp firstCommitTs = clock.now();
    commitWrite(firstRowId, firstCommitTs);
    addWrite(firstRowId, firstIntent, newTransactionId());

    TestKey secondKey = new TestKey(2, "2");
    BinaryRow secondCommitted = binaryRow(secondKey, new TestValue(20, "yyy"));
    BinaryRow secondIntent = binaryRow(secondKey, new TestValue(21, "yyy"));

    addWrite(secondRowId, secondCommitted, txId);
    HybridTimestamp secondCommitTs = clock.now();
    commitWrite(secondRowId, secondCommitTs);
    addWrite(secondRowId, secondIntent, newTransactionId());

    try (PartitionTimestampCursor cursor = scan(clock.now())) {
        // Asking for a committed version before any element has been fetched must fail.
        assertThrows(IllegalStateException.class, () -> cursor.committed(firstCommitTs));

        assertTrue(cursor.hasNext());

        while (cursor.hasNext()) {
            assertTrue(cursor.hasNext());

            ReadResult current = cursor.next();

            assertNotNull(current);
            assertTrue(current.isWriteIntent());
            assertFalse(current.isEmpty());

            // Pick the expectations that belong to whichever row the cursor returned.
            boolean isFirstRow = key(current.binaryRow()).equals(firstKey);

            BinaryRow expectedCommitted = isFirstRow ? firstCommitted : secondCommitted;
            BinaryRow expectedIntent = isFirstRow ? firstIntent : secondIntent;
            HybridTimestamp expectedCommitTs = isFirstRow ? firstCommitTs : secondCommitTs;

            assertThat(current.binaryRow(), is(equalToRow(expectedIntent)));

            BinaryRow committed = cursor.committed(expectedCommitTs);

            assertNotNull(committed);
            assertThat(committed, is(equalToRow(expectedCommitted)));
        }

        assertFalse(cursor.hasNext());
        assertFalse(cursor.hasNext());

        assertThrows(NoSuchElementException.class, cursor::next);

        // The same failure is expected after the cursor has been exhausted.
        assertThrows(IllegalStateException.class, () -> cursor.committed(firstCommitTs));
    }
}
/**
 * Drains the given cursor into a list of test values, sorted in natural order with {@code null} values first, and
 * closes the cursor afterwards.
 *
 * @param cursor Cursor to drain; closed by this method.
 * @return Sorted values extracted from the rows produced by the cursor.
 */
private List<TestValue> convert(PartitionTimestampCursor cursor) {
    Comparator<TestValue> nullsFirstNaturalOrder = Comparator.nullsFirst(Comparator.naturalOrder());

    try (cursor) {
        List<TestValue> values = cursor.stream()
                .map((ReadResult readResult) -> BaseMvStoragesTest.value(readResult.binaryRow()))
                .sorted(nullsFirstNaturalOrder)
                .collect(toList());

        return values;
    }
}
/**
 * Checks that a row that was inserted but not committed is returned by a read at {@link HybridTimestamp#MAX_VALUE}.
 */
@Test
void readOfUncommittedRowReturnsTheRow() {
    RowId id = insert(binaryRow, txId);

    ReadResult result = storage.read(id, HybridTimestamp.MAX_VALUE);

    assertEquals(id, result.rowId());
    assertThat(result.binaryRow(), is(equalToRow(binaryRow)));
}
/**
 * Checks that a committed row is returned by a read at {@link HybridTimestamp#MAX_VALUE}.
 */
@Test
void readOfCommittedRowReturnsTheRow() {
    RowId id = insert(binaryRow, txId);

    HybridTimestamp commitTs = clock.now();
    commitWrite(id, commitTs);

    ReadResult result = storage.read(id, HybridTimestamp.MAX_VALUE);

    assertEquals(id, result.rowId());
    assertThat(result.binaryRow(), is(equalToRow(binaryRow)));
}
/**
 * Checks that an uncommitted row is read correctly when another row was inserted and committed before it.
 */
@Test
void readsUncommittedVersionEvenWhenThereIsCommittedVersionBeforeIt() {
    RowId committedId = insert(binaryRow, txId);
    HybridTimestamp commitTs = clock.now();
    commitWrite(committedId, commitTs);

    RowId pendingId = insert(binaryRow2, txId);

    ReadResult result = storage.read(pendingId, HybridTimestamp.MAX_VALUE);

    assertEquals(pendingId, result.rowId());
    assertThat(result.binaryRow(), is(equalToRow(binaryRow2)));
}
/**
 * Checks that a committed row is read correctly when another row was inserted and committed before it.
 */
@Test
void readsCommittedVersionEvenWhenThereIsCommittedVersionBeforeIt() {
    RowId earlierId = insert(binaryRow, txId);
    HybridTimestamp earlierCommitTs = clock.now();
    commitWrite(earlierId, earlierCommitTs);

    RowId laterId = insert(binaryRow2, txId);
    HybridTimestamp laterCommitTs = clock.now();
    commitWrite(laterId, laterCommitTs);

    ReadResult result = storage.read(laterId, HybridTimestamp.MAX_VALUE);

    assertEquals(laterId, result.rowId());
    assertThat(result.binaryRow(), is(equalToRow(binaryRow2)));
}
/**
 * Checks that a read at exactly the commit timestamp returns the committed row.
 */
@Test
void readByExactlyCommitTimestampFindsRow() {
    RowId id = insert(binaryRow, txId);

    HybridTimestamp commitTs = clock.now();
    commitWrite(id, commitTs);

    assertThat(read(id, commitTs), is(equalToRow(binaryRow)));
}
/**
 * Checks that a read at a timestamp taken after the commit returns the committed row.
 */
@Test
void readByTimestampAfterCommitTimestampFindsRow() {
    RowId id = insert(binaryRow, txId);

    HybridTimestamp commitTs = clock.now();
    commitWrite(id, commitTs);

    HybridTimestamp laterTs = clock.now();

    assertThat(read(id, laterTs), is(equalToRow(binaryRow)));
}
/**
 * Checks that a read at a timestamp taken before the first version was committed returns nothing.
 */
@Test
void readByTimestampBeforeFirstVersionCommitTimestampFindsNothing() {
    HybridTimestamp earlierTs = clock.now();

    RowId id = insert(binaryRow, txId);

    HybridTimestamp commitTs = clock.now();
    commitWrite(id, commitTs);

    assertNull(read(id, earlierTs));
}
/**
 * Checks that, with two committed versions, a read at the second commit timestamp returns the second version.
 */
@Test
void readByTimestampOfLastVersionFindsLastVersion() {
    RowId id = insert(binaryRow, txId);

    HybridTimestamp olderCommitTs = clock.now();
    commitWrite(id, olderCommitTs);

    addWrite(id, binaryRow2, newTransactionId());

    HybridTimestamp newerCommitTs = clock.now();
    commitWrite(id, newerCommitTs);

    assertThat(read(id, newerCommitTs), is(equalToRow(binaryRow2)));
}
/**
 * Checks that, with two committed versions, a read at the first commit timestamp still returns the first version.
 */
@Test
void readByTimestampOfPreviousVersionFindsPreviousVersion() {
    RowId id = insert(binaryRow, txId);

    HybridTimestamp olderCommitTs = clock.now();
    commitWrite(id, olderCommitTs);

    addWrite(id, binaryRow2, newTransactionId());

    HybridTimestamp newerCommitTs = clock.now();
    commitWrite(id, newerCommitTs);

    assertThat(read(id, olderCommitTs), is(equalToRow(binaryRow)));
}
/**
 * Checks that a read at a timestamp lying between two commits returns the older of the two versions.
 */
@Test
void readByTimestampBetweenVersionsFindsPreviousVersion() {
    RowId id = insert(binaryRow, txId);

    HybridTimestamp olderCommitTs = clock.now();
    commitWrite(id, olderCommitTs);

    HybridTimestamp middleTs = clock.now();

    addWrite(id, binaryRow2, newTransactionId());

    HybridTimestamp newerCommitTs = clock.now();
    commitWrite(id, newerCommitTs);

    assertThat(read(id, middleTs), is(equalToRow(binaryRow)));
}
/**
 * Checks that a read at a timestamp taken after an uncommitted write returns that uncommitted version.
 */
@Test
void readByTimestampAfterWriteFindsUncommittedVersion() {
    RowId id = new RowId(PARTITION_ID);

    addWrite(id, binaryRow, newTransactionId());

    HybridTimestamp readTs = clock.now();

    assertThat(read(id, readTs), is(equalToRow(binaryRow)));
}
/**
 * Checks that, once a newer uncommitted write follows a committed version, a read at a later timestamp returns the
 * uncommitted version.
 */
@Test
void readByTimestampAfterCommitAndWriteFindsUncommittedVersion() {
    RowId id = insert(binaryRow, newTransactionId());

    HybridTimestamp commitTs = clock.now();
    commitWrite(id, commitTs);

    addWrite(id, binaryRow2, newTransactionId());

    HybridTimestamp readTs = clock.now();

    assertThat(read(id, readTs), is(equalToRow(binaryRow2)));
}
/**
 * Checks that writing to a row that has a pending write from another transaction fails with {@link TxIdMismatchException}.
 */
@Test
void addWriteWithDifferentTxIdThrows() {
    RowId id = insert(binaryRow, txId);

    UUID foreignTxId = newTransactionId();

    assertThrows(TxIdMismatchException.class, () -> addWrite(id, binaryRow2, foreignTxId));
}
/**
 * Checks that a second write from the same transaction replaces the pending one.
 */
@Test
void secondUncommittedWriteWithSameTxIdReplacesExistingUncommittedWrite() {
    RowId id = insert(binaryRow, txId);

    addWrite(id, binaryRow2, txId);

    assertThat(read(id, HybridTimestamp.MAX_VALUE), is(equalToRow(binaryRow2)));
}
/**
 * Checks that {@code addWrite} returns the pending version that it replaces.
 */
@Test
void addWriteReturnsUncommittedVersionIfItExists() {
    RowId id = insert(binaryRow, txId);

    BinaryRow replaced = addWrite(id, binaryRow2, txId);

    assertThat(replaced, is(equalToRow(binaryRow)));
}
/**
 * Checks that {@code addWrite} returns {@code null} when the row only has a committed version.
 */
@Test
void addWriteReturnsNullIfNoUncommittedVersionExists() {
    RowId id = insert(binaryRow, newTransactionId());

    HybridTimestamp commitTs = clock.now();
    commitWrite(id, commitTs);

    BinaryRow replaced = addWrite(id, binaryRow2, txId);

    assertNull(replaced);
}
/**
 * Checks that {@code addWriteCommitted} with a {@code null} row hides a previously committed row.
 */
@Test
void addWriteCommittedTombstone() {
    HybridTimestamp rowTs = clock.now();
    addWriteCommitted(ROW_ID, binaryRow, rowTs);

    BinaryRow beforeTombstone = read(ROW_ID, HybridTimestamp.MAX_VALUE);
    assertThat(beforeTombstone, is(equalToRow(binaryRow)));

    HybridTimestamp tombstoneTs = clock.now();
    addWriteCommitted(ROW_ID, null, tombstoneTs);

    BinaryRow afterTombstone = read(ROW_ID, HybridTimestamp.MAX_VALUE);
    assertNull(afterTombstone);
}
/**
 * Checks that a freshly inserted row is reported as a write intent.
 */
@Test
void addWriteCreatesUncommittedVersion() {
    RowId id = insert(binaryRow, txId);

    HybridTimestamp readTs = clock.now();
    boolean writeIntent = storage.read(id, readTs).isWriteIntent();

    assertTrue(writeIntent);
}
/**
 * Checks that an uncommitted removal on top of a committed row makes a read at {@link HybridTimestamp#MAX_VALUE} return nothing.
 */
@Test
void afterRemovalReadWithTxIdFindsNothing() {
    RowId id = insert(binaryRow, newTransactionId());

    HybridTimestamp commitTs = clock.now();
    commitWrite(id, commitTs);

    addWrite(id, null, txId);

    assertNull(read(id, HybridTimestamp.MAX_VALUE));
}
/**
 * Checks that after a committed removal a read at the current timestamp returns nothing.
 */
@Test
void afterRemovalReadByLatestTimestampFindsNothing() {
    RowId id = insert(binaryRow, newTransactionId());

    HybridTimestamp insertCommitTs = clock.now();
    commitWrite(id, insertCommitTs);

    addWrite(id, null, newTransactionId());

    HybridTimestamp removalCommitTs = clock.now();
    commitWrite(id, removalCommitTs);

    assertNull(read(id, clock.now()));
}
/**
 * Checks that after a committed removal the earlier version can still be read at its own commit timestamp.
 */
@Test
void afterRemovalPreviousVersionRemainsAccessibleByTimestamp() {
    RowId id = insert(binaryRow, newTransactionId());

    HybridTimestamp insertCommitTs = clock.now();
    commitWrite(id, insertCommitTs);

    addWrite(id, null, newTransactionId());

    HybridTimestamp removalCommitTs = clock.now();
    commitWrite(id, removalCommitTs);

    assertThat(read(id, insertCommitTs), is(equalToRow(binaryRow)));
}
/**
 * Checks that removing a row within the same transaction returns the pending version that is being replaced.
 */
@Test
void removalReturnsUncommittedRowVersionIfItExists() {
    RowId id = insert(binaryRow, txId);

    BinaryRow replaced = addWrite(id, null, txId);

    assertThat(replaced, is(equalToRow(binaryRow)));
}
/**
 * Checks that removing a row which only has a committed version returns {@code null}.
 */
@Test
void removalReturnsNullIfNoUncommittedVersionExists() {
    RowId id = insert(binaryRow, newTransactionId());

    HybridTimestamp commitTs = clock.now();
    commitWrite(id, commitTs);

    BinaryRow replaced = addWrite(id, null, newTransactionId());

    assertNull(replaced);
}
/**
 * Checks that a row is no longer reported as a write intent once it has been committed.
 */
@Test
void commitWriteCommitsWriteIntentVersion() {
    RowId id = insert(binaryRow, txId);

    HybridTimestamp commitTs = clock.now();
    commitWrite(id, commitTs);

    HybridTimestamp readTs = clock.now();
    boolean writeIntent = storage.read(id, readTs).isWriteIntent();

    assertFalse(writeIntent);
}
/**
 * Checks that a committed row can be read at a timestamp taken after the commit.
 */
@Test
void commitWriteMakesVersionAvailableToReadByTimestamp() {
    RowId id = insert(binaryRow, txId);

    HybridTimestamp commitTs = clock.now();
    commitWrite(id, commitTs);

    HybridTimestamp readTs = clock.now();

    assertThat(read(id, readTs), is(equalToRow(binaryRow)));
}
/**
 * Checks that aborting or committing a row without a pending write leaves the committed row readable and unchanged.
 */
@Test
void commitAndAbortWriteNoOpIfNoUncommittedVersionExists() {
    RowId id = insert(binaryRow, newTransactionId());

    HybridTimestamp commitTs = clock.now();
    commitWrite(id, commitTs);

    // Abort with nothing pending.
    abortWrite(id);

    BinaryRow afterAbort = read(id, HybridTimestamp.MAX_VALUE);
    assertThat(afterAbort, is(equalToRow(binaryRow)));

    // Commit with nothing pending.
    HybridTimestamp secondCommitTs = clock.now();
    commitWrite(id, secondCommitTs);

    BinaryRow afterCommit = read(id, HybridTimestamp.MAX_VALUE);
    assertThat(afterCommit, is(equalToRow(binaryRow)));
}
/**
 * Checks that aborting a pending overwrite makes the previously committed version visible again.
 */
@Test
void abortWriteRemovesUncommittedVersion() {
    RowId id = insert(binaryRow, newTransactionId());

    HybridTimestamp commitTs = clock.now();
    commitWrite(id, commitTs);

    addWrite(id, binaryRow2, txId);
    abortWrite(id);

    assertThat(read(id, HybridTimestamp.MAX_VALUE), is(equalToRow(binaryRow)));
}
/**
 * Checks that after an insert has been aborted a read at the current timestamp returns nothing.
 */
@Test
void abortOfInsertMakesRowNonExistentForReadByTimestamp() {
    RowId id = insert(binaryRow, newTransactionId());

    abortWrite(id);

    HybridTimestamp readTs = clock.now();

    assertNull(read(id, readTs));
}
/**
 * Checks that after an insert has been aborted a read at {@link HybridTimestamp#MAX_VALUE} returns nothing.
 */
@Test
void abortOfInsertMakesRowNonExistentForReadWithTxId() {
    // Actually perform the insert and the abort: without these steps the test would only read a row id that was
    // never written and would pass without exercising abortWrite at all.
    RowId rowId = insert(binaryRow, newTransactionId());

    abortWrite(rowId);

    BinaryRow foundRow = read(rowId, HybridTimestamp.MAX_VALUE);

    assertThat(foundRow, is(nullValue()));
}
/**
 * Checks that {@code abortWrite} returns the pending version that it discarded.
 */
@Test
void abortWriteReturnsTheRemovedVersion() {
    RowId id = insert(binaryRow, txId);

    BinaryRow aborted = abortWrite(id);

    assertThat(aborted, is(equalToRow(binaryRow)));
}
/**
 * Checks that after a commit, an aborted write and a new pending write, a read at the current timestamp returns the
 * newest pending row.
 */
@Test
void readByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() {
    RowId id = commitAbortAndAddUncommitted();

    HybridTimestamp readTs = clock.now();
    ReadResult result = storage.read(id, readTs);

    // The pending (third) row is the one that must be visible.
    assertEquals(id, result.rowId());
    assertThat(result.binaryRow(), is(equalToRow(binaryRow3)));
}
/**
 * Checks what a read result carries for a row with a committed version followed by a pending write: at the commit
 * timestamp the committed row is returned without transaction details, while at a later timestamp the pending row is
 * returned together with its transaction id, commit table id and commit partition id.
 */
@Test
void readByTimestampBeforeAndAfterUncommittedWrite() {
    RowId id = new RowId(PARTITION_ID);

    HybridTimestamp committedAt = clock.now();

    storage.runConsistently(locker -> {
        addWrite(id, binaryRow, txId);
        commitWrite(id, committedAt);

        return null;
    });

    UUID pendingTxId = UUID.randomUUID();

    storage.runConsistently(locker -> {
        addWrite(id, binaryRow2, pendingTxId);

        return null;
    });

    // Read at the commit timestamp: the committed version, with no transaction details.
    ReadResult committedRead = storage.read(id, committedAt);

    assertNotNull(committedRead);
    assertNull(committedRead.transactionId());
    assertNull(committedRead.commitTableId());
    assertEquals(ReadResult.UNDEFINED_COMMIT_PARTITION_ID, committedRead.commitPartitionId());
    assertThat(committedRead.binaryRow(), is(equalToRow(binaryRow)));

    // Read at a later timestamp: the pending version, with the details of its transaction.
    ReadResult pendingRead = storage.read(id, clock.now());

    assertNotNull(pendingRead);
    assertEquals(pendingTxId, pendingRead.transactionId());
    assertEquals(COMMIT_TABLE_ID, pendingRead.commitTableId());
    assertEquals(PARTITION_ID, pendingRead.commitPartitionId());
    assertThat(pendingRead.binaryRow(), is(equalToRow(binaryRow2)));
}
/**
 * Prepares a row with the following history, all inside one {@code runConsistently} closure: the first row is written
 * and committed, the second row is written and aborted, and the third row is written and left pending.
 *
 * @return Id of the prepared row.
 */
private RowId commitAbortAndAddUncommitted() {
    return storage.runConsistently(locker -> {
        RowId preparedId = new RowId(PARTITION_ID);

        locker.lock(preparedId);

        // First version: written directly through the storage and then committed.
        storage.addWrite(preparedId, binaryRow, txId, 999, 0);

        HybridTimestamp commitTs = clock.now();
        commitWrite(preparedId, commitTs);

        // Second version: written and immediately aborted.
        UUID abortedTxId = newTransactionId();
        addWrite(preparedId, binaryRow2, abortedTxId);

        storage.abortWrite(preparedId);

        // Third version: left as a pending write.
        UUID pendingTxId = newTransactionId();
        addWrite(preparedId, binaryRow3, pendingTxId);

        return preparedId;
    });
}
/**
 * Checks that after a commit, an aborted write and a new pending write, a scan returns exactly one element: the
 * pending row.
 */
@ParameterizedTest
@EnumSource(ScanTimestampProvider.class)
void scanWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite(ScanTimestampProvider tsProvider) {
    RowId id = commitAbortAndAddUncommitted();

    HybridTimestamp scanTs = tsProvider.scanTimestamp(clock);

    try (Cursor<ReadResult> cursor = storage.scan(scanTs)) {
        ReadResult first = cursor.next();

        assertEquals(id, first.rowId());
        assertThat(first.binaryRow(), is(equalToRow(binaryRow3)));

        // Nothing else may follow.
        assertFalse(cursor.hasNext());
    }
}
/**
 * Checks that a row which has just been inserted is returned by a read at the current timestamp.
 */
@Test
void readByTimestampWorksCorrectlyIfNoUncommittedValueExists() {
    RowId id = insert(binaryRow, txId);

    HybridTimestamp readTs = clock.now();

    assertThat(read(id, readTs), is(equalToRow(binaryRow)));
}
/**
 * Checks that {@link MvPartitionStorage#lastAppliedIndex()} and the last applied term start at zero, reflect a value
 * set through {@code lastApplied}, and that a subsequent flush completes successfully.
 */
@Test
void testAppliedIndex() {
    storage.runConsistently(locker -> {
        // Initial state.
        assertEquals(0, storage.lastAppliedIndex());
        assertEquals(0, storage.lastAppliedTerm());

        storage.lastApplied(1, 1);

        // The new values must be visible right away.
        assertEquals(1, storage.lastAppliedIndex());
        assertEquals(1, storage.lastAppliedTerm());

        return null;
    });

    assertThat(storage.flush(), willCompleteSuccessfully());
}
/**
 * Checks reads of a row with two committed versions and a pending third one, at timestamps before the first commit, at
 * and between the commits, and after the pending write. Also checks that {@code newestCommitTimestamp()} is only set
 * on the result that returns the pending write.
 */
@Test
void testReadWithinBeforeAndAfterTwoCommits() {
    HybridTimestamp beforeCommits = clock.now();

    RowId id = new RowId(PARTITION_ID);

    HybridTimestamp firstCommit = clock.now();

    storage.runConsistently(locker -> {
        addWrite(id, binaryRow, newTransactionId());
        commitWrite(id, firstCommit);

        return null;
    });

    HybridTimestamp middle = clock.now();

    HybridTimestamp secondCommit = clock.now();

    storage.runConsistently(locker -> {
        addWrite(id, binaryRow2, newTransactionId());
        commitWrite(id, secondCommit);

        return null;
    });

    storage.runConsistently(locker -> {
        addWrite(id, binaryRow3, newTransactionId());

        return null;
    });

    HybridTimestamp afterPendingWrite = clock.now();

    // Before any commit there is no row.
    ReadResult earliest = storage.read(id, beforeCommits);
    assertNull(earliest.binaryRow());

    // Exactly at the first commit.
    ReadResult atFirst = storage.read(id, firstCommit);
    assertNotNull(atFirst);
    assertNull(atFirst.newestCommitTimestamp());
    assertThat(atFirst.binaryRow(), is(equalToRow(binaryRow)));

    // Between the two commits.
    ReadResult inBetween = storage.read(id, middle);
    assertNotNull(inBetween);
    assertNull(inBetween.newestCommitTimestamp());
    assertThat(inBetween.binaryRow(), is(equalToRow(binaryRow)));

    // Exactly at the second commit.
    ReadResult atSecond = storage.read(id, secondCommit);
    assertNotNull(atSecond);
    assertNull(atSecond.newestCommitTimestamp());
    assertThat(atSecond.binaryRow(), is(equalToRow(binaryRow2)));

    // After the second commit the write intent is returned, pointing back at the newest commit.
    ReadResult latest = storage.read(id, afterPendingWrite);
    assertNotNull(latest);
    assertNotNull(latest.newestCommitTimestamp());
    assertEquals(secondCommit, latest.newestCommitTimestamp());
    assertThat(latest.binaryRow(), is(equalToRow(binaryRow3)));
}
/**
 * Checks that reading with a row id whose partition id differs from the stored row's fails with
 * {@link IllegalArgumentException}.
 */
@Test
void testWrongPartition() {
    RowId storedId = commitAbortAndAddUncommitted();

    // Same UUID bits, but the partition id is shifted by one.
    RowId foreignPartitionId = new RowId(
            storedId.partitionId() + 1,
            storedId.mostSignificantBits(),
            storedId.leastSignificantBits()
    );

    assertThrows(IllegalArgumentException.class, () -> read(foreignPartitionId, clock.now()));
}
/**
 * Checks that a read of the decremented row id returns nothing when only the original row id has writes.
 */
@Test
void testReadingNothingWithLowerRowIdIfHigherRowIdWritesExist() {
    RowId writtenId = commitAbortAndAddUncommitted();

    RowId precedingId = decrement(writtenId);

    BinaryRow found = read(precedingId, clock.now());
    assertNull(found);
}
/**
 * Checks that a read of the decremented row id at {@link HybridTimestamp#MAX_VALUE} returns nothing when only the
 * original row id has a pending write.
 */
@Test
void testReadingNothingByTxIdWithLowerRowId() {
    RowId writtenId = new RowId(PARTITION_ID);
    RowId precedingId = decrement(writtenId);

    UUID writerTxId = UUID.randomUUID();

    storage.runConsistently(locker -> {
        addWrite(writtenId, binaryRow, writerTxId);

        return null;
    });

    BinaryRow found = read(precedingId, HybridTimestamp.MAX_VALUE);
    assertNull(found);
}
/**
 * Checks that a committed row is read correctly when the decremented row id holds a pending write with a different row.
 */
@Test
void testReadingCorrectWriteIntentByTimestampIfLowerRowIdWriteIntentExists() {
    RowId upperId = new RowId(PARTITION_ID);
    RowId lowerId = decrement(upperId);

    storage.runConsistently(locker -> {
        // The lower row stays pending; the upper row is committed.
        addWrite(lowerId, binaryRow2, newTransactionId());

        addWrite(upperId, binaryRow, newTransactionId());
        commitWrite(upperId, clock.now());

        return null;
    });

    BinaryRow found = read(upperId, clock.now());
    assertThat(found, is(equalToRow(binaryRow)));
}
/**
 * Checks that the pending write of the decremented row id is read correctly when the original row id holds a pending
 * write with a different row.
 */
@Test
void testReadingCorrectWriteIntentByTimestampIfHigherRowIdWriteIntentExists() {
    RowId upperId = new RowId(PARTITION_ID);
    RowId lowerId = decrement(upperId);

    storage.runConsistently(locker -> {
        // Both rows stay pending.
        addWrite(lowerId, binaryRow, newTransactionId());
        addWrite(upperId, binaryRow2, newTransactionId());

        return null;
    });

    BinaryRow found = read(lowerId, clock.now());
    assertThat(found, is(equalToRow(binaryRow)));
}
/**
 * Checks that reading a pending removal placed on top of a committed version yields an empty row whose
 * {@code newestCommitTimestamp()} equals the timestamp of that commit.
 */
@Test
void testReadingTombstoneIfPreviousCommitExists() {
    RowId id = new RowId(PARTITION_ID);

    HybridTimestamp committedAt = clock.now();

    storage.runConsistently(locker -> {
        addWrite(id, binaryRow, newTransactionId());
        commitWrite(id, committedAt);

        // Pending removal.
        addWrite(id, null, newTransactionId());

        return null;
    });

    ReadResult result = storage.read(id, clock.now());

    assertNotNull(result);
    assertNull(result.binaryRow());
    assertEquals(committedAt, result.newestCommitTimestamp());
}
/**
 * Checks that reading a pending removal with no committed version underneath yields an empty row and no newest commit
 * timestamp.
 */
@Test
void testReadingTombstoneIfPreviousCommitNotExists() {
    RowId id = new RowId(PARTITION_ID);

    storage.runConsistently(locker -> {
        // Pending removal of a row that was never committed.
        addWrite(id, null, newTransactionId());

        return null;
    });

    ReadResult result = storage.read(id, clock.now());

    assertNotNull(result);
    assertNull(result.binaryRow());
    assertNull(result.newestCommitTimestamp());
}
/**
 * Checks that {@code scanVersions} returns every committed version of a single row id, newest first, including a
 * tombstone, and nothing from neighbouring row ids.
 */
@Test
void testScanVersions() {
    RowId rowId = new RowId(PARTITION_ID, 100, 0);

    // Populate storage with several versions for the same row id, oldest first.
    List<TestValue> values = new ArrayList<>();

    values.add(value);
    values.add(value2);
    values.add(null); // A tombstone.

    // The loop variable has its own name so that it does not shadow the "value" field used further below.
    for (TestValue versionValue : values) {
        BinaryRow row = versionValue == null ? null : binaryRow(key, versionValue);

        addWrite(rowId, row, newTransactionId());
        commitWrite(rowId, clock.now());
    }

    // Put rows before and after.
    RowId lowRowId = new RowId(PARTITION_ID, 99, 0);
    RowId highRowId = new RowId(PARTITION_ID, 101, 0);

    List.of(lowRowId, highRowId).forEach(newRowId -> {
        addWrite(newRowId, binaryRow(key, value), newTransactionId());
        commitWrite(newRowId, clock.now());
    });

    // Reverse expected values to simplify comparison - they are returned in reversed order, newest to oldest.
    Collections.reverse(values);

    List<IgniteBiTuple<TestKey, TestValue>> list = storage.runConsistently(locker -> {
        locker.lock(rowId);

        return drainVerifyingRowIdMatch(storage.scanVersions(rowId), rowId);
    });

    assertEquals(values.size(), list.size());

    for (int i = 0; i < list.size(); i++) {
        IgniteBiTuple<TestKey, TestValue> kv = list.get(i);
        TestValue expectedValue = values.get(i);

        if (kv == null) {
            // A null entry is only acceptable where a tombstone is expected.
            assertThat(expectedValue, is(nullValue()));
        } else {
            // The actual value goes first and the expectation second, so failure messages read correctly.
            assertThat(kv.getKey(), is(key));
            assertThat(kv.getValue(), is(expectedValue));
        }
    }
}
/**
 * Drains the given cursor, asserting that every result belongs to the expected row id, and closes the cursor.
 *
 * @param cursor Cursor to drain; closed by this method.
 * @param rowId Row id that every result is expected to carry.
 * @return Unwrapped results in the order produced by the cursor.
 */
private static List<IgniteBiTuple<TestKey, TestValue>> drainVerifyingRowIdMatch(Cursor<ReadResult> cursor, RowId rowId) {
    try (cursor) {
        return cursor.stream()
                .map(readResult -> {
                    assertThat(readResult.rowId(), is(rowId));

                    return BaseMvStoragesTest.unwrap(readResult);
                })
                .collect(toList());
    }
}
/**
 * Checks that a scan over a row with a committed version and a newer pending write returns the write intent, and that
 * {@code committed(ts)} then returns the committed version.
 */
@ParameterizedTest
@EnumSource(ScanTimestampProvider.class)
void testScanWithWriteIntent(ScanTimestampProvider tsProvider) {
    IgniteBiTuple<RowId, HybridTimestamp> prepared = addCommittedVersionAndWriteIntent();

    HybridTimestamp scanTs = tsProvider.scanTimestamp(clock);

    try (PartitionTimestampCursor cursor = storage.scan(scanTs)) {
        assertTrue(cursor.hasNext());

        ReadResult intent = cursor.next();

        assertTrue(intent.isWriteIntent());
        assertThat(intent.rowId(), is(prepared.get1()));
        assertThat(intent.binaryRow(), is(equalToRow(binaryRow2)));

        // The committed version underneath is reachable through the cursor.
        BinaryRow committed = cursor.committed(prepared.get2());

        assertThat(committed, is(equalToRow(binaryRow)));
    }
}
/**
 * Writes and commits the first shared row under a new row id, then adds the second shared row as a pending write.
 *
 * @return Pair of the row id and the timestamp with which the first version was committed.
 */
private IgniteBiTuple<RowId, HybridTimestamp> addCommittedVersionAndWriteIntent() {
    RowId id = new RowId(PARTITION_ID);

    HybridTimestamp committedAt = clock.now();

    storage.runConsistently(locker -> {
        addWrite(id, binaryRow, newTransactionId());
        commitWrite(id, committedAt);

        // Left pending on purpose.
        addWrite(id, binaryRow2, newTransactionId());

        return null;
    });

    return new IgniteBiTuple<>(id, committedAt);
}
/**
 * Checks that {@code scanVersions} over a row with a committed version and a newer pending write returns both, with the
 * pending one first, and nothing from neighbouring row ids.
 */
@Test
void testScanVersionsWithWriteIntent() {
    RowId targetId = new RowId(PARTITION_ID, 100, 0);

    addWrite(targetId, binaryRow(key, value), newTransactionId());
    commitWrite(targetId, clock.now());

    addWrite(targetId, binaryRow(key, value2), newTransactionId());

    // Surround the target with committed rows under neighbouring ids.
    RowId belowId = new RowId(PARTITION_ID, 99, 0);
    RowId aboveId = new RowId(PARTITION_ID, 101, 0);

    for (RowId neighbourId : List.of(belowId, aboveId)) {
        addWrite(neighbourId, binaryRow(key, value), newTransactionId());
        commitWrite(neighbourId, clock.now());
    }

    List<IgniteBiTuple<TestKey, TestValue>> versions = storage.runConsistently(locker -> {
        locker.lock(targetId);

        return drainVerifyingRowIdMatch(storage.scanVersions(targetId), targetId);
    });

    assertEquals(2, versions.size());

    for (int i = 0; i < versions.size(); i++) {
        assertEquals(key, versions.get(i).getKey());
    }

    // Pending version first, committed version second.
    assertEquals(value2, versions.get(0).getValue());
    assertEquals(value, versions.get(1).getValue());
}
/**
 * Checks {@code closestRowId} around two adjacent written row ids: a lower bound at or below a written id resolves to
 * that id, and a lower bound above the last written id resolves to {@code null}.
 */
@Test
void testClosestRowId() {
    RowId belowFirst = new RowId(PARTITION_ID, 1, -1);
    RowId first = new RowId(PARTITION_ID, 1, 0);
    RowId second = new RowId(PARTITION_ID, 1, 1);

    addWrite(first, binaryRow, txId);
    addWrite(second, binaryRow2, txId);

    RowId fromBelow = storage.closestRowId(belowFirst);
    assertEquals(first, fromBelow);

    RowId fromIncrementedBelow = storage.closestRowId(belowFirst.increment());
    assertEquals(first, fromIncrementedBelow);

    RowId fromFirst = storage.closestRowId(first);
    assertEquals(first, fromFirst);

    RowId fromSecond = storage.closestRowId(second);
    assertEquals(second, fromSecond);

    RowId pastSecond = storage.closestRowId(second.increment());
    assertNull(pastSecond);
}
/**
 * Checks that a row added through {@code addWriteCommitted} can be read at a later timestamp.
 */
@Test
public void addWriteCommittedAddsCommittedVersion() {
    RowId id = new RowId(PARTITION_ID);

    HybridTimestamp commitTs = clock.now();
    addWriteCommitted(id, binaryRow, commitTs);

    // A read by a later timestamp returns the row that was just added as committed.
    ReadResult result = storage.read(id, clock.now());

    assertThat(result.binaryRow(), is(equalToRow(binaryRow)));
}
/**
 * Checks that adding a second committed version leaves the first one readable at its own commit timestamp.
 */
@Test
public void addWriteCommittedLeavesExistingCommittedVersionsUntouched() {
    RowId id = new RowId(PARTITION_ID);

    HybridTimestamp olderTs = clock.now();
    addWriteCommitted(id, binaryRow, olderTs);

    HybridTimestamp newerTs = clock.now();
    addWriteCommitted(id, binaryRow2, newerTs);

    ReadResult newest = storage.read(id, clock.now());
    assertThat(newest.binaryRow(), is(equalToRow(binaryRow2)));

    ReadResult oldest = storage.read(id, olderTs);
    assertThat(oldest.binaryRow(), is(equalToRow(binaryRow)));
}
/**
 * Checks that {@code addWriteCommitted} fails with a {@link StorageException} that mentions the row id when the row
 * already has a pending write.
 */
@Test
public void addWriteCommittedThrowsIfUncommittedVersionExists() {
    RowId id = insert(binaryRow, txId);

    StorageException thrown = assertThrows(
            StorageException.class,
            () -> addWriteCommitted(id, binaryRow2, clock.now())
    );

    String message = thrown.getMessage();

    assertThat(message, allOf(containsString("Write intent exists"), containsString(id.toString())));
}
/**
 * Checks that the first element returned by {@code scanVersions} for a row with a pending write is a write intent
 * carrying transaction details and no commit timestamps.
 */
@Test
public void scanVersionsReturnsUncommittedVersionsAsUncommitted() {
    RowId id = insert(binaryRow, txId);
    commitWrite(id, clock.now());

    addWrite(id, binaryRow2, newTransactionId());

    storage.runConsistently(locker -> {
        locker.lock(id);

        try (Cursor<ReadResult> versions = storage.scanVersions(id)) {
            ReadResult newest = versions.next();

            assertThat(newest.rowId(), is(id));
            assertTrue(newest.isWriteIntent());

            // Transaction details are present.
            assertThat(newest.commitPartitionId(), is(not(ReadResult.UNDEFINED_COMMIT_PARTITION_ID)));
            assertThat(newest.commitTableId(), is(notNullValue()));
            assertThat(newest.transactionId(), is(notNullValue()));

            // Commit timestamps are absent.
            assertThat(newest.commitTimestamp(), is(nullValue()));
            assertThat(newest.newestCommitTimestamp(), is(nullValue()));
        }

        return null;
    });
}
@Test
public void scanVersionsReturnsCommittedVersionsAsCommitted() {
    // A single version that is committed before scanning.
    RowId id = insert(binaryRow, txId);
    HybridTimestamp commitTs = clock.now();
    commitWrite(id, commitTs);

    storage.runConsistently(rowLocker -> {
        rowLocker.lock(id);

        try (Cursor<ReadResult> versions = storage.scanVersions(id)) {
            // Only the first element returned by the version scan is inspected.
            ReadResult head = versions.next();

            assertThat(head.rowId(), is(id));
            assertFalse(head.isWriteIntent());

            // Transaction-related attributes must be unset for a committed version.
            assertThat(head.commitPartitionId(), is(ReadResult.UNDEFINED_COMMIT_PARTITION_ID));
            assertThat(head.commitTableId(), is(nullValue()));
            assertThat(head.transactionId(), is(nullValue()));

            // The commit timestamp is present, the newest-commit timestamp is not.
            assertThat(head.commitTimestamp(), is(notNullValue()));
            assertThat(head.newestCommitTimestamp(), is(nullValue()));
        }

        return null;
    });
}
@ParameterizedTest
@EnumSource(ScanTimestampProvider.class)
public void scanCursorHasNextReturnsFalseEachTimeAfterExhaustion(ScanTimestampProvider tsProvider) {
    // Exactly one committed row is put into the partition by this test.
    RowId id = insert(binaryRow, txId);
    HybridTimestamp commitTs = clock.now();
    commitWrite(id, commitTs);

    HybridTimestamp scanTs = tsProvider.scanTimestamp(clock);

    try (PartitionTimestampCursor cursor = scan(scanTs)) {
        // Consume the row so that the cursor is exhausted.
        cursor.next();

        // hasNext() must keep answering false, no matter how often it is asked.
        assertFalse(cursor.hasNext());
        //noinspection ConstantConditions
        assertFalse(cursor.hasNext());
    }
}
@ParameterizedTest
@EnumSource(ScanTimestampProvider.class)
public void scanSeesTombstonesWhenTombstoneIsNotCommitted(ScanTimestampProvider tsProvider) {
    // Committed row version.
    RowId id = insert(binaryRow, txId);
    HybridTimestamp committedAt = clock.now();
    commitWrite(id, committedAt);

    // Pending write with a null row (the tombstone), left uncommitted on purpose.
    addWrite(id, null, newTransactionId());

    HybridTimestamp scanTs = tsProvider.scanTimestamp(clock);

    try (PartitionTimestampCursor cursor = scan(scanTs)) {
        assertTrue(cursor.hasNext());

        ReadResult tombstone = cursor.next();

        assertThat(tombstone.rowId(), is(id));
        assertNull(tombstone.binaryRow());
        // The result must still point at the timestamp of the preceding commit.
        assertEquals(committedAt, tombstone.newestCommitTimestamp());

        assertFalse(cursor.hasNext());
    }
}
@ParameterizedTest
@EnumSource(ScanTimestampProvider.class)
public void scanDoesNotSeeTombstonesWhenTombstoneIsCommitted(ScanTimestampProvider tsProvider) {
    // Committed row version.
    RowId id = insert(binaryRow, txId);
    HybridTimestamp insertCommitTs = clock.now();
    commitWrite(id, insertCommitTs);

    // Null-row write (the tombstone), committed as well this time.
    addWrite(id, null, newTransactionId());
    HybridTimestamp tombstoneCommitTs = clock.now();
    commitWrite(id, tombstoneCommitTs);

    HybridTimestamp scanTs = tsProvider.scanTimestamp(clock);

    try (PartitionTimestampCursor cursor = scan(scanTs)) {
        // Nothing may be returned by the scan.
        assertFalse(cursor.hasNext());
    }
}
@ParameterizedTest
@EnumSource(ScanTimestampProvider.class)
void committedMethodCallDoesNotInterfereWithIteratingOverScanCursor(ScanTimestampProvider scanTsProvider) {
    // First row: written and committed.
    RowId firstId = new RowId(PARTITION_ID, 0, 0);
    addWrite(firstId, binaryRow, txId);

    HybridTimestamp firstCommitTs = clock.now();
    commitWrite(firstId, firstCommitTs);

    // Second row: written but never committed in this test.
    RowId secondId = new RowId(PARTITION_ID, 0, 1);
    addWrite(secondId, binaryRow2, txId);

    HybridTimestamp scanTs = scanTsProvider.scanTimestamp(clock);

    try (PartitionTimestampCursor cursor = scan(scanTs)) {
        cursor.next();

        // Calling committed() between two next() calls must not disturb the iteration.
        cursor.committed(firstCommitTs);

        ReadResult second = cursor.next();

        assertThat(second.binaryRow(), is(equalToRow(binaryRow2)));
        assertFalse(cursor.hasNext());
    }
}
@Test
void groupConfigurationOnEmptyStorageIsNull() {
    // Nothing has been saved by this test, so no configuration is expected.
    byte[] config = storage.committedGroupConfiguration();

    assertThat(config, is(nullValue()));
}
@Test
void groupConfigurationIsSaved() {
    byte[] expected = {1, 2, 3};

    // The configuration is written inside runConsistently().
    storage.runConsistently(rowLocker -> {
        storage.committedGroupConfiguration(expected);

        return null;
    });

    // Reading it back must yield the same bytes.
    byte[] actual = storage.committedGroupConfiguration();

    assertThat(actual, is(notNullValue()));
    assertThat(actual, is(equalTo(expected)));
}
@Test
void groupConfigurationIsUpdated() {
    // Save an initial configuration...
    byte[] initial = {1, 2, 3};

    storage.runConsistently(rowLocker -> {
        storage.committedGroupConfiguration(initial);

        return null;
    });

    // ...then overwrite it with a different one in a separate runConsistently() call.
    byte[] replacement = {3, 2, 1};

    storage.runConsistently(rowLocker -> {
        storage.committedGroupConfiguration(replacement);

        return null;
    });

    // Only the most recently saved configuration must be returned.
    byte[] actual = storage.committedGroupConfiguration();

    assertThat(actual, is(notNullValue()));
    assertThat(actual, is(equalTo(replacement)));
}
@Test
public void testLease() {
    long firstLeaseStart = 1000;
    long secondLeaseStart = 2000;

    storage.runConsistently(rowLocker -> {
        // Each update with a larger start time must become visible immediately.
        storage.updateLease(firstLeaseStart);
        assertEquals(firstLeaseStart, storage.leaseStartTime());

        storage.updateLease(secondLeaseStart);
        assertEquals(secondLeaseStart, storage.leaseStartTime());

        // An update with a smaller start time (0) must leave the stored value unchanged.
        storage.updateLease(0);
        assertEquals(secondLeaseStart, storage.leaseStartTime());

        return null;
    });
}
/**
 * Produces the row id that immediately precedes the given one, i.e. the given id minus one.
 *
 * <p>The two 64-bit halves are treated as one number: the least significant half is decremented first and,
 * if it wraps around, the borrow is taken from the most significant half.
 *
 * @param value Row id to step back from.
 * @return Row id with the same partition id and a value smaller by one.
 * @throws IllegalArgumentException If both halves wrap around, so that no preceding row id exists.
 */
private RowId decrement(RowId value) {
    long high = value.mostSignificantBits();
    long low = value.leastSignificantBits() - 1;

    // Long.MAX_VALUE after subtracting one means the half was Long.MIN_VALUE and has wrapped around.
    if (low == Long.MAX_VALUE) {
        high -= 1;

        if (high == Long.MAX_VALUE) {
            throw new IllegalArgumentException();
        }
    }

    return new RowId(value.partitionId(), high, low);
}
/** Source of the timestamp that the parameterized scan tests pass to the scan. */
private enum ScanTimestampProvider {
    /** Uses the current time reported by the supplied clock. */
    NOW(clock -> clock.now()),

    /** Always uses {@link HybridTimestamp#MAX_VALUE}, ignoring the supplied clock. */
    MAX_VALUE(clock -> HybridTimestamp.MAX_VALUE);

    /** Computes the scan timestamp for this constant. */
    private final java.util.function.Function<HybridClock, HybridTimestamp> timestampSource;

    ScanTimestampProvider(java.util.function.Function<HybridClock, HybridTimestamp> timestampSource) {
        this.timestampSource = timestampSource;
    }

    /**
     * Returns the timestamp to scan with.
     *
     * @param clock Clock that may be consulted for the current time.
     * @return Timestamp for the scan.
     */
    HybridTimestamp scanTimestamp(HybridClock clock) {
        return timestampSource.apply(clock);
    }
}
}