blob: 977f2ad42788c8d50ef977814c6bd4df9815e7c1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table;
import static java.util.concurrent.CompletableFuture.allOf;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.apache.ignite.internal.tx.TxState.COMMITED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Consumer;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.util.Pair;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.IgniteTransactions;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
/**
* TODO asch IGNITE-15928 validate zero locks after test finish.
*/
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public abstract class TxAbstractTest extends IgniteAbstractTest {
protected static SchemaDescriptor ACCOUNTS_SCHEMA = new SchemaDescriptor(
1,
new Column[]{new Column("accountNumber", NativeTypes.INT64, false)},
new Column[]{new Column("balance", NativeTypes.DOUBLE, false)}
);
/** Table ID test value. */
public static final UUID tableId2 = java.util.UUID.randomUUID();
protected static SchemaDescriptor CUSTOMERS_SCHEMA = new SchemaDescriptor(
1,
new Column[]{new Column("accountNumber", NativeTypes.INT64, false)},
new Column[]{new Column("name", NativeTypes.STRING, false)}
);
/** Accounts table id -> balance. */
protected Table accounts;
/** Customers table id -> name. */
protected Table customers;
protected static final double BALANCE_1 = 500;
protected static final double BALANCE_2 = 500;
protected static final double DELTA = 100;
protected IgniteTransactions igniteTransactions;
/**
* Initialize the test state.
*/
@BeforeEach
public abstract void before() throws Exception;
@Test
public void testMixedPutGet() throws TransactionException {
accounts.recordView().upsert(makeValue(1, BALANCE_1));
igniteTransactions.runInTransaction(tx -> {
var txAcc = accounts.recordView().withTransaction(tx);
txAcc.getAsync(makeKey(1)).thenCompose(r ->
txAcc.upsertAsync(makeValue(1, r.doubleValue("balance") + DELTA))).join();
});
assertEquals(BALANCE_1 + DELTA, accounts.recordView().get(makeKey(1)).doubleValue("balance"));
}
@Test
public void testLockOrdering() throws InterruptedException {
accounts.recordView().upsert(makeValue(1, 50.));
InternalTransaction tx = (InternalTransaction) igniteTransactions.begin();
InternalTransaction tx2 = (InternalTransaction) igniteTransactions.begin();
InternalTransaction tx3 = (InternalTransaction) igniteTransactions.begin();
InternalTransaction tx4 = (InternalTransaction) igniteTransactions.begin();
assertTrue(tx2.timestamp().compareTo(tx.timestamp()) > 0);
assertTrue(tx3.timestamp().compareTo(tx2.timestamp()) > 0);
assertTrue(tx4.timestamp().compareTo(tx3.timestamp()) > 0);
RecordView<Tuple> acc0 = accounts.recordView().withTransaction(tx);
RecordView<Tuple> acc2 = accounts.recordView().withTransaction(tx2);
RecordView<Tuple> acc3 = accounts.recordView().withTransaction(tx3);
RecordView<Tuple> acc4 = accounts.recordView().withTransaction(tx4);
acc0.upsert(makeValue(1, 100.));
CompletableFuture<Void> fut = acc3.upsertAsync(makeValue(1, 300.));
Thread.sleep(100);
assertFalse(fut.isDone());
CompletableFuture<Void> fut2 = acc4.upsertAsync(makeValue(1, 400.));
Thread.sleep(100);
assertFalse(fut2.isDone());
CompletableFuture<Void> fut3 = acc2.upsertAsync(makeValue(1, 200.));
assertFalse(fut3.isDone());
}
/**
* Tests a transaction closure.
*/
@Test
public void testTxClosure() throws TransactionException {
RecordView<Tuple> view = accounts.recordView();
view.upsert(makeValue(1, BALANCE_1));
view.upsert(makeValue(2, BALANCE_2));
igniteTransactions.runInTransaction(tx -> {
CompletableFuture<Tuple> read1 = view.getAsync(makeKey(1));
CompletableFuture<Tuple> read2 = view.getAsync(makeKey(2));
// TODO asch IGNITE-15938 must ensure a commit happens after all pending tx async ops.
view.upsertAsync(makeValue(1, read1.join().doubleValue("balance") - DELTA)).join();
view.upsertAsync(makeValue(2, read2.join().doubleValue("balance") + DELTA)).join();
});
assertEquals(BALANCE_1 - DELTA, view.get(makeKey(1)).doubleValue("balance"));
assertEquals(BALANCE_2 + DELTA, view.get(makeKey(2)).doubleValue("balance"));
assertEquals(5, txManager(accounts).finished());
}
/**
* Tests a transaction closure over key-value view.
*/
@Test
public void testTxClosureKeyValueView() throws TransactionException {
accounts.recordView().upsert(makeValue(1, BALANCE_1));
accounts.recordView().upsert(makeValue(2, BALANCE_2));
igniteTransactions.runInTransaction(tx -> {
KeyValueView<Tuple, Tuple> view = accounts.keyValueView();
CompletableFuture<Tuple> read1 = view.getAsync(makeKey(1));
CompletableFuture<Tuple> read2 = view.getAsync(makeKey(2));
view.putAsync(makeKey(1), makeValue(read1.join().doubleValue("balance") - DELTA)).join();
view.putAsync(makeKey(2), makeValue(read2.join().doubleValue("balance") + DELTA)).join();
});
assertEquals(BALANCE_1 - DELTA, accounts.recordView().get(makeKey(1)).doubleValue("balance"));
assertEquals(BALANCE_2 + DELTA, accounts.recordView().get(makeKey(2)).doubleValue("balance"));
assertEquals(5, txManager(accounts).finished());
}
/**
* Tests an asynchronous transaction.
*/
@Test
public void testTxAsync() {
accounts.recordView().upsert(makeValue(1, BALANCE_1));
accounts.recordView().upsert(makeValue(2, BALANCE_2));
igniteTransactions.beginAsync().thenApply(tx -> accounts.recordView().withTransaction(tx))
.thenCompose(txAcc -> txAcc.getAsync(makeKey(1))
.thenCombine(txAcc.getAsync(makeKey(2)), (v1, v2) -> new Pair<>(v1, v2))
.thenCompose(pair -> allOf(
txAcc.upsertAsync(makeValue(1, pair.getFirst().doubleValue("balance") - DELTA)),
txAcc.upsertAsync(makeValue(2, pair.getSecond().doubleValue("balance") + DELTA))
)
)
.thenApply(ignored -> txAcc.transaction())
).thenCompose(Transaction::commitAsync).join();
assertEquals(BALANCE_1 - DELTA, accounts.recordView().get(makeKey(1)).doubleValue("balance"));
assertEquals(BALANCE_2 + DELTA, accounts.recordView().get(makeKey(2)).doubleValue("balance"));
}
/**
* Tests an asynchronous transaction over key-value view.
*/
@Test
public void testTxAsyncKeyValueView() {
accounts.recordView().upsert(makeValue(1, BALANCE_1));
accounts.recordView().upsert(makeValue(2, BALANCE_2));
igniteTransactions.beginAsync().thenApply(tx -> accounts.keyValueView().withTransaction(tx))
.thenCompose(txAcc -> txAcc.getAsync(makeKey(1))
.thenCombine(txAcc.getAsync(makeKey(2)), (v1, v2) -> new Pair<>(v1, v2))
.thenCompose(pair -> allOf(
txAcc.putAsync(makeKey(1), makeValue(pair.getFirst().doubleValue("balance") - DELTA)),
txAcc.putAsync(makeKey(2), makeValue(pair.getSecond().doubleValue("balance") + DELTA))
)
)
.thenApply(ignored -> txAcc.transaction())
).thenCompose(Transaction::commitAsync).join();
assertEquals(BALANCE_1 - DELTA, accounts.recordView().get(makeKey(1)).doubleValue("balance"));
assertEquals(BALANCE_2 + DELTA, accounts.recordView().get(makeKey(2)).doubleValue("balance"));
}
@Test
public void testSimpleConflict() throws Exception {
accounts.recordView().upsert(makeValue(1, 100.));
Transaction tx = igniteTransactions.begin();
Transaction tx2 = igniteTransactions.begin();
var table = accounts.recordView().withTransaction(tx);
var table2 = accounts.recordView().withTransaction(tx2);
double val = table.get(makeKey(1)).doubleValue("balance");
table2.get(makeKey(1)).doubleValue("balance");
try {
table.upsert(makeValue(1, val + 1));
fail();
} catch (Exception e) {
// Expected.
}
table2.upsert(makeValue(1, val + 1));
tx2.commit();
try {
tx.commit();
fail();
} catch (TransactionException e) {
// Expected.
}
assertEquals(101., accounts.recordView().get(makeKey(1)).doubleValue("balance"));
}
@Test
public void testCommit() throws TransactionException {
InternalTransaction tx = (InternalTransaction) igniteTransactions.begin();
Tuple key = makeKey(1);
var table = accounts.recordView().withTransaction(tx);
table.upsert(makeValue(1, 100.));
assertEquals(100., table.get(key).doubleValue("balance"));
table.upsert(makeValue(1, 200.));
assertEquals(200., table.get(key).doubleValue("balance"));
tx.commit();
assertEquals(200., accounts.recordView().get(key).doubleValue("balance"));
assertEquals(COMMITED, txManager(accounts).state(tx.timestamp()));
}
@Test
public void testAbort() throws TransactionException {
InternalTransaction tx = (InternalTransaction) igniteTransactions.begin();
Tuple key = makeKey(1);
var table = accounts.recordView().withTransaction(tx);
table.upsert(makeValue(1, 100.));
assertEquals(100., table.get(key).doubleValue("balance"));
table.upsert(makeValue(1, 200.));
assertEquals(200., table.get(key).doubleValue("balance"));
tx.rollback();
assertNull(accounts.recordView().get(key));
assertEquals(ABORTED, txManager(accounts).state(tx.timestamp()));
}
@Test
public void testAbortNoUpdate() throws TransactionException {
accounts.recordView().upsert(makeValue(1, 100.));
InternalTransaction tx = (InternalTransaction) igniteTransactions.begin();
tx.rollback();
assertEquals(100., accounts.recordView().get(makeKey(1)).doubleValue("balance"));
}
@Test
public void testConcurrent() throws TransactionException {
Transaction tx = igniteTransactions.begin();
Transaction tx2 = igniteTransactions.begin();
Tuple key = makeKey(1);
Tuple val = makeValue(1, 100.);
accounts.recordView().upsert(val);
var table = accounts.recordView().withTransaction(tx);
var table2 = accounts.recordView().withTransaction(tx2);
assertEquals(100., table.get(key).doubleValue("balance"));
assertEquals(100., table2.get(key).doubleValue("balance"));
tx.commit();
tx2.commit();
}
/**
* Tests if a lost update is not happening on concurrent increment.
*/
@Test
public void testIncrement() throws TransactionException {
Transaction tx = igniteTransactions.begin();
Transaction tx2 = igniteTransactions.begin();
Tuple key = makeKey(1);
Tuple val = makeValue(1, 100.);
accounts.recordView().upsert(val); // Creates implicit transaction.
var table = accounts.recordView().withTransaction(tx);
var table2 = accounts.recordView().withTransaction(tx2);
// Read in tx
double valTx = table.get(key).doubleValue("balance");
// Read in tx2
double valTx2 = table2.get(key).doubleValue("balance");
// Write in tx (out of order)
// TODO asch IGNITE-15937 fix exception model.
Exception err = assertThrows(Exception.class, () -> table.upsert(makeValue(1, valTx + 1)));
assertTrue(err.getMessage().contains("Failed to acquire a lock"), err.getMessage());
// Write in tx2
table2.upsert(makeValue(1, valTx2 + 1));
tx2.commit();
assertEquals(101., accounts.recordView().get(key).doubleValue("balance"));
}
/**
* Tests if a lost update is not happening on concurrent increment.
*/
@Test
public void testIncrement2() throws TransactionException {
InternalTransaction tx = (InternalTransaction) igniteTransactions.begin();
InternalTransaction tx2 = (InternalTransaction) igniteTransactions.begin();
Tuple key = makeKey(1);
Tuple val = makeValue(1, 100.);
accounts.recordView().upsert(val); // Creates implicit transaction.
var table = accounts.recordView().withTransaction(tx);
var table2 = accounts.recordView().withTransaction(tx2);
// Read in tx
double valTx = table.get(key).doubleValue("balance");
// Read in tx2
double valTx2 = table2.get(key).doubleValue("balance");
// Write in tx2 (should wait for read unlock in tx1)
CompletableFuture<Void> fut = table2.upsertAsync(makeValue(1, valTx2 + 1));
assertFalse(fut.isDone());
CompletableFuture<Void> fut2 = fut.thenCompose(ret -> tx2.commitAsync());
// Write in tx
table.upsert(makeValue(1, valTx + 1));
tx.commit();
Exception err = assertThrows(Exception.class, () -> fut2.get(5, TimeUnit.SECONDS));
assertTrue(err.getMessage().contains("Failed to acquire a lock"), err.getMessage());
assertEquals(101., accounts.recordView().get(key).doubleValue("balance"));
}
@Test
public void testAbortWithValue() throws TransactionException {
accounts.recordView().upsert(makeValue(0, 100.));
assertEquals(100., accounts.recordView().get(makeKey(0)).doubleValue("balance"));
InternalTransaction tx = (InternalTransaction) igniteTransactions.begin();
var table = accounts.recordView().withTransaction(tx);
table.upsert(makeValue(0, 200.));
assertEquals(200., table.get(makeKey(0)).doubleValue("balance"));
tx.rollback();
assertEquals(100., accounts.recordView().get(makeKey(0)).doubleValue("balance"));
}
@Test
public void testInsert() throws TransactionException {
assertNull(accounts.recordView().get(makeKey(1)));
Transaction tx = igniteTransactions.begin();
var table = accounts.recordView().withTransaction(tx);
assertTrue(table.insert(makeValue(1, 100.)));
assertFalse(table.insert(makeValue(1, 200.)));
assertEquals(100., table.get(makeKey(1)).doubleValue("balance"));
tx.commit();
assertEquals(100., accounts.recordView().get(makeKey(1)).doubleValue("balance"));
assertTrue(accounts.recordView().insert(makeValue(2, 200.)));
assertEquals(200., accounts.recordView().get(makeKey(2)).doubleValue("balance"));
Transaction tx2 = igniteTransactions.begin();
table = accounts.recordView().withTransaction(tx2);
assertTrue(table.insert(makeValue(3, 100.)));
assertFalse(table.insert(makeValue(3, 200.)));
assertEquals(100., table.get(makeKey(3)).doubleValue("balance"));
tx2.rollback();
assertEquals(100., accounts.recordView().get(makeKey(1)).doubleValue("balance"));
assertEquals(200., accounts.recordView().get(makeKey(2)).doubleValue("balance"));
assertNull(accounts.recordView().get(makeKey(3)));
}
@Test
public void testDelete() throws TransactionException {
Tuple key = makeKey(1);
assertFalse(accounts.recordView().delete(key));
assertNull(accounts.recordView().get(key));
accounts.recordView().upsert(makeValue(1, 100.));
igniteTransactions.runInTransaction(tx -> {
assertNotNull(accounts.recordView().get(key));
assertTrue(accounts.recordView().delete(key));
assertNull(accounts.recordView().get(key));
});
assertNull(accounts.recordView().get(key));
accounts.recordView().upsert(makeValue(1, 100.));
assertNotNull(accounts.recordView().get(key));
Tuple key2 = makeKey(2);
accounts.recordView().upsert(makeValue(2, 100.));
assertThrows(RuntimeException.class, () -> igniteTransactions.runInTransaction((Consumer<Transaction>) tx -> {
assertNotNull(accounts.recordView().get(key2));
assertTrue(accounts.recordView().delete(key2));
assertNull(accounts.recordView().get(key2));
throw new RuntimeException(); // Triggers rollback.
}));
assertNotNull(accounts.recordView().get(key2));
assertTrue(accounts.recordView().delete(key2));
assertNull(accounts.recordView().get(key2));
}
@Test
public void testGetAll() {
List<Tuple> keys = List.of(makeKey(1), makeKey(2));
Collection<Tuple> ret = accounts.recordView().getAll(keys);
assertEquals(2, ret.size());
for (Tuple tuple : ret) {
assertNull(tuple);
}
accounts.recordView().upsert(makeValue(1, 100.));
accounts.recordView().upsert(makeValue(2, 200.));
ret = new ArrayList<>(accounts.recordView().getAll(keys));
validateBalance(ret, 100., 200.);
}
/**
* Tests if a transaction is rolled back if one of the batch keys can't be locked.
*/
@Test
public void testGetAllAbort() throws TransactionException {
List<Tuple> keys = List.of(makeKey(1), makeKey(2));
accounts.recordView().upsertAll(List.of(makeValue(1, 100.), makeValue(2, 200.)));
Transaction tx = igniteTransactions.begin();
RecordView<Tuple> txAcc = accounts.recordView().withTransaction(tx);
txAcc.upsert(makeValue(1, 300.));
validateBalance(txAcc.getAll(keys), 300., 200.);
tx.rollback();
validateBalance(accounts.recordView().getAll(keys), 100., 200.);
}
/**
* Tests if a transaction is rolled back if one of the batch keys can't be locked.
*/
@Test
public void testGetAllConflict() throws Exception {
accounts.recordView().upsertAll(List.of(makeValue(1, 100.), makeValue(2, 200.)));
InternalTransaction tx = (InternalTransaction) igniteTransactions.begin();
InternalTransaction tx2 = (InternalTransaction) igniteTransactions.begin();
RecordView<Tuple> txAcc = accounts.recordView().withTransaction(tx);
RecordView<Tuple> txAcc2 = accounts.recordView().withTransaction(tx2);
txAcc2.upsert(makeValue(1, 300.));
txAcc.upsert(makeValue(2, 400.));
Exception err = assertThrows(Exception.class, () -> txAcc.getAll(List.of(makeKey(2), makeKey(1))));
assertTrue(err.getMessage().contains("Failed to acquire a lock"), err.getMessage());
validateBalance(txAcc2.getAll(List.of(makeKey(2), makeKey(1))), 200., 300.);
validateBalance(txAcc2.getAll(List.of(makeKey(1), makeKey(2))), 300., 200.);
assertTrue(IgniteTestUtils.waitForCondition(() -> TxState.ABORTED == tx.state(), 5_000), tx.state().toString());
tx2.commit();
validateBalance(accounts.recordView().getAll(List.of(makeKey(2), makeKey(1))), 200., 300.);
}
@Test
public void testPutAll() throws TransactionException {
igniteTransactions.runInTransaction(tx -> {
accounts.recordView().upsertAll(List.of(makeValue(1, 100.), makeValue(2, 200.)));
});
validateBalance(accounts.recordView().getAll(List.of(makeKey(2), makeKey(1))), 200., 100.);
assertThrows(IgniteException.class, () -> igniteTransactions.runInTransaction(tx -> {
accounts.recordView().upsertAll(List.of(makeValue(3, 300.), makeValue(4, 400.)));
if (true) {
throw new IgniteException();
}
}));
assertNull(accounts.recordView().get(makeKey(3)));
assertNull(accounts.recordView().get(makeKey(4)));
}
@Test
public void testInsertAll() throws TransactionException {
accounts.recordView().upsertAll(List.of(makeValue(1, 100.), makeValue(2, 200.)));
igniteTransactions.runInTransaction(tx -> {
Collection<Tuple> res = accounts.recordView().insertAll(List.of(makeValue(1, 200.), makeValue(3, 300.)));
assertEquals(1, res.size());
});
validateBalance(accounts.recordView().getAll(List.of(makeKey(1), makeKey(2), makeKey(3))), 100., 200., 300.);
}
@Test
public void testDeleteAll() throws TransactionException {
accounts.recordView().upsertAll(List.of(makeValue(1, 100.), makeValue(2, 200.)));
igniteTransactions.runInTransaction(tx -> {
Collection<Tuple> res = accounts.recordView().deleteAll(List.of(makeKey(1), makeKey(2), makeKey(3)));
assertEquals(1, res.size());
});
assertNull(accounts.recordView().get(makeKey(1)));
assertNull(accounts.recordView().get(makeKey(2)));
}
@Test
public void testReplace() throws TransactionException {
accounts.recordView().upsert(makeValue(1, 100.));
igniteTransactions.runInTransaction(tx -> {
assertFalse(accounts.recordView().replace(makeValue(2, 200.)));
assertTrue(accounts.recordView().replace(makeValue(1, 200.)));
});
validateBalance(accounts.recordView().getAll(List.of(makeKey(1))), 200.);
}
@Test
public void testGetAndReplace() throws TransactionException {
accounts.recordView().upsert(makeValue(1, 100.));
igniteTransactions.runInTransaction(tx -> {
assertNull(accounts.recordView().getAndReplace(makeValue(2, 200.)));
assertNotNull(accounts.recordView().getAndReplace(makeValue(1, 200.)));
});
validateBalance(accounts.recordView().getAll(List.of(makeKey(1))), 200.);
}
@Test
public void testDeleteExact() throws TransactionException {
accounts.recordView().upsert(makeValue(1, 100.));
igniteTransactions.runInTransaction(tx -> {
assertFalse(accounts.recordView().deleteExact(makeValue(1, 200.)));
assertTrue(accounts.recordView().deleteExact(makeValue(1, 100.)));
});
Tuple actual = accounts.recordView().get(makeKey(1));
assertNull(actual);
}
@Test
public void testDeleteAllExact() throws TransactionException {
accounts.recordView().upsertAll(List.of(makeValue(1, 100.), makeValue(2, 200.)));
igniteTransactions.runInTransaction(tx -> {
Collection<Tuple> res =
accounts.recordView().deleteAllExact(List.of(makeValue(1, 200.), makeValue(2, 200.), makeValue(3, 300.)));
assertEquals(2, res.size());
});
assertNotNull(accounts.recordView().get(makeKey(1)));
assertNull(accounts.recordView().get(makeKey(2)));
}
@Test
public void testGetAndPut() throws TransactionException {
accounts.recordView().upsert(makeValue(1, 100.));
igniteTransactions.runInTransaction(tx -> {
assertNotNull(accounts.recordView().getAndUpsert(makeValue(1, 200.)));
assertNull(accounts.recordView().getAndUpsert(makeValue(2, 200.)));
});
validateBalance(accounts.recordView().getAll(List.of(makeKey(1), makeKey(2))), 200., 200.);
}
@Test
public void testGetAndDelete() throws TransactionException {
accounts.recordView().upsert(makeValue(1, 100.));
igniteTransactions.runInTransaction(tx -> {
assertEquals(100., accounts.recordView().getAndDelete(makeKey(1)).doubleValue("balance"));
});
assertNull(accounts.recordView().get(makeKey(1)));
}
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-15939")
public void testRollbackUpgradedLock() throws Exception { // TODO asch IGNITE-15939
accounts.recordView().upsert(makeValue(1, 100.));
InternalTransaction tx = (InternalTransaction) igniteTransactions.begin();
InternalTransaction tx2 = (InternalTransaction) igniteTransactions.begin();
var table = accounts.recordView().withTransaction(tx);
var table2 = accounts.recordView().withTransaction(tx2);
double v0 = table.get(makeKey(1)).doubleValue("balance");
double v1 = table2.get(makeKey(1)).doubleValue("balance");
assertEquals(v0, v1);
// Try to upgrade a lock.
table2.upsertAsync(makeValue(1, v0 + 10));
Thread.sleep(300); // Give some time to update lock queue TODO asch IGNITE-15928
tx2.rollback();
}
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-15938") // TODO asch IGNITE-15938
public void testUpgradedLockInvalidation() throws Exception {
accounts.recordView().upsert(makeValue(1, 100.));
InternalTransaction tx = (InternalTransaction) igniteTransactions.begin();
InternalTransaction tx2 = (InternalTransaction) igniteTransactions.begin();
var table = accounts.recordView().withTransaction(tx);
var table2 = accounts.recordView().withTransaction(tx2);
double v0 = table.get(makeKey(1)).doubleValue("balance");
double v1 = table2.get(makeKey(1)).doubleValue("balance");
assertEquals(v0, v1);
// Try to upgrade a lock.
table2.upsertAsync(makeValue(1, v0 + 10));
Thread.sleep(300); // Give some time to update lock queue TODO asch IGNITE-15928
table.upsert(makeValue(1, v0 + 20));
tx.commit();
assertThrows(Exception.class, () -> tx2.commit());
}
@Test
public void testReorder() throws Exception {
accounts.recordView().upsert(makeValue(1, 100.));
InternalTransaction tx = (InternalTransaction) igniteTransactions.begin();
InternalTransaction tx2 = (InternalTransaction) igniteTransactions.begin();
InternalTransaction tx3 = (InternalTransaction) igniteTransactions.begin();
var table = accounts.recordView().withTransaction(tx);
var table2 = accounts.recordView().withTransaction(tx2);
var table3 = accounts.recordView().withTransaction(tx3);
double v0 = table.get(makeKey(1)).doubleValue("balance");
double v1 = table3.get(makeKey(1)).doubleValue("balance");
assertEquals(v0, v1);
CompletableFuture<Void> fut = table3.upsertAsync(makeValue(1, v0 + 10));
assertFalse(fut.isDone());
Thread.sleep(300); // Give some time to update lock queue TODO asch IGNITE-15928
table.upsert(makeValue(1, v0 + 20));
CompletableFuture<Tuple> fut2 = table2.getAsync(makeKey(1));
assertFalse(fut2.isDone());
tx.commit();
fut2.get();
tx2.rollback();
Exception err = assertThrows(Exception.class, () -> fut.get(5, TimeUnit.SECONDS));
assertTrue(err.getMessage().contains("Failed to acquire a lock"), err.getMessage());
}
@Test
public void testReorder2() throws Exception {
accounts.recordView().upsert(makeValue(1, 100.));
InternalTransaction tx = (InternalTransaction) igniteTransactions.begin();
InternalTransaction tx2 = (InternalTransaction) igniteTransactions.begin();
InternalTransaction tx3 = (InternalTransaction) igniteTransactions.begin();
var table = accounts.recordView().withTransaction(tx);
var table2 = accounts.recordView().withTransaction(tx2);
var table3 = accounts.recordView().withTransaction(tx3);
double v0 = table.get(makeKey(1)).doubleValue("balance");
table.upsertAsync(makeValue(1, v0 + 10));
CompletableFuture<Tuple> fut = table2.getAsync(makeKey(1));
assertFalse(fut.isDone());
CompletableFuture<Tuple> fut2 = table3.getAsync(makeKey(1));
assertFalse(fut2.isDone());
}
@Test
public void testCrossTable() throws TransactionException {
customers.recordView().upsert(makeValue(1, "test"));
accounts.recordView().upsert(makeValue(1, 100.));
assertEquals("test", customers.recordView().get(makeKey(1)).stringValue("name"));
assertEquals(100., accounts.recordView().get(makeKey(1)).doubleValue("balance"));
InternalTransaction tx = (InternalTransaction) igniteTransactions.begin();
var txCust = customers.recordView().withTransaction(tx);
var txAcc = accounts.recordView().withTransaction(tx);
txCust.upsert(makeValue(1, "test2"));
txAcc.upsert(makeValue(1, 200.));
Tuple txValCust = txCust.get(makeKey(1));
assertEquals("test2", txValCust.stringValue("name"));
txValCust.set("accountNumber", 2L);
txValCust.set("name", "test3");
Tuple txValAcc = txAcc.get(makeKey(1));
assertEquals(200., txValAcc.doubleValue("balance"));
txValAcc.set("accountNumber", 2L);
txValAcc.set("balance", 300.);
tx.commit();
assertEquals("test2", customers.recordView().get(makeKey(1)).stringValue("name"));
assertEquals(200., accounts.recordView().get(makeKey(1)).doubleValue("balance"));
assertTrue(lockManager(accounts).isEmpty());
assertTrue(lockManager(customers).isEmpty());
}
@Test
public void testTwoTables() throws TransactionException {
customers.recordView().upsert(makeValue(1, "test"));
accounts.recordView().upsert(makeValue(1, 100.));
assertEquals("test", customers.recordView().get(makeKey(1)).stringValue("name"));
assertEquals(100., accounts.recordView().get(makeKey(1)).doubleValue("balance"));
InternalTransaction tx = (InternalTransaction) igniteTransactions.begin();
InternalTransaction tx2 = (InternalTransaction) igniteTransactions.begin();
var txCust = customers.recordView().withTransaction(tx);
var txAcc = accounts.recordView().withTransaction(tx);
txCust.upsert(makeValue(1, "test2"));
txAcc.upsert(makeValue(1, 200.));
Tuple txValCust = txCust.get(makeKey(1));
assertEquals("test2", txValCust.stringValue("name"));
txValCust.set("accountNumber", 2L);
txValCust.set("name", "test3");
Tuple txValAcc = txAcc.get(makeKey(1));
assertEquals(200., txValAcc.doubleValue("balance"));
txValAcc.set("accountNumber", 2L);
txValAcc.set("balance", 300.);
tx.commit();
tx2.commit();
assertEquals("test2", customers.recordView().get(makeKey(1)).stringValue("name"));
assertEquals(200., accounts.recordView().get(makeKey(1)).doubleValue("balance"));
assertTrue(lockManager(accounts).isEmpty());
}
@Test
public void testCrossTableKeyValueView() throws TransactionException {
customers.recordView().upsert(makeValue(1L, "test"));
accounts.recordView().upsert(makeValue(1L, 100.));
assertEquals("test", customers.recordView().get(makeKey(1)).stringValue("name"));
assertEquals(100., accounts.recordView().get(makeKey(1)).doubleValue("balance"));
InternalTransaction tx = (InternalTransaction) igniteTransactions.begin();
InternalTransaction tx2 = (InternalTransaction) igniteTransactions.begin();
var txCust = customers.keyValueView().withTransaction(tx);
var txAcc = accounts.keyValueView().withTransaction(tx);
txCust.put(makeKey(1), makeValue("test2"));
txAcc.put(makeKey(1), makeValue(200.));
Tuple txValCust = txCust.get(makeKey(1));
assertEquals("test2", txValCust.stringValue("name"));
txValCust.set("accountNumber", 2L);
txValCust.set("name", "test3");
Tuple txValAcc = txAcc.get(makeKey(1));
assertEquals(200., txValAcc.doubleValue("balance"));
txValAcc.set("accountNumber", 2L);
txValAcc.set("balance", 300.);
tx.commit();
tx2.commit();
assertEquals("test2", customers.recordView().get(makeKey(1)).stringValue("name"));
assertEquals(200., accounts.recordView().get(makeKey(1)).doubleValue("balance"));
assertTrue(lockManager(accounts).isEmpty());
}
@Test
public void testCrossTableAsync() throws TransactionException {
customers.recordView().upsert(makeValue(1, "test"));
accounts.recordView().upsert(makeValue(1, 100.));
igniteTransactions.beginAsync()
.thenCompose(
tx -> accounts.recordView().withTransaction(tx).upsertAsync(makeValue(1, 200.))
.thenCombine(customers.recordView().withTransaction(tx).upsertAsync(makeValue(1, "test2")), (v1, v2) -> tx)
)
.thenCompose(Transaction::commitAsync).join();
assertEquals("test2", customers.recordView().get(makeKey(1)).stringValue("name"));
assertEquals(200., accounts.recordView().get(makeKey(1)).doubleValue("balance"));
assertTrue(lockManager(accounts).isEmpty());
}
@Test
public void testCrossTableAsyncRollback() throws TransactionException {
customers.recordView().upsert(makeValue(1, "test"));
accounts.recordView().upsert(makeValue(1, 100.));
igniteTransactions.beginAsync()
.thenCompose(
tx -> accounts.recordView().withTransaction(tx).upsertAsync(makeValue(1, 200.))
.thenCombine(customers.recordView().withTransaction(tx).upsertAsync(makeValue(1, "test2")), (v1, v2) -> tx)
)
.thenCompose(Transaction::rollbackAsync).join();
assertEquals("test", customers.recordView().get(makeKey(1)).stringValue("name"));
assertEquals(100., accounts.recordView().get(makeKey(1)).doubleValue("balance"));
assertTrue(lockManager(accounts).isEmpty());
}
@Test
public void testCrossTableAsyncKeyValueView() throws TransactionException {
customers.recordView().upsert(makeValue(1, "test"));
accounts.recordView().upsert(makeValue(1, 100.));
igniteTransactions.beginAsync()
.thenCompose(
tx -> accounts.keyValueView().withTransaction(tx).putAsync(makeKey(1), makeValue(200.))
.thenCombine(customers.keyValueView().withTransaction(tx).putAsync(makeKey(1), makeValue("test2")),
(v1, v2) -> tx)
)
.thenCompose(Transaction::commitAsync).join();
assertEquals("test2", customers.recordView().get(makeKey(1)).stringValue("name"));
assertEquals(200., accounts.recordView().get(makeKey(1)).doubleValue("balance"));
assertTrue(lockManager(accounts).isEmpty());
}
@Test
public void testCrossTableAsyncKeyValueViewRollback() throws TransactionException {
customers.recordView().upsert(makeValue(1, "test"));
accounts.recordView().upsert(makeValue(1, 100.));
igniteTransactions.beginAsync()
.thenCompose(
tx -> accounts.keyValueView().withTransaction(tx).putAsync(makeKey(1), makeValue(200.))
.thenCombine(customers.keyValueView().withTransaction(tx).putAsync(makeKey(1), makeValue("test2")),
(v1, v2) -> tx)
)
.thenCompose(Transaction::rollbackAsync).join();
assertEquals("test", customers.recordView().get(makeKey(1)).stringValue("name"));
assertEquals(100., accounts.recordView().get(makeKey(1)).doubleValue("balance"));
assertTrue(lockManager(accounts).isEmpty());
}
@Test
public void testBalance() throws InterruptedException {
doTestSingleKeyMultithreaded(5_000, false);
}
@Test
public void testLockedTooLong() {
// TODO asch IGNITE-15936 if lock can't be acquired until timeout tx should be rolled back.
}
@Test
public void testScan() throws InterruptedException {
accounts.recordView().upsertAll(List.of(makeValue(1, 100.), makeValue(2, 200.)));
Flow.Publisher<BinaryRow> pub = ((TableImpl) accounts).internalTable().scan(0, null);
List<Tuple> rows = new ArrayList<>();
CountDownLatch l = new CountDownLatch(1);
pub.subscribe(new Flow.Subscriber<BinaryRow>() {
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscription.request(3);
}
@Override
public void onNext(BinaryRow item) {
Row row = ((TableImpl) accounts).schemaView().resolve(item);
rows.add(TableRow.tuple(row));
}
@Override
public void onError(Throwable throwable) {
// No-op.
}
@Override
public void onComplete() {
l.countDown();
}
});
assertTrue(l.await(5_000, TimeUnit.MILLISECONDS));
Map<Long, Tuple> map = new HashMap<>();
for (Tuple row : rows) {
map.put(row.longValue("accountNumber"), row);
}
assertEquals(100., map.get(1L).doubleValue("balance"));
assertEquals(200., map.get(2L).doubleValue("balance"));
}
@Test
public void testComplexImplicit() {
doTestComplex(accounts.recordView());
}
@Test
public void testComplexExplicit() throws TransactionException {
doTestComplex(accounts.recordView().withTransaction(igniteTransactions.begin()));
}
@Test
public void testComplexImplicitKeyValueView() {
doTestComplexKeyValue(accounts.keyValueView());
}
@Test
public void testComplexExplicitKeyValueView() throws TransactionException {
doTestComplexKeyValue(accounts.keyValueView().withTransaction(igniteTransactions.begin()));
}
/**
* Checks operation over tuple record view. The scenario was moved from ITDistributedTableTest.
*
* @param view Record view.
*/
private void doTestComplex(RecordView<Tuple> view) {
final int keysCnt = 10;
long start = System.nanoTime();
for (long i = 0; i < keysCnt; i++) {
view.insert(makeValue(i, i + 2.));
}
long dur = (long) ((System.nanoTime() - start) / 1000 / 1000.);
log.info("Inserted={}, time={}ms avg={} tps={}", keysCnt, dur, dur / keysCnt, 1000 / (dur / (float) keysCnt));
for (long i = 0; i < keysCnt; i++) {
Tuple entry = view.get(makeKey(i));
assertEquals(i + 2., entry.doubleValue("balance"));
}
for (int i = 0; i < keysCnt; i++) {
view.upsert(makeValue(i, i + 5.));
Tuple entry = view.get(makeKey(i));
assertEquals(i + 5., entry.doubleValue("balance"));
}
HashSet<Tuple> keys = new HashSet<>();
for (long i = 0; i < keysCnt; i++) {
keys.add(makeKey(i));
}
Collection<Tuple> entries = view.getAll(keys);
assertEquals(keysCnt, entries.size());
for (long i = 0; i < keysCnt; i++) {
boolean res = view.replace(makeValue(i, i + 5.), makeValue(i, i + 2.));
assertTrue(res, "Failed to replace for idx=" + i);
}
for (long i = 0; i < keysCnt; i++) {
boolean res = view.delete(makeKey(i));
assertTrue(res);
Tuple entry = view.get(makeKey(i));
assertNull(entry);
}
ArrayList<Tuple> batch = new ArrayList<>(keysCnt);
for (long i = 0; i < keysCnt; i++) {
batch.add(makeValue(i, i + 2.));
}
view.upsertAll(batch);
for (long i = 0; i < keysCnt; i++) {
Tuple entry = view.get(makeKey(i));
assertEquals(i + 2., entry.doubleValue("balance"));
}
view.deleteAll(keys);
for (Tuple key : keys) {
Tuple entry = view.get(key);
assertNull(entry);
}
}
/**
* Checks operation over tuple key value view. The scenario was moved from ITDistributedTableTest.
*
* @param view Table view.
*/
public void doTestComplexKeyValue(KeyValueView<Tuple, Tuple> view) {
final int keysCnt = 10;
for (long i = 0; i < keysCnt; i++) {
view.put(makeKey(i), makeValue(i + 2.));
}
for (long i = 0; i < keysCnt; i++) {
Tuple entry = view.get(makeKey(i));
assertEquals(i + 2., entry.doubleValue("balance"));
}
for (int i = 0; i < keysCnt; i++) {
view.put(makeKey(i), makeValue(i + 5.));
Tuple entry = view.get(makeKey(i));
assertEquals(i + 5., entry.doubleValue("balance"));
}
HashSet<Tuple> keys = new HashSet<>();
for (long i = 0; i < keysCnt; i++) {
keys.add(makeKey(i));
}
Map<Tuple, Tuple> entries = view.getAll(keys);
assertEquals(keysCnt, entries.size());
for (long i = 0; i < keysCnt; i++) {
boolean res = view.replace(makeKey(i), makeValue(i + 5.), makeValue(i + 2.));
assertTrue(res, "Failed to replace for idx=" + i);
}
for (long i = 0; i < keysCnt; i++) {
boolean res = view.remove(makeKey(i));
assertTrue(res);
Tuple entry = view.get(makeKey(i));
assertNull(entry);
}
Map<Tuple, Tuple> batch = new LinkedHashMap<>(keysCnt);
for (long i = 0; i < keysCnt; i++) {
batch.put(makeKey(i), makeValue(i + 2.));
}
view.putAll(batch);
for (long i = 0; i < keysCnt; i++) {
Tuple entry = view.get(makeKey(i));
assertEquals(i + 2., entry.doubleValue("balance"));
}
view.removeAll(keys);
for (Tuple key : keys) {
Tuple entry = view.get(key);
assertNull(entry);
}
}
/**
* Performs a test.
*
* @param duration The duration.
* @param verbose Verbose mode.
* @throws InterruptedException If interrupted while waiting.
*/
private void doTestSingleKeyMultithreaded(long duration, boolean verbose) throws InterruptedException {
int threadsCnt = Runtime.getRuntime().availableProcessors() * 2;
Thread[] threads = new Thread[threadsCnt];
final double initial = 1000;
final double total = threads.length * initial;
for (int i = 0; i < threads.length; i++) {
accounts.recordView().upsert(makeValue(i, 1000));
}
double total0 = 0;
for (long i = 0; i < threads.length; i++) {
double balance = accounts.recordView().get(makeKey(i)).doubleValue("balance");
total0 += balance;
}
assertEquals(total, total0, "Total amount invariant is not preserved");
CyclicBarrier startBar = new CyclicBarrier(threads.length, () -> log.info("Before test"));
LongAdder ops = new LongAdder();
LongAdder fails = new LongAdder();
AtomicBoolean stop = new AtomicBoolean();
Random r = new Random();
AtomicReference<Throwable> firstErr = new AtomicReference<>();
for (int i = 0; i < threads.length; i++) {
long finalI = i;
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
try {
startBar.await();
} catch (Exception e) {
fail();
}
while (!stop.get() && firstErr.get() == null) {
InternalTransaction tx = txManager(accounts).begin();
var table = accounts.recordView().withTransaction(tx);
try {
long acc1 = finalI;
double amount = 100 + r.nextInt(500);
if (verbose) {
log.info("op=tryGet ts={} id={}", tx.timestamp(), acc1);
}
double val0 = table.get(makeKey(acc1)).doubleValue("balance");
long acc2 = acc1;
while (acc1 == acc2) {
acc2 = r.nextInt(threads.length);
}
if (verbose) {
log.info("op=tryGet ts={} id={}", tx.timestamp(), acc2);
}
double val1 = table.get(makeKey(acc2)).doubleValue("balance");
if (verbose) {
log.info("op=tryPut ts={} id={}", tx.timestamp(), acc1);
}
table.upsert(makeValue(acc1, val0 - amount));
if (verbose) {
log.info("op=tryPut ts={} id={}", tx.timestamp(), acc2);
}
table.upsert(makeValue(acc2, val1 + amount));
tx.commit();
assertTrue(txManager(accounts).state(tx.timestamp()) == COMMITED);
ops.increment();
} catch (Exception e) {
assertTrue(e.getMessage().contains("Failed to acquire a lock"), e.getMessage());
fails.increment();
}
}
}
});
threads[i].setName("Worker-" + i);
threads[i].setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
firstErr.compareAndExchange(null, e);
}
});
threads[i].start();
}
Thread.sleep(duration);
stop.set(true);
for (Thread thread : threads) {
thread.join(3_000);
}
if (firstErr.get() != null) {
throw new IgniteException(firstErr.get());
}
log.info("After test ops={} fails={}", ops.sum(), fails.sum());
total0 = 0;
for (long i = 0; i < threads.length; i++) {
double balance = accounts.recordView().get(makeKey(i)).doubleValue("balance");
total0 += balance;
}
assertEquals(total, total0, "Total amount invariant is not preserved");
}
/**
* Makes a key.
*
* @param id The id.
* @return The key tuple.
*/
private Tuple makeKey(long id) {
return Tuple.create().set("accountNumber", id);
}
/**
* Makes a tuple containing key and value.
*
* @param id The id.
* @param balance The balance.
* @return The value tuple.
*/
private Tuple makeValue(long id, double balance) {
return Tuple.create().set("accountNumber", id).set("balance", balance);
}
/**
* Makes a tuple containing key and value.
*
* @param id The id.
* @param balance The balance.
* @return The value tuple.
*/
private Tuple makeValue(long id, String name) {
return Tuple.create().set("accountNumber", id).set("name", name);
}
/**
* Makes a value.
*
* @param id The id.
* @param balance The balance.
* @return The value tuple.
*/
private Tuple makeValue(double balance) {
return Tuple.create().set("balance", balance);
}
/**
* Makes a value.
*
* @param id The id.
* @param name The name.
* @return The value tuple.
*/
private Tuple makeValue(String name) {
return Tuple.create().set("name", name);
}
/**
* Get a tx manager on a partition leader.
*
* @param t The table.
* @return TX manager.
*/
protected abstract TxManager txManager(Table t);
/**
* Get a lock manager on a partition leader.
*
* @param t The table.
* @return Lock manager.
*/
protected LockManager lockManager(Table t) {
return ((TxManagerImpl) txManager(t)).getLockManager();
}
/**
* Validates table partition equality by calculating a hash code over data.
*
* @param table The table.
* @param partId Partition id.
* @return {@code True} if a replicas are the same.
*/
protected abstract boolean assertPartitionsSame(Table table, int partId);
/**
* Validates a balances.
*
* @param rows Rows.
* @param expected Expected values.
*/
private void validateBalance(Collection<Tuple> rows, double... expected) {
List<Tuple> rows0 = new ArrayList<>(rows);
assertEquals(expected.length, rows.size());
for (int i = 0; i < expected.length; i++) {
double v = expected[i];
assertEquals(v, rows0.get(i).doubleValue("balance"));
}
}
}