blob: 903b5e27f8e548f5f2d3dd29b5bd312e92f3db34 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.table;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCode;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.ignite.distributed.ItTxTestCluster;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.network.ClusterService;
import org.apache.ignite.internal.network.NodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.configuration.StorageUpdateConfiguration;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.Lock;
import org.apache.ignite.internal.tx.LockException;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.LockMode;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxPriority;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl;
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.Pair;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.IgniteTransactions;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
import org.apache.ignite.tx.TransactionOptions;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
/**
* TODO asch IGNITE-15928 validate zero locks after test commit.
*/
@ExtendWith({MockitoExtension.class, ConfigurationExtension.class})
@MockitoSettings(strictness = Strictness.LENIENT)
public abstract class TxAbstractTest extends IgniteAbstractTest {
/** Initial balance written for account 1 by the transfer-style tests. */
protected static final double BALANCE_1 = 500;
/** Initial balance written for account 2 by the transfer-style tests. */
protected static final double BALANCE_2 = 500;
/** Amount moved between the two accounts by the transfer-style tests. */
protected static final double DELTA = 100;
/** Name of the accounts table started in {@link #before()}. */
protected static final String ACC_TABLE_NAME = "accounts";
/** Name of the customers table started in {@link #before()}. */
protected static final String CUST_TABLE_NAME = "customers";
/** Schema of the accounts table: ACCOUNTNUMBER (INT64) -> BALANCE (DOUBLE). */
protected static SchemaDescriptor ACCOUNTS_SCHEMA = new SchemaDescriptor(
1,
new Column[]{new Column("accountNumber".toUpperCase(), NativeTypes.INT64, false)},
new Column[]{new Column("balance".toUpperCase(), NativeTypes.DOUBLE, false)}
);
/** Schema of the customers table: ACCOUNTNUMBER (INT64) -> NAME (STRING). */
protected static SchemaDescriptor CUSTOMERS_SCHEMA = new SchemaDescriptor(
1,
new Column[]{new Column("accountNumber".toUpperCase(), NativeTypes.INT64, false)},
new Column[]{new Column("name".toUpperCase(), NativeTypes.STRING, false)}
);
/** Accounts table id -> balance. */
protected TableViewInternal accounts;
/** Customers table id -> name. */
protected TableViewInternal customers;
/** Timestamp tracker handed over to the test cluster. */
protected HybridTimestampTracker timestampTracker = new HybridTimestampTracker();
/** Transactions facade obtained from the test cluster in {@link #before()}. */
protected IgniteTransactions igniteTransactions;
// TODO fsync can be turned on again after https://issues.apache.org/jira/browse/IGNITE-20195
/** Raft configuration with fsync switched off (see the TODO above). */
@InjectConfiguration("mock: { fsync: false }")
protected RaftConfiguration raftConfiguration;
/** Transaction configuration passed to the test cluster. */
@InjectConfiguration
protected TransactionConfiguration txConfiguration;
/** Storage update configuration passed to the test cluster. */
@InjectConfiguration
protected StorageUpdateConfiguration storageUpdateConfiguration;
/** Replication configuration passed to the test cluster. */
@InjectConfiguration
protected ReplicationConfiguration replicationConfiguration;
/** Test info supplied through the constructor and passed to the test cluster. */
protected final TestInfo testInfo;
/** Cluster under test; created in {@link #before()} and shut down in {@link #after()}. */
protected ItTxTestCluster txTestCluster;
/**
* Returns the number of nodes the test cluster is created with. The value is passed to {@link ItTxTestCluster} in {@link #before()}.
*
* @return Number of cluster nodes.
*/
protected abstract int nodes();
/**
* Returns the number of replicas the test cluster is created with. The value is passed to {@link ItTxTestCluster} in
* {@link #before()}; the default is a single replica.
*
* @return Number of replicas.
*/
protected int replicas() {
return 1;
}
/**
* Returns {@code true} to disable collocation by using a dedicated client node. The value is passed to {@link ItTxTestCluster} in
* {@link #before()}; the default is {@code true}.
*
* @return {@code true} to disable collocation.
*/
protected boolean startClient() {
return true;
}
/**
* Creates the test instance and remembers the test info that is later handed over to the test cluster.
*
* @param testInfo Test info.
*/
public TxAbstractTest(TestInfo testInfo) {
this.testInfo = testInfo;
}
/**
 * Builds and prepares the test cluster, then starts the accounts and customers tables used by the tests.
 *
 * @throws Exception If the cluster or the tables failed to start.
 */
@BeforeEach
public void before() throws Exception {
    ItTxTestCluster cluster = new ItTxTestCluster(
            testInfo,
            raftConfiguration,
            txConfiguration,
            storageUpdateConfiguration,
            workDir,
            nodes(),
            replicas(),
            startClient(),
            timestampTracker,
            replicationConfiguration
    );

    // Publish the cluster before preparing it so that after() can shut it down even if the preparation fails.
    txTestCluster = cluster;

    cluster.prepareCluster();

    igniteTransactions = cluster.igniteTransactions();

    accounts = cluster.startTable(ACC_TABLE_NAME, ACCOUNTS_SCHEMA);
    customers = cluster.startTable(CUST_TABLE_NAME, CUSTOMERS_SCHEMA);

    log.info("Tables have been started");
}
/**
 * Shuts down all cluster nodes after each test and releases Mockito inline mocks.
 *
 * <p>The mocks are released even when the cluster shutdown throws, so that a failing shutdown does not leak mock state.
 *
 * @throws Exception If the cluster shutdown failed.
 */
@AfterEach
public void after() throws Exception {
    try {
        // The cluster is absent when its construction failed in before(); there is nothing to stop then.
        if (txTestCluster != null) {
            txTestCluster.shutdownCluster();
        }
    } finally {
        Mockito.framework().clearInlineMocks();
    }
}
/**
 * Creates a cluster service for the given port and node finder, starts it and waits for the start to complete successfully.
 *
 * @param testInfo Test info.
 * @param name Node name (not used by this method).
 * @param port Local port.
 * @param nodeFinder Node finder.
 * @return The started cluster service.
 */
public static ClusterService startNode(TestInfo testInfo, String name, int port,
        NodeFinder nodeFinder) {
    var clusterService = ClusterServiceTestUtils.clusterService(testInfo, port, nodeFinder);

    assertThat(clusterService.startAsync(), willCompleteSuccessfully());

    return clusterService;
}
/**
* Returns the transaction manager of the client, as exposed by the test cluster.
*
* @return Client transaction manager.
*/
protected TxManager clientTxManager() {
return txTestCluster.clientTxManager();
}
/**
 * Returns the transaction manager of the node that currently holds the primary replica of partition 0 of the given table.
 *
 * @param t The table.
 * @return Transaction manager of the leaseholder node; the method asserts that it exists.
 */
protected TxManager txManager(TableViewInternal t) {
    var partitionId = new TablePartitionId(t.tableId(), 0);
    var now = txTestCluster.clocks().get(txTestCluster.localNodeName()).now();

    CompletableFuture<ReplicaMeta> primaryReplicaFuture = txTestCluster.placementDriver().getPrimaryReplica(partitionId, now);

    assertThat(primaryReplicaFuture, willCompleteSuccessfully());

    String leaseholder = primaryReplicaFuture.join().getLeaseholder();

    TxManager leaseholderTxManager = txTestCluster.txManagers().get(leaseholder);

    assertNotNull(leaseholderTxManager);

    return leaseholderTxManager;
}
/**
 * Check the storage of partition is the same across all nodes. The checking is based on {@link MvPartitionStorage#lastAppliedIndex()}
 * that is increased on all update storage operation.
 * TODO: IGNITE-18869 The method must be updated when a proper way to compare storages will be implemented.
 *
 * <p>The index of the first visited node is taken as the reference, whatever its value (including {@code 0}), and every other node
 * is compared against it.
 *
 * @param table The table.
 * @param partId Partition id.
 * @return True if {@link MvPartitionStorage#lastAppliedIndex()} is equivalent across all nodes, false otherwise.
 */
protected boolean assertPartitionsSame(TableViewInternal table, int partId) {
    var groupId = new TablePartitionId(table.tableId(), partId);

    // An explicit flag is used instead of a "0 means unset" sentinel: 0 is a legitimate applied index, and treating it as
    // "unset" would let the next node overwrite the reference value without being compared.
    boolean referenceTaken = false;
    long referenceIdx = 0;

    for (Loza svc : txTestCluster.raftServers().values()) {
        var server = (JraftServerImpl) svc.server();

        Peer serverPeer = server.localPeers(groupId).get(0);

        RaftGroupService grp = server.raftGroupService(new RaftNodeId(groupId, serverPeer));

        var fsm = (JraftServerImpl.DelegatingStateMachine) grp.getRaftNode().getOptions().getFsm();

        PartitionListener listener = (PartitionListener) fsm.getListener();

        MvPartitionStorage storage = listener.getMvStorage();

        long appliedIdx = storage.lastAppliedIndex();

        if (!referenceTaken) {
            referenceIdx = appliedIdx;
            referenceTaken = true;
        } else if (referenceIdx != appliedIdx) {
            return false;
        }
    }

    return true;
}
/**
 * Stubs both {@code invoke} overloads of the replica service used by the given table so that they return a failed future.
 *
 * <p>The replica service is read reflectively from the {@code replicaSvc} field of the internal table. The stubbing is not
 * limited to a single call by this method.
 *
 * @param accounts The table whose replica service is stubbed.
 */
protected void injectFailureOnNextOperation(TableViewInternal accounts) {
    ReplicaService stubbedService = IgniteTestUtils.getFieldValue(accounts.internalTable(), "replicaSvc");

    // Overload taking a node name.
    Mockito.doReturn(CompletableFuture.failedFuture(new Exception()))
            .when(stubbedService)
            .invoke((String) any(), any());

    // Overload taking a cluster node.
    Mockito.doReturn(CompletableFuture.failedFuture(new Exception()))
            .when(stubbedService)
            .invoke((ClusterNode) any(), any());
}
/**
* Returns the transaction managers registered in the test cluster.
*
* @return Values of the cluster's transaction manager map.
*/
protected Collection<TxManager> txManagers() {
return txTestCluster.txManagers().values();
}
/**
 * Checks that rollback and commit invoked on an already committed transaction do not throw.
 */
@Test
public void testCommitRollbackSameTxDoesNotThrow() throws TransactionException {
    final String failureMessage = "Unexpected exception was thrown.";

    InternalTransaction transaction = (InternalTransaction) igniteTransactions.begin();

    RecordView<Tuple> view = accounts.recordView();
    view.upsert(transaction, makeValue(1, 100.));

    transaction.commit();

    assertDoesNotThrow(transaction::rollback, failureMessage);
    assertDoesNotThrow(transaction::commit, failureMessage);
    assertDoesNotThrow(transaction::rollback, failureMessage);
}
/**
 * Checks that commit and rollback invoked on an already rolled back transaction do not throw.
 */
@Test
public void testRollbackCommitSameTxDoesNotThrow() throws TransactionException {
    final String failureMessage = "Unexpected exception was thrown.";

    InternalTransaction transaction = (InternalTransaction) igniteTransactions.begin();

    RecordView<Tuple> view = accounts.recordView();
    view.upsert(transaction, makeValue(1, 100.));

    transaction.rollback();

    assertDoesNotThrow(transaction::commit, failureMessage);
    assertDoesNotThrow(transaction::rollback, failureMessage);
    assertDoesNotThrow(transaction::commit, failureMessage);
}
/**
 * Checks that after an update failed inside a transaction, repeated asynchronous commit and rollback calls complete normally.
 */
@Test
public void testRepeatedCommitRollbackAfterUpdateWithException() throws Exception {
    injectFailureOnNextOperation(accounts);

    InternalTransaction transaction = (InternalTransaction) igniteTransactions.begin();

    CompletableFuture<Void> failedUpsert = accounts.recordView().upsertAsync(transaction, makeValue(1, 100.));

    assertThrows(Exception.class, () -> failedUpsert.join());

    // None of the finish attempts below is expected to throw.
    transaction.commitAsync().join();
    transaction.rollbackAsync().join();
    transaction.commitAsync().join();
}
/**
 * Checks that after a rollback failed, repeated asynchronous commit and rollback calls complete normally.
 */
@Test
public void testRepeatedCommitRollbackAfterRollbackWithException() throws Exception {
    InternalTransaction transaction = (InternalTransaction) igniteTransactions.begin();

    accounts.recordView().upsert(transaction, makeValue(1, 100.));

    // The failure is injected only after the successful upsert, so it hits the rollback.
    injectFailureOnNextOperation(accounts);

    CompletableFuture<Void> failedRollback = transaction.rollbackAsync();

    assertThrows(Exception.class, () -> failedRollback.join());

    // None of the finish attempts below is expected to throw.
    transaction.commitAsync().join();
    transaction.rollbackAsync().join();
    transaction.commitAsync().join();
}
/**
 * Checks that a delete followed by an upsert of the same key in one transaction leaves the upserted value after commit.
 */
@Test
public void testDeleteUpsertCommit() throws TransactionException {
    InternalTransaction transaction = deleteUpsert();

    transaction.commit();

    Tuple committedRow = accounts.recordView().get(null, makeKey(1));

    assertEquals(200., committedRow.doubleValue("balance"));
}
/**
 * Checks that a delete followed by an upsert of the same key in one transaction leaves the original value after rollback.
 */
@Test
public void testDeleteUpsertRollback() throws TransactionException {
    InternalTransaction transaction = deleteUpsert();

    transaction.rollback();

    Tuple restoredRow = accounts.recordView().get(null, makeKey(1));

    assertEquals(100., restoredRow.doubleValue("balance"));
}
/**
 * Writes balance 100 for account 1 outside of a transaction, then, inside a new transaction, deletes the row, asserts that it is
 * not visible any more and upserts balance 200.
 *
 * @return The still open transaction.
 */
protected InternalTransaction deleteUpsert() {
    RecordView<Tuple> view = accounts.recordView();

    view.upsert(null, makeValue(1, 100.));

    InternalTransaction transaction = (InternalTransaction) igniteTransactions.begin();

    view.delete(transaction, makeKey(1));

    Tuple deletedRow = view.get(transaction, makeKey(1));
    assertNull(deletedRow);

    view.upsert(transaction, makeValue(1, 200.));

    return transaction;
}
/**
 * Checks that a batch delete followed by a batch upsert in one transaction leaves the upserted values after commit.
 */
@Test
public void testDeleteUpsertAllCommit() throws TransactionException {
    InternalTransaction transaction = deleteUpsertAll();

    transaction.commit();

    RecordView<Tuple> view = accounts.recordView();

    assertEquals(200., view.get(null, makeKey(1)).doubleValue("balance"));
    assertEquals(200., view.get(null, makeKey(2)).doubleValue("balance"));
}
/**
 * Checks that a batch delete followed by a batch upsert in one transaction leaves the original values after rollback.
 */
@Test
public void testDeleteUpsertAllRollback() throws TransactionException {
    InternalTransaction transaction = deleteUpsertAll();

    transaction.rollback();

    RecordView<Tuple> view = accounts.recordView();

    Tuple first = view.get(null, makeKey(1));
    assertEquals(100., first.doubleValue("balance"), "tuple =[" + first + "]");

    Tuple second = view.get(null, makeKey(2));
    assertEquals(100., second.doubleValue("balance"), "tuple =[" + second + "]");
}
/**
 * Writes balance 100 for accounts 1 and 2 outside of a transaction and verifies it, then, inside a new transaction, deletes both
 * rows in a batch and upserts balance 200 for both in a batch.
 *
 * @return The still open transaction.
 */
private InternalTransaction deleteUpsertAll() {
    RecordView<Tuple> view = accounts.recordView();

    view.upsertAll(null, List.of(makeValue(1, 100.), makeValue(2, 100.)));

    Tuple first = view.get(null, makeKey(1));
    assertEquals(100., first.doubleValue("balance"), "tuple =[" + first + "]");

    Tuple second = view.get(null, makeKey(2));
    assertEquals(100., second.doubleValue("balance"), "tuple =[" + second + "]");

    InternalTransaction transaction = (InternalTransaction) igniteTransactions.begin();

    view.deleteAll(transaction, List.of(makeKey(1), makeKey(2)));

    view.upsertAll(transaction, List.of(makeValue(1, 200.), makeValue(2, 200.)));

    return transaction;
}
/**
 * Checks a read followed by a dependent write, chained asynchronously inside a transaction closure.
 */
@Test
public void testMixedPutGet() throws TransactionException {
    RecordView<Tuple> view = accounts.recordView();

    view.upsert(null, makeValue(1, BALANCE_1));

    igniteTransactions.runInTransaction(tx -> {
        view.getAsync(tx, makeKey(1))
                .thenCompose(row -> view.upsertAsync(tx, makeValue(1, row.doubleValue("balance") + DELTA)))
                .join();
    });

    Tuple updatedRow = view.get(null, makeKey(1));

    assertEquals(BALANCE_1 + DELTA, updatedRow.doubleValue("balance"));
}
/**
* Starts four transactions with increasing ids, lets the youngest one (tx4) write key 1 first and then checks that asynchronous
* writes of the same key issued by tx2 and tx3 do not complete while tx4 is still open.
*/
@Test
public void testLockOrdering() throws InterruptedException {
accounts.recordView().upsert(null, makeValue(1, 50.));
InternalTransaction tx1 = (InternalTransaction) igniteTransactions.begin();
InternalTransaction tx2 = (InternalTransaction) igniteTransactions.begin();
InternalTransaction tx3 = (InternalTransaction) igniteTransactions.begin();
InternalTransaction tx4 = (InternalTransaction) igniteTransactions.begin();
// Transaction ids are expected to grow in the order the transactions were started.
assertTrue(tx3.id().compareTo(tx4.id()) < 0);
assertTrue(tx2.id().compareTo(tx3.id()) < 0);
assertTrue(tx1.id().compareTo(tx2.id()) < 0);
RecordView<Tuple> acc0 = accounts.recordView();
RecordView<Tuple> acc2 = accounts.recordView();
RecordView<Tuple> acc3 = accounts.recordView();
RecordView<Tuple> acc4 = accounts.recordView();
// tx4 writes the key synchronously and stays open for the rest of the test.
acc0.upsert(tx4, makeValue(1, 100.));
// NOTE(review): both this write and the next one are issued by tx2; tx1 is never used for an operation - confirm this is intended.
CompletableFuture<Void> fut = acc3.upsertAsync(tx2, makeValue(1, 300.));
// Give the operation some time; it is expected to be still pending.
Thread.sleep(100);
assertFalse(fut.isDone());
CompletableFuture<Void> fut2 = acc4.upsertAsync(tx2, makeValue(1, 400.));
Thread.sleep(100);
assertFalse(fut2.isDone());
CompletableFuture<Void> fut3 = acc2.upsertAsync(tx3, makeValue(1, 200.));
// NOTE(review): unlike the two checks above there is no pause before this one, so it is checked right after the call returns.
assertFalse(fut3.isDone());
}
/**
 * Tests a transfer between two accounts performed inside a transaction closure over the record view.
 */
@Test
public void testTxClosure() throws TransactionException {
    RecordView<Tuple> view = accounts.recordView();

    view.upsert(null, makeValue(1, BALANCE_1));
    view.upsert(null, makeValue(2, BALANCE_2));

    igniteTransactions.runInTransaction(tx -> {
        // Both reads are started before either of them is awaited.
        CompletableFuture<Tuple> firstRead = view.getAsync(tx, makeKey(1));
        CompletableFuture<Tuple> secondRead = view.getAsync(tx, makeKey(2));

        // TODO asch IGNITE-15938 must ensure a commit happens after all pending tx async ops.
        double firstBalance = firstRead.join().doubleValue("balance");
        view.upsertAsync(tx, makeValue(1, firstBalance - DELTA)).join();

        double secondBalance = secondRead.join().doubleValue("balance");
        view.upsertAsync(tx, makeValue(2, secondBalance + DELTA)).join();
    });

    assertEquals(BALANCE_1 - DELTA, view.get(null, makeKey(1)).doubleValue("balance"));
    assertEquals(BALANCE_2 + DELTA, view.get(null, makeKey(2)).doubleValue("balance"));

    TxManager clientManager = clientTxManager();

    // Presumably 2 implicit upserts + 1 explicit transaction + 2 implicit reads - TODO confirm.
    assertEquals(5, clientManager.finished());
    assertEquals(0, clientTxManager().pending());
}
/**
 * Tests a transfer between two accounts performed inside a transaction closure over the key-value view.
 */
@Test
public void testTxClosureKeyValueView() throws TransactionException {
    RecordView<Tuple> records = accounts.recordView();

    records.upsert(null, makeValue(1, BALANCE_1));
    records.upsert(null, makeValue(2, BALANCE_2));

    igniteTransactions.runInTransaction(tx -> {
        KeyValueView<Tuple, Tuple> kvView = accounts.keyValueView();

        // Both reads are started before either of them is awaited.
        CompletableFuture<Tuple> firstRead = kvView.getAsync(tx, makeKey(1));
        CompletableFuture<Tuple> secondRead = kvView.getAsync(tx, makeKey(2));

        double firstBalance = firstRead.join().doubleValue("balance");
        kvView.putAsync(tx, makeKey(1), makeValue(firstBalance - DELTA)).join();

        double secondBalance = secondRead.join().doubleValue("balance");
        kvView.putAsync(tx, makeKey(2), makeValue(secondBalance + DELTA)).join();
    });

    assertEquals(BALANCE_1 - DELTA, records.get(null, makeKey(1)).doubleValue("balance"));
    assertEquals(BALANCE_2 + DELTA, records.get(null, makeKey(2)).doubleValue("balance"));

    TxManager clientManager = clientTxManager();

    // Presumably 2 implicit upserts + 1 explicit transaction + 2 implicit reads - TODO confirm.
    assertEquals(5, clientManager.finished());
    assertEquals(0, clientTxManager().pending());
}
/**
 * Tests the positive transfer scenario: the transfer succeeds, both balances are updated and the returned tuple carries the
 * balances as they were before the transfer.
 */
@Test
public void testTxClosureAsync() {
    final double initial1 = 200.;
    final double initial2 = 300.;
    final double amount = 50.;

    Tuple previousBalances = transferAsync(initial1, initial2, amount).join();

    RecordView<Tuple> view = accounts.recordView();

    Tuple account1 = view.get(null, makeKey(1));
    assertEquals(initial1 - amount, account1.doubleValue("balance"));

    Tuple account2 = view.get(null, makeKey(2));
    assertEquals(initial2 + amount, account2.doubleValue("balance"));

    assertEquals(initial1, previousBalances.doubleValue("balance1"));
    assertEquals(initial2, previousBalances.doubleValue("balance2"));
}
/**
 * Tests the negative transfer scenario: the transfer is expected to fail and to leave both balances untouched.
 */
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-17861")
public void testTxClosureAbortAsync() {
    final double initial1 = 10.;
    final double initial2 = 300.;
    final double amount = 50.;

    assertThrows(CompletionException.class, () -> transferAsync(initial1, initial2, amount).join());

    RecordView<Tuple> view = accounts.recordView();

    Tuple account1 = view.get(null, makeKey(1));
    assertEquals(initial1, account1.doubleValue("balance"));

    Tuple account2 = view.get(null, makeKey(2));
    assertEquals(initial2, account2.doubleValue("balance"));
}
/**
 * Tests that an exception thrown synchronously from the asynchronous closure fails the resulting future with that exception as the
 * cause and that the update made in the closure is not visible afterwards.
 */
@Test
public void testTxClosureUncaughtExceptionAsync() {
    final double initialBalance = 10.;
    final double delta = 50.;

    RecordView<Tuple> view = accounts.recordView();

    view.upsert(null, makeValue(1, initialBalance));

    CompletableFuture<Double> closureFuture = igniteTransactions.runInTransactionAsync(tx -> {
        CompletableFuture<Double> updateFuture = view.getAsync(tx, makeKey(1))
                .thenCompose(row -> {
                    double previous = row.doubleValue("balance");

                    return view.upsertAsync(tx, makeValue(1, delta + 20)).thenApply(ignored -> previous);
                });

        updateFuture.join();

        // The constant condition keeps the trailing return reachable for the compiler.
        if (true) {
            throw new IllegalArgumentException();
        }

        return updateFuture;
    });

    CompletionException err = assertThrows(CompletionException.class, closureFuture::join);

    try {
        assertInstanceOf(IllegalArgumentException.class, err.getCause());
    } catch (AssertionError e) {
        // Rethrow with the original failure attached so that the unexpected exception is reported in full.
        throw new AssertionError("Unexpected exception type", err);
    }

    Tuple untouchedRow = view.get(null, makeKey(1));

    assertEquals(initialBalance, untouchedRow.doubleValue("balance"));
}
/**
 * Tests that an exception thrown inside the asynchronous chain returned by the closure fails the resulting future with that
 * exception as the cause.
 */
@Test
public void testTxClosureUncaughtExceptionInChainAsync() {
    RecordView<Tuple> view = accounts.recordView();

    CompletableFuture<Double> closureFuture = igniteTransactions.runInTransactionAsync(tx ->
            view.getAsync(tx, makeKey(2)).thenCompose(row -> {
                // Nothing was written for key 2, so the row is null and an NPE is thrown here.
                double previous = row.doubleValue("balance");

                return view.upsertAsync(tx, makeValue(1, 100)).thenApply(ignored -> previous);
            })
    );

    CompletionException err = assertThrows(CompletionException.class, closureFuture::join);

    try {
        assertInstanceOf(NullPointerException.class, err.getCause());
    } catch (AssertionError e) {
        // Rethrow with the original failure attached so that the unexpected exception is reported in full.
        throw new AssertionError("Unexpected exception type", err);
    }
}
/**
 * Checks that a batch upsert issued by a younger transaction fails with a lock acquisition error while an older transaction holds
 * the lock on the same key after its own batch upsert.
 */
@Test
public void testBatchPutConcurrently() {
    Transaction tx1 = igniteTransactions.begin();
    Transaction tx2 = igniteTransactions.begin();

    // Each label matches the transaction it prints, as in testBatchReadPutConcurrently.
    log.info("Tx1 " + tx1);
    log.info("Tx2 " + tx2);

    List<Tuple> rows = new ArrayList<>();
    List<Tuple> rows2 = new ArrayList<>();

    for (int i = 0; i < 1; i++) {
        rows.add(makeValue(i, i * 100.));
        rows2.add(makeValue(i, 2 * i * 100.));
    }

    var table = accounts.recordView();
    var table2 = accounts.recordView();

    // tx1 writes first and keeps its locks until the commit at the end of the test.
    table2.upsertAll(tx1, rows2);

    Exception err = assertThrows(Exception.class, () -> table.upsertAll(tx2, rows));

    assertTrue(err.getMessage().contains("Failed to acquire a lock"), err.getMessage());

    tx1.commit();
}
/**
 * Checks a batch read by two transactions followed by conflicting batch writes: the write of tx1 waits for a lock upgrade and the
 * write of tx2 fails with a {@link LockException}.
 */
@Test
public void testBatchReadPutConcurrently() throws InterruptedException {
    InternalTransaction tx1 = (InternalTransaction) igniteTransactions.begin();
    InternalTransaction tx2 = (InternalTransaction) igniteTransactions.begin();

    log.info("Tx1 " + tx1);
    log.info("Tx2 " + tx2);

    RecordView<Tuple> view1 = accounts.recordView();
    RecordView<Tuple> view2 = accounts.recordView();

    List<Tuple> keysForTx1 = new ArrayList<>();
    List<Tuple> keysForTx2 = new ArrayList<>();

    for (int i = 0; i < 1; i++) {
        keysForTx1.add(makeKey(i));
        keysForTx2.add(makeKey(i));
    }

    // Both transactions read the same key.
    view2.getAll(tx1, keysForTx1);
    view2.getAll(tx2, keysForTx2);

    List<Tuple> rowsForTx2 = new ArrayList<>();
    List<Tuple> rowsForTx1 = new ArrayList<>();

    for (int i = 0; i < 1; i++) {
        rowsForTx2.add(makeValue(i, i * 100.));
        rowsForTx1.add(makeValue(i, 2 * i * 100.));
    }

    var pendingUpdate = view2.upsertAllAsync(tx1, rowsForTx1);

    // Wait until some lock of tx1 has an exclusive lock as its intended mode.
    boolean upgradeRequested = IgniteTestUtils.waitForCondition(() -> {
        LockManager lockManager = txManager(accounts).lockManager();

        Iterator<Lock> locks = lockManager.locks(tx1.id());

        while (locks.hasNext()) {
            Lock lock = locks.next();

            if (lockManager.waiter(lock.lockKey(), tx1.id()).intendedLockMode() == LockMode.X) {
                return true;
            }
        }

        return false;
    }, 3000);

    assertTrue(upgradeRequested);

    assertFalse(pendingUpdate.isDone());

    assertThrowsWithCause(() -> view1.upsertAll(tx2, rowsForTx2), LockException.class);
}
/**
 * Tests a fully asynchronous transfer: begin, two reads, two dependent writes and commit are chained over the record view.
 */
@Test
public void testTxAsync() {
    RecordView<Tuple> view = accounts.recordView();

    view.upsert(null, makeValue(1, BALANCE_1));
    view.upsert(null, makeValue(2, BALANCE_2));

    igniteTransactions.beginAsync()
            .thenCompose(tx -> {
                CompletableFuture<Tuple> firstRead = view.getAsync(tx, makeKey(1));
                CompletableFuture<Tuple> secondRead = view.getAsync(tx, makeKey(2));

                return firstRead
                        .thenCombine(secondRead, (v1, v2) -> new Pair<>(v1, v2))
                        .thenCompose(pair -> allOf(
                                view.upsertAsync(tx, makeValue(1, pair.getFirst().doubleValue("balance") - DELTA)),
                                view.upsertAsync(tx, makeValue(2, pair.getSecond().doubleValue("balance") + DELTA))
                        ).thenApply(ignored -> tx));
            })
            .thenCompose(Transaction::commitAsync)
            .join();

    assertEquals(BALANCE_1 - DELTA, view.get(null, makeKey(1)).doubleValue("balance"));
    assertEquals(BALANCE_2 + DELTA, view.get(null, makeKey(2)).doubleValue("balance"));
}
/**
* Tests a fully asynchronous transfer over the key-value view: begin, two reads, two dependent writes and commit are chained, and
* the resulting balances are verified through the record view.
*/
@Test
public void testTxAsyncKeyValueView() {
accounts.recordView().upsert(null, makeValue(1, BALANCE_1));
accounts.recordView().upsert(null, makeValue(2, BALANCE_2));
igniteTransactions.beginAsync()
.thenCompose(tx -> accounts.keyValueView().getAsync(tx, makeKey(1))
// NOTE(review): the second read goes through the record view while everything else in the chain uses the key-value view -
// confirm whether keyValueView() was intended here.
.thenCombine(accounts.recordView().getAsync(tx, makeKey(2)), (v1, v2) -> new Pair<>(v1, v2))
// Both writes are issued together; the transaction is passed on once both have completed.
.thenCompose(pair -> allOf(
accounts.keyValueView().putAsync(
tx, makeKey(1), makeValue(pair.getFirst().doubleValue("balance") - DELTA)),
accounts.keyValueView().putAsync(
tx, makeKey(2), makeValue(pair.getSecond().doubleValue("balance") + DELTA))
)
.thenApply(ignored -> tx)
)
).thenCompose(Transaction::commitAsync).join();
assertEquals(BALANCE_1 - DELTA, accounts.recordView().get(null, makeKey(1)).doubleValue("balance"));
assertEquals(BALANCE_2 + DELTA, accounts.recordView().get(null, makeKey(2)).doubleValue("balance"));
}
/**
 * Checks a simple read-write conflict: both transactions read the same key, the write of the younger transaction (tx2) is
 * expected to fail, and the write of the older transaction (tx1) succeeds and is the one visible after commit.
 *
 * @throws Exception If failed.
 */
@Test
public void testSimpleConflict() throws Exception {
    accounts.recordView().upsert(null, makeValue(1, 100.));

    Transaction tx1 = igniteTransactions.begin();
    Transaction tx2 = igniteTransactions.begin();

    var table = accounts.recordView();
    var table2 = accounts.recordView();

    double val = table.get(tx2, makeKey(1)).doubleValue("balance");
    // The value read by tx1 is not needed; the read is done for its locking effect only.
    table2.get(tx1, makeKey(1)).doubleValue("balance");

    // The conflicting write must fail; assertThrows reports a missing failure instead of relying on fail() inside a try block
    // with an empty catch.
    assertThrows(Exception.class, () -> table.upsert(tx2, makeValue(1, val + 1)));

    table2.upsert(tx1, makeValue(1, val + 1));

    tx1.commit();

    // Committing tx2 after its failed write is expected to pass without throwing.
    tx2.commit();

    assertEquals(101., accounts.recordView().get(null, makeKey(1)).doubleValue("balance"));
}
/**
 * Checks that a transaction sees its own consecutive writes and that the last written value is visible after commit.
 */
@Test
public void testCommit() throws TransactionException {
    InternalTransaction transaction = (InternalTransaction) igniteTransactions.begin();

    Tuple accountKey = makeKey(1);

    RecordView<Tuple> view = accounts.recordView();

    view.upsert(transaction, makeValue(1, 100.));
    Tuple afterFirstWrite = view.get(transaction, accountKey);
    assertEquals(100., afterFirstWrite.doubleValue("balance"));

    view.upsert(transaction, makeValue(1, 200.));
    Tuple afterSecondWrite = view.get(transaction, accountKey);
    assertEquals(200., afterSecondWrite.doubleValue("balance"));

    transaction.commit();

    Tuple committed = accounts.recordView().get(null, accountKey);
    assertEquals(200., committed.doubleValue("balance"));
}
/**
 * Checks that a transaction sees its own consecutive writes and that nothing is visible after rollback.
 */
@Test
public void testAbort() throws TransactionException {
    InternalTransaction transaction = (InternalTransaction) igniteTransactions.begin();

    Tuple accountKey = makeKey(1);

    RecordView<Tuple> view = accounts.recordView();

    view.upsert(transaction, makeValue(1, 100.));
    Tuple afterFirstWrite = view.get(transaction, accountKey);
    assertEquals(100., afterFirstWrite.doubleValue("balance"));

    view.upsert(transaction, makeValue(1, 200.));
    Tuple afterSecondWrite = view.get(transaction, accountKey);
    assertEquals(200., afterSecondWrite.doubleValue("balance"));

    transaction.rollback();

    Tuple afterRollback = accounts.recordView().get(null, accountKey);
    assertNull(afterRollback);
}
/**
 * Checks that rolling back a transaction that made no updates leaves the previously written value intact.
 */
@Test
public void testAbortNoUpdate() throws TransactionException {
    RecordView<Tuple> view = accounts.recordView();

    view.upsert(null, makeValue(1, 100.));

    InternalTransaction emptyTx = (InternalTransaction) igniteTransactions.begin();
    emptyTx.rollback();

    Tuple row = accounts.recordView().get(null, makeKey(1));

    assertEquals(100., row.doubleValue("balance"));
}
/**
 * Checks that two concurrent transactions can both read the same key and then commit.
 */
@Test
public void testConcurrent() throws TransactionException {
    Transaction firstTx = igniteTransactions.begin();
    Transaction secondTx = igniteTransactions.begin();

    Tuple accountKey = makeKey(1);
    Tuple initialRow = makeValue(1, 100.);

    // The row is written outside of both transactions, after they were started.
    accounts.recordView().upsert(null, initialRow);

    RecordView<Tuple> firstView = accounts.recordView();
    RecordView<Tuple> secondView = accounts.recordView();

    Tuple seenByFirst = firstView.get(firstTx, accountKey);
    assertEquals(100., seenByFirst.doubleValue("balance"));

    Tuple seenBySecond = secondView.get(secondTx, accountKey);
    assertEquals(100., seenBySecond.doubleValue("balance"));

    firstTx.commit();
    secondTx.commit();
}
/**
 * Tests that no update is lost on a concurrent increment: both transactions read the value, the write of tx2 fails with a lock
 * acquisition error and only the increment of tx1 is applied.
 */
@Test
public void testIncrement() throws TransactionException {
    Transaction tx1 = igniteTransactions.begin();
    Transaction tx2 = igniteTransactions.begin();

    Tuple accountKey = makeKey(1);
    Tuple initialRow = makeValue(1, 100.);

    // Creates implicit transaction.
    accounts.recordView().upsert(null, initialRow);

    RecordView<Tuple> viewForTx2 = accounts.recordView();
    RecordView<Tuple> viewForTx1 = accounts.recordView();

    // Read in tx2, then in tx1.
    double balanceSeenByTx2 = viewForTx2.get(tx2, accountKey).doubleValue("balance");
    double balanceSeenByTx1 = viewForTx1.get(tx1, accountKey).doubleValue("balance");

    // Write in tx2 (out of order).
    // TODO asch IGNITE-15937 fix exception model.
    Exception lockFailure = assertThrows(
            Exception.class,
            () -> viewForTx2.upsert(tx2, makeValue(1, balanceSeenByTx2 + 1))
    );

    assertTrue(lockFailure.getMessage().contains("Failed to acquire a lock"), lockFailure.getMessage());

    // Write in tx1.
    viewForTx1.upsert(tx1, makeValue(1, balanceSeenByTx1 + 1));

    tx1.commit();

    Tuple committed = accounts.recordView().get(null, accountKey);

    assertEquals(101., committed.doubleValue("balance"));
}
/**
 * Checks that rolling back an update of an existing row restores the previously committed value.
 */
@Test
public void testAbortWithValue() throws TransactionException {
    RecordView<Tuple> outerView = accounts.recordView();

    outerView.upsert(null, makeValue(0, 100.));

    Tuple initial = outerView.get(null, makeKey(0));
    assertEquals(100., initial.doubleValue("balance"));

    InternalTransaction transaction = (InternalTransaction) igniteTransactions.begin();

    RecordView<Tuple> txView = accounts.recordView();

    txView.upsert(transaction, makeValue(0, 200.));

    Tuple insideTx = txView.get(transaction, makeKey(0));
    assertEquals(200., insideTx.doubleValue("balance"));

    transaction.rollback();

    Tuple restored = accounts.recordView().get(null, makeKey(0));
    assertEquals(100., restored.doubleValue("balance"));
}
/**
 * Checks insert semantics: the first insert of a key succeeds, a repeated insert is rejected, committed inserts stay visible and
 * inserts of a rolled back transaction disappear.
 */
@Test
public void testInsert() throws TransactionException {
    RecordView<Tuple> view = accounts.recordView();

    assertNull(view.get(null, makeKey(1)));

    // Insert inside a transaction that gets committed.
    Transaction committedTx = igniteTransactions.begin();

    assertTrue(view.insert(committedTx, makeValue(1, 100.)));
    assertFalse(view.insert(committedTx, makeValue(1, 200.)));
    assertEquals(100., view.get(committedTx, makeKey(1)).doubleValue("balance"));

    committedTx.commit();

    assertEquals(100., view.get(null, makeKey(1)).doubleValue("balance"));

    // Insert outside of an explicit transaction.
    assertTrue(view.insert(null, makeValue(2, 200.)));
    assertEquals(200., view.get(null, makeKey(2)).doubleValue("balance"));

    // Insert inside a transaction that gets rolled back.
    Transaction rolledBackTx = igniteTransactions.begin();

    assertTrue(view.insert(rolledBackTx, makeValue(3, 100.)));
    assertFalse(view.insert(rolledBackTx, makeValue(3, 200.)));
    assertEquals(100., view.get(rolledBackTx, makeKey(3)).doubleValue("balance"));

    rolledBackTx.rollback();

    assertEquals(100., view.get(null, makeKey(1)).doubleValue("balance"));
    assertEquals(200., view.get(null, makeKey(2)).doubleValue("balance"));
    assertNull(view.get(null, makeKey(3)));
}
/**
 * Checks delete semantics outside of a transaction, inside a committed transaction closure and inside a closure that is rolled
 * back by an exception.
 */
@Test
public void testDelete() throws TransactionException {
    RecordView<Tuple> view = accounts.recordView();

    Tuple firstKey = makeKey(1);

    // Deleting a missing row reports false.
    assertFalse(view.delete(null, firstKey));
    assertNull(view.get(null, firstKey));

    view.upsert(null, makeValue(1, 100.));

    // Delete inside a closure that commits.
    igniteTransactions.runInTransaction(tx -> {
        assertNotNull(view.get(tx, firstKey));
        assertTrue(view.delete(tx, firstKey));
        assertNull(view.get(tx, firstKey));
    });

    assertNull(view.get(null, firstKey));

    view.upsert(null, makeValue(1, 100.));
    assertNotNull(view.get(null, firstKey));

    Tuple secondKey = makeKey(2);

    view.upsert(null, makeValue(2, 100.));

    // Delete inside a closure that fails: the delete must be undone.
    assertThrows(RuntimeException.class, () -> igniteTransactions.runInTransaction((Consumer<Transaction>) tx -> {
        assertNotNull(view.get(tx, secondKey));
        assertTrue(view.delete(tx, secondKey));
        assertNull(view.get(tx, secondKey));

        throw new RuntimeException(); // Triggers rollback.
    }));

    assertNotNull(view.get(null, secondKey));
    assertTrue(view.delete(null, secondKey));
    assertNull(view.get(null, secondKey));
}
/**
 * Checks that an implicit batch read returns {@code null} entries for missing rows and the stored rows once they are written.
 */
@Test
public void testGetAll() {
    RecordView<Tuple> view = accounts.recordView();
    List<Tuple> keys = List.of(makeKey(1), makeKey(2));

    Collection<Tuple> beforeInsert = view.getAll(null, keys);
    assertThat(beforeInsert, contains(null, null));

    view.upsert(null, makeValue(1, 100.));
    view.upsert(null, makeValue(2, 200.));

    Collection<Tuple> afterInsert = new ArrayList<>(view.getAll(null, keys));
    validateBalance(afterInsert, 100., 200.);
}
/**
 * Tests that a batch read inside a transaction observes that transaction's own update, and that the update is discarded
 * after the transaction is rolled back.
 */
@Test
public void testGetAllAbort() throws TransactionException {
    RecordView<Tuple> view = accounts.recordView();
    List<Tuple> keys = List.of(makeKey(1), makeKey(2));

    view.upsertAll(null, List.of(makeValue(1, 100.), makeValue(2, 200.)));

    Transaction tx = igniteTransactions.begin();

    view.upsert(tx, makeValue(1, 300.));

    Collection<Tuple> insideTx = view.getAll(tx, keys);
    validateBalance(insideTx, 300., 200.);

    tx.rollback();

    Collection<Tuple> afterRollback = view.getAll(null, keys);
    validateBalance(afterRollback, 100., 200.);
}
/**
 * Tests that a transaction is rolled back when one of the keys of its batch read can't be locked.
 */
@Test
public void testGetAllConflict() throws Exception {
    accounts.recordView().upsertAll(null, List.of(makeValue(1, 100.), makeValue(2, 200.)));

    InternalTransaction firstTx = (InternalTransaction) igniteTransactions.begin();
    InternalTransaction secondTx = (InternalTransaction) igniteTransactions.begin();

    RecordView<Tuple> secondTxView = accounts.recordView();
    RecordView<Tuple> firstTxView = accounts.recordView();

    firstTxView.upsert(firstTx, makeValue(1, 300.));
    secondTxView.upsert(secondTx, makeValue(2, 400.));

    List<Tuple> keysTwoThenOne = List.of(makeKey(2), makeKey(1));
    List<Tuple> keysOneThenTwo = List.of(makeKey(1), makeKey(2));

    // The second transaction reads a key written by the first one and is expected to fail.
    Exception err = assertThrows(Exception.class, () -> secondTxView.getAll(secondTx, keysTwoThenOne));
    assertTrue(err.getMessage().contains("Failed to acquire a lock"), err.getMessage());

    // The first transaction sees its own write to key 1 and the original value of key 2.
    validateBalance(firstTxView.getAll(firstTx, keysTwoThenOne), 200., 300.);
    validateBalance(firstTxView.getAll(firstTx, keysOneThenTwo), 300., 200.);

    boolean secondTxAborted = IgniteTestUtils.waitForCondition(() -> TxState.ABORTED == secondTx.state(), 5_000);
    assertTrue(secondTxAborted, secondTx.state().toString());

    firstTx.commit();

    validateBalance(accounts.recordView().getAll(null, keysTwoThenOne), 200., 300.);
}
/**
 * Checks that a batch upsert is visible after its transaction commits and is discarded when the transaction closure throws.
 */
@Test
public void testPutAll() throws TransactionException {
    RecordView<Tuple> view = accounts.recordView();

    igniteTransactions.runInTransaction(tx -> {
        view.upsertAll(tx, List.of(makeValue(1, 100.), makeValue(2, 200.)));
    });

    Collection<Tuple> committed = view.getAll(null, List.of(makeKey(2), makeKey(1)));
    validateBalance(committed, 200., 100.);

    assertThrows(IgniteException.class, () -> igniteTransactions.runInTransaction(tx -> {
        view.upsertAll(tx, List.of(makeValue(3, 300.), makeValue(4, 400.)));

        // NOTE(review): the always-true guard presumably keeps this lambda void-compatible only, so that overload
        // resolution of runInTransaction stays unambiguous - confirm before simplifying.
        if (true) {
            throw new IgniteException();
        }
    }));

    assertNull(view.get(null, makeKey(3)));
    assertNull(view.get(null, makeKey(4)));
}
/**
 * Checks that a transactional batch insert keeps the existing row untouched and adds only the new one.
 */
@Test
public void testInsertAll() throws TransactionException {
    RecordView<Tuple> view = accounts.recordView();

    view.upsertAll(null, List.of(makeValue(1, 100.), makeValue(2, 200.)));

    igniteTransactions.runInTransaction(tx -> {
        List<Tuple> toInsert = List.of(makeValue(1, 200.), makeValue(3, 300.));

        Collection<Tuple> res = view.insertAll(tx, toInsert);

        // Key 1 already exists, so exactly one row is reported back.
        assertEquals(1, res.size());
    });

    List<Tuple> allKeys = List.of(makeKey(1), makeKey(2), makeKey(3));

    validateBalance(view.getAll(null, allKeys), 100., 200., 300.);
}
/**
 * Checks that a transactional batch delete removes the existing rows and reports one entry for the batch with a missing key.
 */
@Test
public void testDeleteAll() throws TransactionException {
    RecordView<Tuple> view = accounts.recordView();

    view.upsertAll(null, List.of(makeValue(1, 100.), makeValue(2, 200.)));

    igniteTransactions.runInTransaction(tx -> {
        List<Tuple> toDelete = List.of(makeKey(1), makeKey(2), makeKey(3));

        Collection<Tuple> res = view.deleteAll(tx, toDelete);

        assertEquals(1, res.size());
    });

    assertNull(view.get(null, makeKey(1)));
    assertNull(view.get(null, makeKey(2)));
}
/**
 * Checks that a transactional replace fails for a missing row and succeeds for an existing one.
 */
@Test
public void testReplace() throws TransactionException {
    RecordView<Tuple> view = accounts.recordView();

    view.upsert(null, makeValue(1, 100.));

    igniteTransactions.runInTransaction(tx -> {
        boolean replacedMissing = view.replace(tx, makeValue(2, 200.));
        assertFalse(replacedMissing);

        boolean replacedExisting = view.replace(tx, makeValue(1, 200.));
        assertTrue(replacedExisting);
    });

    validateBalance(view.getAll(null, List.of(makeKey(1))), 200.);
}
/**
 * Checks that a transactional get-and-replace returns {@code null} for a missing row and a non-null row for an existing one.
 */
@Test
public void testGetAndReplace() throws TransactionException {
    RecordView<Tuple> view = accounts.recordView();

    view.upsert(null, makeValue(1, 100.));

    igniteTransactions.runInTransaction(tx -> {
        Tuple prevOfMissing = view.getAndReplace(tx, makeValue(2, 200.));
        assertNull(prevOfMissing);

        Tuple prevOfExisting = view.getAndReplace(tx, makeValue(1, 200.));
        assertNotNull(prevOfExisting);
    });

    validateBalance(view.getAll(null, List.of(makeKey(1))), 200.);
}
/**
 * Checks that an exact delete removes a row only when the supplied value matches the stored one.
 */
@Test
public void testDeleteExact() throws TransactionException {
    RecordView<Tuple> view = accounts.recordView();

    view.upsert(null, makeValue(1, 100.));

    igniteTransactions.runInTransaction(tx -> {
        // The balance differs from the stored one, so nothing is deleted.
        assertFalse(view.deleteExact(tx, makeValue(1, 200.)));
        assertTrue(view.deleteExact(tx, makeValue(1, 100.)));
    });

    assertNull(view.get(null, makeKey(1)));
}
/**
 * Checks that a batch exact delete removes only the row whose value matches and reports two entries for the other two.
 */
@Test
public void testDeleteAllExact() throws TransactionException {
    RecordView<Tuple> view = accounts.recordView();

    view.upsertAll(null, List.of(makeValue(1, 100.), makeValue(2, 200.)));

    igniteTransactions.runInTransaction(tx -> {
        List<Tuple> toDelete = List.of(makeValue(1, 200.), makeValue(2, 200.), makeValue(3, 300.));

        Collection<Tuple> res = view.deleteAllExact(tx, toDelete);

        assertEquals(2, res.size());
    });

    // Key 1 was stored with a different balance and therefore survives.
    assertNotNull(view.get(null, makeKey(1)));
    assertNull(view.get(null, makeKey(2)));
}
/**
 * Checks that a transactional get-and-upsert returns a non-null row for an existing key and {@code null} for a new one.
 */
@Test
public void testGetAndPut() throws TransactionException {
    RecordView<Tuple> view = accounts.recordView();

    view.upsert(null, makeValue(1, 100.));

    igniteTransactions.runInTransaction(tx -> {
        Tuple prevOfExisting = view.getAndUpsert(tx, makeValue(1, 200.));
        assertNotNull(prevOfExisting);

        Tuple prevOfNew = view.getAndUpsert(tx, makeValue(2, 200.));
        assertNull(prevOfNew);
    });

    validateBalance(view.getAll(null, List.of(makeKey(1), makeKey(2))), 200., 200.);
}
/**
 * Checks that a transactional get-and-delete returns the stored row and removes it.
 */
@Test
public void testGetAndDelete() throws TransactionException {
    RecordView<Tuple> view = accounts.recordView();

    view.upsert(null, makeValue(1, 100.));

    igniteTransactions.runInTransaction(tx -> {
        Tuple removed = view.getAndDelete(tx, makeKey(1));

        assertEquals(100., removed.doubleValue("balance"));
    });

    assertNull(view.get(null, makeKey(1)));
}
/**
 * Has two transactions read the same row, lets the second one start an asynchronous update of it, and then rolls the second
 * one back.
 */
@Test
public void testRollbackUpgradedLock() throws Exception {
    accounts.recordView().upsert(null, makeValue(1, 100.));

    InternalTransaction firstTx = (InternalTransaction) igniteTransactions.begin();
    InternalTransaction secondTx = (InternalTransaction) igniteTransactions.begin();

    var firstView = accounts.recordView();
    var secondView = accounts.recordView();

    double readByFirst = firstView.get(firstTx, makeKey(1)).doubleValue("balance");
    double readBySecond = secondView.get(secondTx, makeKey(1)).doubleValue("balance");

    assertEquals(readByFirst, readBySecond);

    // Try to upgrade a lock.
    secondView.upsertAsync(secondTx, makeValue(1, readByFirst + 10));

    Thread.sleep(300); // Give some time to update lock queue TODO asch IGNITE-15928

    secondTx.rollback();
}
/**
 * Has two transactions read the same row, lets the second one start an asynchronous update, commits an update from the first
 * one, and expects the commit of the second one to fail.
 */
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-15938") // TODO asch IGNITE-15938
public void testUpgradedLockInvalidation() throws Exception {
    accounts.recordView().upsert(null, makeValue(1, 100.));

    InternalTransaction firstTx = (InternalTransaction) igniteTransactions.begin();
    InternalTransaction secondTx = (InternalTransaction) igniteTransactions.begin();

    var firstView = accounts.recordView();
    var secondView = accounts.recordView();

    double readByFirst = firstView.get(firstTx, makeKey(1)).doubleValue("balance");
    double readBySecond = secondView.get(secondTx, makeKey(1)).doubleValue("balance");

    assertEquals(readByFirst, readBySecond);

    // Try to upgrade a lock.
    secondView.upsertAsync(secondTx, makeValue(1, readByFirst + 10));

    Thread.sleep(300); // Give some time to update lock queue TODO asch IGNITE-15928

    firstView.upsert(firstTx, makeValue(1, readByFirst + 20));

    firstTx.commit();

    assertThrows(Exception.class, () -> secondTx.commit());
}
/**
 * Starts an asynchronous update in the most recently begun transaction and checks that reads of the same key issued by the
 * two earlier-begun transactions are not completed right away.
 */
@Test
public void testReorder() throws Exception {
    accounts.recordView().upsert(null, makeValue(1, 100.));

    InternalTransaction tx1 = (InternalTransaction) igniteTransactions.begin();
    InternalTransaction tx2 = (InternalTransaction) igniteTransactions.begin();
    InternalTransaction tx3 = (InternalTransaction) igniteTransactions.begin();

    var writerView = accounts.recordView();
    var tx2View = accounts.recordView();
    var tx1View = accounts.recordView();

    double initialBalance = writerView.get(tx3, makeKey(1)).doubleValue("balance");

    writerView.upsertAsync(tx3, makeValue(1, initialBalance + 10));

    CompletableFuture<Tuple> tx2Read = tx2View.getAsync(tx2, makeKey(1));
    assertFalse(tx2Read.isDone());

    CompletableFuture<Tuple> tx1Read = tx1View.getAsync(tx1, makeKey(1));
    assertFalse(tx1Read.isDone());
}
/**
 * Updates rows of two tables in one transaction and checks that the committed values are the ones passed to the upserts,
 * not the later modifications of the tuples read back, and that both lock managers end up empty.
 */
@Test
public void testCrossTable() throws Exception {
    var custView = customers.recordView();
    var accView = accounts.recordView();

    custView.upsert(null, makeValue(1, "test"));
    accView.upsert(null, makeValue(1, 100.));

    assertEquals("test", custView.get(null, makeKey(1)).stringValue("name"));
    assertEquals(100., accView.get(null, makeKey(1)).doubleValue("balance"));

    InternalTransaction tx = (InternalTransaction) igniteTransactions.begin();

    custView.upsert(tx, makeValue(1, "test2"));
    accView.upsert(tx, makeValue(1, 200.));

    Tuple custInTx = custView.get(tx, makeKey(1));
    assertEquals("test2", custInTx.stringValue("name"));

    // Modify the tuple that was read; these changes are never written back to the table.
    custInTx.set("accountNumber", 2L);
    custInTx.set("name", "test3");

    Tuple accInTx = accView.get(tx, makeKey(1));
    assertEquals(200., accInTx.doubleValue("balance"));

    accInTx.set("accountNumber", 2L);
    accInTx.set("balance", 300.);

    tx.commit();

    assertEquals("test2", custView.get(null, makeKey(1)).stringValue("name"));
    assertEquals(200., accView.get(null, makeKey(1)).doubleValue("balance"));

    assertTrue(IgniteTestUtils.waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000));
    assertTrue(IgniteTestUtils.waitForCondition(() -> lockManager(customers).isEmpty(), 10_000));
}
/**
 * Updates rows of two tables in one transaction while a second, idle transaction is open, commits both, and checks the
 * committed values and that the accounts lock manager ends up empty.
 */
@Test
public void testTwoTables() throws Exception {
    var custView = customers.recordView();
    var accView = accounts.recordView();

    custView.upsert(null, makeValue(1, "test"));
    accView.upsert(null, makeValue(1, 100.));

    assertEquals("test", custView.get(null, makeKey(1)).stringValue("name"));
    assertEquals(100., accView.get(null, makeKey(1)).doubleValue("balance"));

    InternalTransaction workTx = (InternalTransaction) igniteTransactions.begin();
    InternalTransaction idleTx = (InternalTransaction) igniteTransactions.begin();

    custView.upsert(workTx, makeValue(1, "test2"));
    accView.upsert(workTx, makeValue(1, 200.));

    Tuple custInTx = custView.get(workTx, makeKey(1));
    assertEquals("test2", custInTx.stringValue("name"));

    // Modify the tuple that was read; these changes are never written back to the table.
    custInTx.set("accountNumber", 2L);
    custInTx.set("name", "test3");

    Tuple accInTx = accView.get(workTx, makeKey(1));
    assertEquals(200., accInTx.doubleValue("balance"));

    accInTx.set("accountNumber", 2L);
    accInTx.set("balance", 300.);

    workTx.commit();
    idleTx.commit();

    assertEquals("test2", custView.get(null, makeKey(1)).stringValue("name"));
    assertEquals(200., accView.get(null, makeKey(1)).doubleValue("balance"));

    assertTrue(IgniteTestUtils.waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000));
}
/**
 * Key-value view variant of the two-table scenario: updates both tables in one transaction while a second, idle transaction
 * is open, commits both, and checks the committed values.
 */
@Test
public void testCrossTableKeyValueView() throws Exception {
    customers.recordView().upsert(null, makeValue(1L, "test"));
    accounts.recordView().upsert(null, makeValue(1L, 100.));

    assertEquals("test", customers.recordView().get(null, makeKey(1)).stringValue("name"));
    assertEquals(100., accounts.recordView().get(null, makeKey(1)).doubleValue("balance"));

    InternalTransaction workTx = (InternalTransaction) igniteTransactions.begin();
    InternalTransaction idleTx = (InternalTransaction) igniteTransactions.begin();

    var custKv = customers.keyValueView();
    var accKv = accounts.keyValueView();

    custKv.put(workTx, makeKey(1), makeValue("test2"));
    accKv.put(workTx, makeKey(1), makeValue(200.));

    Tuple custInTx = custKv.get(workTx, makeKey(1));
    assertEquals("test2", custInTx.stringValue("name"));

    // Modify the tuple that was read; these changes are never written back to the table.
    custInTx.set("accountNumber", 2L);
    custInTx.set("name", "test3");

    Tuple accInTx = accKv.get(workTx, makeKey(1));
    assertEquals(200., accInTx.doubleValue("balance"));

    accInTx.set("accountNumber", 2L);
    accInTx.set("balance", 300.);

    workTx.commit();
    idleTx.commit();

    assertEquals("test2", customers.recordView().get(null, makeKey(1)).stringValue("name"));
    assertEquals(200., accounts.recordView().get(null, makeKey(1)).doubleValue("balance"));

    assertTrue(IgniteTestUtils.waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000));
}
/**
 * Asynchronously updates rows of two tables in one transaction, commits it, and checks that both updates are visible.
 */
@Test
public void testCrossTableAsync() throws Exception {
    customers.recordView().upsert(null, makeValue(1, "test"));
    accounts.recordView().upsert(null, makeValue(1, 100.));

    igniteTransactions.beginAsync()
            .thenCompose(tx -> {
                var accountUpdate = accounts.recordView().upsertAsync(tx, makeValue(1, 200.));
                var customerUpdate = customers.recordView().upsertAsync(tx, makeValue(1, "test2"));

                // Pass the transaction on once both updates are done.
                return accountUpdate.thenCombine(customerUpdate, (v1, v2) -> tx);
            })
            .thenCompose(Transaction::commitAsync)
            .join();

    assertEquals("test2", customers.recordView().get(null, makeKey(1)).stringValue("name"));
    assertEquals(200., accounts.recordView().get(null, makeKey(1)).doubleValue("balance"));

    assertTrue(IgniteTestUtils.waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000));
}
/**
 * Asynchronously updates rows of two tables in one transaction, rolls it back, and checks that the original values remain.
 */
@Test
public void testCrossTableAsyncRollback() throws Exception {
    customers.recordView().upsert(null, makeValue(1, "test"));
    accounts.recordView().upsert(null, makeValue(1, 100.));

    igniteTransactions.beginAsync()
            .thenCompose(tx -> {
                var accountUpdate = accounts.recordView().upsertAsync(tx, makeValue(1, 200.));
                var customerUpdate = customers.recordView().upsertAsync(tx, makeValue(1, "test2"));

                // Pass the transaction on once both updates are done.
                return accountUpdate.thenCombine(customerUpdate, (v1, v2) -> tx);
            })
            .thenCompose(Transaction::rollbackAsync)
            .join();

    assertEquals("test", customers.recordView().get(null, makeKey(1)).stringValue("name"));
    assertEquals(100., accounts.recordView().get(null, makeKey(1)).doubleValue("balance"));

    assertTrue(IgniteTestUtils.waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000));
}
/**
 * Key-value view variant: asynchronously updates two tables in one transaction, commits it, and checks both updates.
 */
@Test
public void testCrossTableAsyncKeyValueView() throws Exception {
    customers.recordView().upsert(null, makeValue(1, "test"));
    accounts.recordView().upsert(null, makeValue(1, 100.));

    igniteTransactions.beginAsync()
            .thenCompose(tx -> {
                var accountUpdate = accounts.keyValueView().putAsync(tx, makeKey(1), makeValue(200.));
                var customerUpdate = customers.keyValueView().putAsync(tx, makeKey(1), makeValue("test2"));

                // Pass the transaction on once both updates are done.
                return accountUpdate.thenCombine(customerUpdate, (v1, v2) -> tx);
            })
            .thenCompose(Transaction::commitAsync)
            .join();

    assertEquals("test2", customers.recordView().get(null, makeKey(1)).stringValue("name"));
    assertEquals(200., accounts.recordView().get(null, makeKey(1)).doubleValue("balance"));

    assertTrue(IgniteTestUtils.waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000));
}
/**
 * Key-value view variant: asynchronously updates two tables in one transaction, rolls it back, and checks that the
 * original values remain.
 */
@Test
public void testCrossTableAsyncKeyValueViewRollback() throws Exception {
    customers.recordView().upsert(null, makeValue(1, "test"));
    accounts.recordView().upsert(null, makeValue(1, 100.));

    igniteTransactions.beginAsync()
            .thenCompose(tx -> {
                var accountUpdate = accounts.keyValueView().putAsync(tx, makeKey(1), makeValue(200.));
                var customerUpdate = customers.keyValueView().putAsync(tx, makeKey(1), makeValue("test2"));

                // Pass the transaction on once both updates are done.
                return accountUpdate.thenCombine(customerUpdate, (v1, v2) -> tx);
            })
            .thenCompose(Transaction::rollbackAsync)
            .join();

    assertEquals("test", customers.recordView().get(null, makeKey(1)).stringValue("name"));
    assertEquals(100., accounts.recordView().get(null, makeKey(1)).doubleValue("balance"));

    assertTrue(IgniteTestUtils.waitForCondition(() -> lockManager(accounts).isEmpty(), 10_000));
}
/**
 * Runs the multithreaded transfer scenario for five seconds without verbose logging.
 */
@Test
public void testBalance() throws InterruptedException {
    long durationMillis = 5_000;
    boolean verbose = false;

    doTestSingleKeyMultithreaded(durationMillis, verbose);
}
/**
 * Placeholder test with an empty body; the intended scenario is described by the TODO inside.
 */
@Test
public void testLockedTooLong() {
// TODO asch IGNITE-15936 if lock can't be acquired until timeout tx should be rolled back.
}
/**
 * Runs the scan scenario without an explicit transaction.
 */
@Test
public void testScan() throws Exception {
    Transaction noTx = null;

    doTestScan(noTx);
}
/**
 * Runs the scan scenario inside an explicit transaction supplied by {@code runInTransaction}.
 */
@Test
public void testScanExplicit() throws Exception {
// NOTE(review): keep the method reference - if runInTransaction is overloaded for both a consumer and a function,
// an implicitly typed expression lambda here may become ambiguous. Confirm before rewriting.
igniteTransactions.runInTransaction(this::doTestScan);
}
/**
 * Writes two accounts, scans them back through the internal table, verifies their balances, and finally overwrites both.
 *
 * @param tx The transaction, or {@code null} to run every operation implicitly.
 */
private void doTestScan(@Nullable Transaction tx) {
    RecordView<Tuple> view = accounts.recordView();

    view.upsertAll(tx, List.of(makeValue(1, 100.), makeValue(2, 200.)));

    // Casting null is a no-op, so a missing transaction is passed through as null.
    InternalTransaction internalTx = (InternalTransaction) tx;

    List<Tuple> scanned = scan(accounts.internalTable(), internalTx).join();

    Map<Long, Tuple> byAccountNumber = new HashMap<>();
    scanned.forEach(row -> byAccountNumber.put(row.longValue("accountNumber"), row));

    assertEquals(100., byAccountNumber.get(1L).doubleValue("balance"));
    assertEquals(200., byAccountNumber.get(2L).doubleValue("balance"));

    // Attempt to overwrite.
    view.upsertAll(tx, List.of(makeValue(1, 300.), makeValue(2, 400.)));
}
/**
 * Scans partition {@code 0} of a table in a specific transaction or in an implicit one.
 *
 * <p>Read-only transactions are scanned using their read timestamp; any other transaction (or {@code null}) goes through
 * the regular scan.
 *
 * @param internalTable Internal table to scan.
 * @param internalTx Internal transaction or {@code null}.
 * @return Future that completes with the scanned rows, or completes exceptionally if the publisher reports an error.
 */
private CompletableFuture<List<Tuple>> scan(InternalTable internalTable, @Nullable InternalTransaction internalTx) {
    Flow.Publisher<BinaryRow> pub = internalTx != null && internalTx.isReadOnly()
            ? internalTable.scan(
                    0,
                    internalTx.id(),
                    internalTx.readTimestamp(),
                    internalTable.tableRaftService().leaderAssignment(0),
                    internalTx.coordinatorId()
            )
            : internalTable.scan(0, internalTx);

    List<Tuple> rows = new ArrayList<>();

    var fut = new CompletableFuture<List<Tuple>>();

    pub.subscribe(new Flow.Subscriber<>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // Only three rows are ever requested, so callers must not expect more than that.
            subscription.request(3);
        }

        @Override
        public void onNext(BinaryRow item) {
            SchemaRegistry registry = accounts.schemaView();
            Row row = registry.resolve(item, registry.lastKnownSchema());

            rows.add(TableRow.tuple(row));
        }

        @Override
        public void onError(Throwable throwable) {
            // Propagate the failure; otherwise the future never completes and callers block on it.
            fut.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            fut.complete(rows);
        }
    });

    return fut;
}
/**
 * Runs the record view scenario without an explicit transaction.
 */
@Test
public void testComplexImplicit() {
    RecordView<Tuple> view = accounts.recordView();

    doTestComplex(view, null);
}
/**
 * Runs the record view scenario inside a freshly begun transaction.
 */
@Test
public void testComplexExplicit() throws TransactionException {
    RecordView<Tuple> view = accounts.recordView();
    Transaction tx = igniteTransactions.begin();

    doTestComplex(view, tx);
}
/**
 * Runs the key-value view scenario without an explicit transaction.
 */
@Test
public void testComplexImplicitKeyValueView() {
    KeyValueView<Tuple, Tuple> view = accounts.keyValueView();

    doTestComplexKeyValue(view, null);
}
/**
 * Runs the key-value view scenario inside a freshly begun transaction.
 */
@Test
public void testComplexExplicitKeyValueView() throws TransactionException {
    KeyValueView<Tuple, Tuple> view = accounts.keyValueView();
    Transaction tx = igniteTransactions.begin();

    doTestComplexKeyValue(view, tx);
}
/**
 * Exercises a sequence of operations over a tuple record view: insert, get, upsert, batch get, conditional replace, delete,
 * batch upsert and batch delete. The scenario was moved from ITDistributedTableTest.
 *
 * @param view Record view.
 * @param tx Transaction or {@code null} for implicit one. A non-null transaction is committed at the end.
 */
private void doTestComplex(RecordView<Tuple> view, @Nullable Transaction tx) {
    final int keysCnt = 10;

    // Insert the initial rows and log how long it took.
    long insertStart = System.nanoTime();

    for (long id = 0; id < keysCnt; id++) {
        view.insert(tx, makeValue(id, id + 2.));
    }

    long dur = (long) ((System.nanoTime() - insertStart) / 1000 / 1000.);

    log.info("Inserted={}, time={}ms avg={} tps={}", keysCnt, dur, dur / keysCnt, 1000 / (dur / (float) keysCnt));

    for (long id = 0; id < keysCnt; id++) {
        Tuple stored = view.get(tx, makeKey(id));

        assertEquals(id + 2., stored.doubleValue("balance"));
    }

    // Overwrite every row and read it back immediately.
    for (int id = 0; id < keysCnt; id++) {
        view.upsert(tx, makeValue(id, id + 5.));

        Tuple stored = view.get(tx, makeKey(id));

        assertEquals(id + 5., stored.doubleValue("balance"));
    }

    HashSet<Tuple> keys = new HashSet<>();

    for (long id = 0; id < keysCnt; id++) {
        keys.add(makeKey(id));
    }

    Collection<Tuple> fetched = view.getAll(tx, keys);

    assertEquals(keysCnt, fetched.size());

    // Conditionally replace each row back to its initial balance.
    for (long id = 0; id < keysCnt; id++) {
        boolean replaced = view.replace(tx, makeValue(id, id + 5.), makeValue(id, id + 2.));

        assertTrue(replaced, "Failed to replace for idx=" + id);
    }

    for (long id = 0; id < keysCnt; id++) {
        boolean deleted = view.delete(tx, makeKey(id));

        assertTrue(deleted);

        Tuple stored = view.get(tx, makeKey(id));

        assertNull(stored);
    }

    // Re-create all rows with a single batch and remove them with another.
    ArrayList<Tuple> batch = new ArrayList<>(keysCnt);

    for (long id = 0; id < keysCnt; id++) {
        batch.add(makeValue(id, id + 2.));
    }

    view.upsertAll(tx, batch);

    for (long id = 0; id < keysCnt; id++) {
        Tuple stored = view.get(tx, makeKey(id));

        assertEquals(id + 2., stored.doubleValue("balance"));
    }

    view.deleteAll(tx, keys);

    for (Tuple key : keys) {
        Tuple stored = view.get(tx, key);

        assertNull(stored);
    }

    if (tx != null) {
        tx.commit();
    }
}
/**
 * Exercises a sequence of operations over a tuple key-value view: put, get, overwrite, batch get, conditional replace,
 * remove, batch put and batch remove. The scenario was moved from ITDistributedTableTest.
 *
 * @param view Table view.
 * @param tx Transaction or {@code null} for implicit one. A non-null transaction is committed at the end.
 */
public void doTestComplexKeyValue(KeyValueView<Tuple, Tuple> view, @Nullable Transaction tx) {
    final int keysCnt = 10;

    for (long id = 0; id < keysCnt; id++) {
        view.put(tx, makeKey(id), makeValue(id + 2.));
    }

    for (long id = 0; id < keysCnt; id++) {
        Tuple stored = view.get(tx, makeKey(id));

        assertEquals(id + 2., stored.doubleValue("balance"));
    }

    // Overwrite every value and read it back immediately.
    for (int id = 0; id < keysCnt; id++) {
        view.put(tx, makeKey(id), makeValue(id + 5.));

        Tuple stored = view.get(tx, makeKey(id));

        assertEquals(id + 5., stored.doubleValue("balance"));
    }

    HashSet<Tuple> keys = new HashSet<>();

    for (long id = 0; id < keysCnt; id++) {
        keys.add(makeKey(id));
    }

    Map<Tuple, Tuple> fetched = view.getAll(tx, keys);

    assertEquals(keysCnt, fetched.size());

    // Conditionally replace each value back to its initial balance.
    for (long id = 0; id < keysCnt; id++) {
        boolean replaced = view.replace(tx, makeKey(id), makeValue(id + 5.), makeValue(id + 2.));

        assertTrue(replaced, "Failed to replace for idx=" + id);
    }

    for (long id = 0; id < keysCnt; id++) {
        boolean removed = view.remove(tx, makeKey(id));

        assertTrue(removed);

        Tuple stored = view.get(tx, makeKey(id));

        assertNull(stored);
    }

    // Re-create all entries with a single batch and remove them with another.
    Map<Tuple, Tuple> batch = new LinkedHashMap<>(keysCnt);

    for (long id = 0; id < keysCnt; id++) {
        batch.put(makeKey(id), makeValue(id + 2.));
    }

    view.putAll(tx, batch);

    for (long id = 0; id < keysCnt; id++) {
        Tuple stored = view.get(tx, makeKey(id));

        assertEquals(id + 2., stored.doubleValue("balance"));
    }

    view.removeAll(tx, keys);

    for (Tuple key : keys) {
        Tuple stored = view.get(tx, key);

        assertNull(stored);
    }

    if (tx != null) {
        tx.commit();
    }
}
/**
 * Runs a concurrent money-transfer workload and verifies that the sum of all balances is preserved.
 *
 * <p>Seeds {@code availableProcessors * 2 * 10} accounts with the same initial balance and starts
 * {@code availableProcessors * 2} worker threads. Each worker repeatedly moves a random amount between two distinct random
 * accounts inside its own transaction; a failure whose message contains "Failed to acquire a lock" is counted and the
 * transaction is rolled back, any other failure stops the test.
 *
 * @param duration How long the workers run, in milliseconds.
 * @param verbose Whether every individual operation is logged.
 * @throws InterruptedException If interrupted while waiting.
 */
private void doTestSingleKeyMultithreaded(long duration, boolean verbose) throws InterruptedException {
    int threadsCnt = Runtime.getRuntime().availableProcessors() * 2;

    Thread[] threads = new Thread[threadsCnt];

    final int accountsCount = threads.length * 10;

    final double initial = 1000;
    final double total = accountsCount * initial;

    for (int i = 0; i < accountsCount; i++) {
        // Seed with the same constant the expected total is derived from.
        accounts.recordView().upsert(null, makeValue(i, initial));
    }

    double total0 = 0;

    for (long i = 0; i < accountsCount; i++) {
        double balance = accounts.recordView().get(null, makeKey(i)).doubleValue("balance");

        total0 += balance;
    }

    assertEquals(total, total0, "Total amount invariant is not preserved");

    CyclicBarrier startBar = new CyclicBarrier(threads.length, () -> log.info("Before test"));

    LongAdder ops = new LongAdder();
    LongAdder fails = new LongAdder();

    AtomicBoolean stop = new AtomicBoolean();

    Random r = new Random();

    AtomicReference<Throwable> firstErr = new AtomicReference<>();

    for (int i = 0; i < threads.length; i++) {
        threads[i] = new Thread(() -> {
            try {
                startBar.await();
            } catch (Exception e) {
                // Keep the cause so that a barrier failure can be diagnosed.
                throw new AssertionError("Failed to wait on the start barrier", e);
            }

            while (!stop.get() && firstErr.get() == null) {
                InternalTransaction tx = clientTxManager().begin(timestampTracker);

                var table = accounts.recordView();

                try {
                    long acc1 = r.nextInt(accountsCount);

                    double amount = 100 + r.nextInt(500);

                    if (verbose) {
                        log.info("op=tryGet ts={} id={}", tx.id(), acc1);
                    }

                    double val0 = table.get(tx, makeKey(acc1)).doubleValue("balance");

                    long acc2 = acc1;

                    // Pick a second account that differs from the first one.
                    while (acc1 == acc2) {
                        acc2 = r.nextInt(accountsCount);
                    }

                    if (verbose) {
                        log.info("op=tryGet ts={} id={}", tx.id(), acc2);
                    }

                    double val1 = table.get(tx, makeKey(acc2)).doubleValue("balance");

                    if (verbose) {
                        log.info("op=tryPut ts={} id={}", tx.id(), acc1);
                    }

                    table.upsert(tx, makeValue(acc1, val0 - amount));

                    if (verbose) {
                        log.info("op=tryPut ts={} id={}", tx.id(), acc2);
                    }

                    table.upsert(tx, makeValue(acc2, val1 + amount));

                    tx.commit();

                    ops.increment();
                } catch (Exception e) {
                    String msg = e.getMessage();

                    // Only lock conflicts are expected. Anything else (including a failure without a message) is
                    // reported with its cause instead of being masked by a NullPointerException.
                    if (msg == null || !msg.contains("Failed to acquire a lock")) {
                        throw new AssertionError(msg, e);
                    }

                    tx.rollback();

                    fails.increment();
                }
            }
        });

        threads[i].setName("Worker-" + i);
        // Remember the first uncaught worker failure; the other workers stop once it is set.
        threads[i].setUncaughtExceptionHandler((thread, err) -> firstErr.compareAndExchange(null, err));
        threads[i].start();
    }

    Thread.sleep(duration);

    stop.set(true);

    for (Thread thread : threads) {
        thread.join(3_000);
    }

    if (firstErr.get() != null) {
        throw new IgniteException(firstErr.get());
    }

    log.info("After test ops={} fails={}", ops.sum(), fails.sum());

    total0 = 0;

    for (long i = 0; i < accountsCount; i++) {
        double balance = accounts.recordView().get(null, makeKey(i)).doubleValue("balance");

        total0 += balance;
    }

    assertEquals(total, total0, "Total amount invariant is not preserved");
}
/**
 * Builds a key tuple holding only the account number.
 *
 * @param id The id.
 * @return The key tuple.
 */
protected Tuple makeKey(long id) {
    Tuple key = Tuple.create();

    return key.set("accountNumber", id);
}
/**
 * Builds a full account row: the account number together with its balance.
 *
 * @param id The id.
 * @param balance The balance.
 * @return The value tuple.
 */
protected Tuple makeValue(long id, double balance) {
    Tuple row = Tuple.create();

    return row.set("accountNumber", id).set("balance", balance);
}
/**
 * Builds a full customer row: the account number together with the name.
 *
 * @param id The id.
 * @param name The name.
 * @return The value tuple.
 */
private Tuple makeValue(long id, String name) {
    Tuple row = Tuple.create();

    return row.set("accountNumber", id).set("name", name);
}
/**
 * Builds a value tuple holding only the balance.
 *
 * @param balance The balance.
 * @return The value tuple.
 */
private Tuple makeValue(double balance) {
    Tuple value = Tuple.create();

    return value.set("balance", balance);
}
/**
 * Builds a value tuple holding only the name.
 *
 * @param name The name.
 * @return The value tuple.
 */
private Tuple makeValue(String name) {
    Tuple value = Tuple.create();

    return value.set("name", name);
}
/**
 * Returns the lock manager of the transaction manager that {@code txManager(t)} resolves for the table.
 *
 * @param t The table.
 * @return Lock manager.
 */
protected LockManager lockManager(TableViewInternal t) {
    var tableTxManager = txManager(t);

    return tableTxManager.lockManager();
}
/**
 * Asserts that the balances of the given rows match the expected values, element by element. A {@code null} row is
 * compared as a {@code null} balance.
 *
 * @param rows Rows.
 * @param expected Expected values.
 */
protected static void validateBalance(Collection<Tuple> rows, @Nullable Double... expected) {
    List<Double> actualBalances = rows.stream()
            .map(tuple -> tuple == null ? null : tuple.doubleValue("balance"))
            .collect(toList());

    assertThat(actualBalances, contains(expected));
}
/**
 * Transfers money between accounts 1 and 2 inside an asynchronous transaction.
 *
 * <p>Both accounts are first reset to the given balances with implicit upserts. The withdrawal from account 1 and the
 * deposit to account 2 are then started independently in the same transaction. If the withdrawal would make the first
 * balance negative, the transaction is rolled back from within the closure and {@code balance1} in the result is
 * {@code null}.
 *
 * @param balance1 First account initial balance.
 * @param balance2 Second account initial balance.
 * @param delta Delta.
 * @return The future holding tuple with previous balances ({@code balance1} and {@code balance2}).
 */
private CompletableFuture<Tuple> transferAsync(double balance1, double balance2, double delta) {
RecordView<Tuple> view = accounts.recordView();
// Reset both accounts outside of the transaction.
view.upsert(null, makeValue(1, balance1));
view.upsert(null, makeValue(2, balance2));
return igniteTransactions.runInTransactionAsync(tx -> {
// Attempt to withdraw from first account.
CompletableFuture<Double> fut1 = view.getAsync(tx, makeKey(1))
.thenCompose(val1 -> {
double prev = val1.doubleValue("balance");
double balance = prev - delta;
if (balance < 0) {
// Not enough funds: roll back and report no previous balance.
return tx.rollbackAsync().thenApply(ignored -> null);
}
return view.upsertAsync(tx, makeValue(1, balance)).thenApply(ignored -> prev);
});
// Optimistically deposit to second account.
CompletableFuture<Double> fut2 = view.getAsync(tx, makeKey(2))
.thenCompose(val2 -> {
double prev = val2.doubleValue("balance");
return view.upsertAsync(tx, makeValue(2, prev + delta)).thenApply(ignored -> prev);
});
// Combine the previous balances once both operations are done.
return fut1.thenCompose(val1 -> fut2.thenCompose(val2 ->
completedFuture(Tuple.create().set("balance1", val1).set("balance2", val2))));
});
}
/**
 * Checks that a read-only transaction reads a previously stored row.
 */
@Test
public void testReadOnlyGet() {
    RecordView<Tuple> view = accounts.recordView();

    view.upsert(null, makeValue(1, 100.));

    Transaction readOnlyTx = igniteTransactions.begin(new TransactionOptions().readOnly(true));

    Tuple row = view.get(readOnlyTx, makeKey(1));

    assertEquals(100., row.doubleValue("balance"));
}
/**
 * Checks that a read-only scan ignores the changes of a pending transaction, keeps returning the same rows after that
 * transaction commits, and that a read-only transaction begun after the commit observes the committed changes.
 */
@Test
public void testReadOnlyScan() throws Exception {
    RecordView<Tuple> view = accounts.recordView();

    view.upsert(null, makeValue(1, 100.));
    view.upsert(null, makeValue(2, 500.));

    // Pending tx
    Transaction pendingTx = igniteTransactions.begin();

    view.upsert(pendingTx, makeValue(1, 300.));
    view.delete(pendingTx, makeKey(2));

    InternalTransaction readOnlyTx = (InternalTransaction) igniteTransactions.begin(new TransactionOptions().readOnly(true));

    List<Tuple> rowsBeforeCommit = scan(accounts.internalTable(), readOnlyTx).get(10, TimeUnit.SECONDS);

    assertEquals(2, rowsBeforeCommit.size());

    for (Tuple row : rowsBeforeCommit) {
        if (row.longValue("accountNumber") != 1) {
            assertEquals(2, row.longValue("accountNumber"));
            assertEquals(500., row.doubleValue("balance"));
        } else {
            assertEquals(100., row.doubleValue("balance"));
        }
    }

    // Commit pending tx.
    pendingTx.commit();

    // Same read-only transaction.
    List<Tuple> sameTxRowsAfterCommit = scan(accounts.internalTable(), readOnlyTx).get(10, TimeUnit.SECONDS);

    assertEquals(2, sameTxRowsAfterCommit.size());

    for (Tuple row : sameTxRowsAfterCommit) {
        if (row.longValue("accountNumber") != 1) {
            assertEquals(2, row.longValue("accountNumber"));
            assertEquals(500., row.doubleValue("balance"));
        } else {
            assertEquals(100., row.doubleValue("balance"));
        }
    }

    // New read-only transaction.
    InternalTransaction readOnlyTx2 = (InternalTransaction) igniteTransactions.begin(new TransactionOptions().readOnly(true));

    List<Tuple> newTxRows = scan(accounts.internalTable(), readOnlyTx2).get(10, TimeUnit.SECONDS);

    assertEquals(1, newTxRows.size());

    for (Tuple row : newTxRows) {
        assertEquals(1, row.longValue("accountNumber"));
        assertEquals(300., row.doubleValue("balance"));
    }
}
/**
 * Checks that a read-only get does not observe an update made by a pending transaction, neither before nor after that
 * transaction commits, while a read-only transaction begun after the commit observes the update.
 */
@Test
public void testReadOnlyGetWriteIntentResolutionUpdate() {
    RecordView<Tuple> view = accounts.recordView();

    view.upsert(null, makeValue(1, 100.));

    // Pending tx
    Transaction pendingTx = igniteTransactions.begin();
    view.upsert(pendingTx, makeValue(1, 300.));

    // Update
    Transaction readOnlyTx = igniteTransactions.begin(new TransactionOptions().readOnly(true));
    Tuple beforeCommit = view.get(readOnlyTx, makeKey(1));
    assertEquals(100., beforeCommit.doubleValue("balance"));

    // Commit pending tx.
    pendingTx.commit();

    // Same read-only transaction.
    Tuple afterCommit = view.get(readOnlyTx, makeKey(1));
    assertEquals(100., afterCommit.doubleValue("balance"));

    // New read-only transaction.
    Transaction readOnlyTx2 = igniteTransactions.begin(new TransactionOptions().readOnly(true));
    Tuple inNewTx = view.get(readOnlyTx2, makeKey(1));
    assertEquals(300., inNewTx.doubleValue("balance"));
}
/**
 * Checks that a read-only get does not observe a delete made by a pending transaction, neither before nor after that
 * transaction commits, while a read-only transaction begun after the commit no longer finds the row.
 */
@Test
public void testReadOnlyGetWriteIntentResolutionRemove() {
    RecordView<Tuple> view = accounts.recordView();

    view.upsert(null, makeValue(1, 100.));

    // Pending tx
    Transaction pendingTx = igniteTransactions.begin();
    view.delete(pendingTx, makeKey(1));

    // Remove.
    Transaction readOnlyTx = igniteTransactions.begin(new TransactionOptions().readOnly(true));
    Tuple beforeCommit = view.get(readOnlyTx, makeKey(1));
    assertEquals(100., beforeCommit.doubleValue("balance"));

    // Commit pending tx.
    pendingTx.commit();

    // Same read-only transaction.
    Tuple afterCommit = view.get(readOnlyTx, makeKey(1));
    assertEquals(100., afterCommit.doubleValue("balance"));

    // New read-only transaction.
    Transaction readOnlyTx2 = igniteTransactions.begin(new TransactionOptions().readOnly(true));
    assertNull(view.get(readOnlyTx2, makeKey(1)));
}
/**
 * Checks that a batch read in a read-only transaction returns the requested rows.
 */
@Test
public void testReadOnlyGetAll() {
    RecordView<Tuple> view = accounts.recordView();

    view.upsert(null, makeValue(1, 100.));
    view.upsert(null, makeValue(2, 200.));
    view.upsert(null, makeValue(3, 300.));

    Transaction readOnlyTx = igniteTransactions.begin(new TransactionOptions().readOnly(true));

    List<Tuple> requestedKeys = List.of(makeKey(1), makeKey(2));

    Collection<Tuple> retrieved = view.getAll(readOnlyTx, requestedKeys);

    validateBalance(retrieved, 100., 200.);
}
/**
 * Checks that a read-only batch read skips both a pending delete and a pending update, keeps returning the same rows
 * after the pending transaction commits, and that a read-only transaction begun after the commit observes both changes.
 */
@Test
public void testReadOnlyPendingWriteIntentSkippedCombined() {
    RecordView<Tuple> view = accounts.recordView();
    List<Tuple> keys = List.of(makeKey(1), makeKey(2));

    view.upsert(null, makeValue(1, 100.));
    view.upsert(null, makeValue(2, 200.));

    // Pending tx
    Transaction pendingTx = igniteTransactions.begin();
    view.delete(pendingTx, makeKey(1));
    view.upsert(pendingTx, makeValue(2, 300.));

    Transaction readOnlyTx = igniteTransactions.begin(new TransactionOptions().readOnly(true));

    Collection<Tuple> beforeCommit = view.getAll(readOnlyTx, keys);
    validateBalance(beforeCommit, 100., 200.);

    // Commit pending tx.
    pendingTx.commit();

    Collection<Tuple> afterCommit = view.getAll(readOnlyTx, keys);
    validateBalance(afterCommit, 100., 200.);

    Transaction readOnlyTx2 = igniteTransactions.begin(new TransactionOptions().readOnly(true));

    Collection<Tuple> inNewTx = view.getAll(readOnlyTx2, keys);
    validateBalance(inNewTx, null, 300.);
}
/** Runs {@code testTransactionAlreadyFinished} with a finisher that commits the transaction and logs its id. */
@Test
public void testTransactionAlreadyCommitted() {
    BiConsumer<Transaction, UUID> commitAndLog = (committedTx, id) -> {
        committedTx.commit();

        log.info("Committed transaction {}", id);
    };

    testTransactionAlreadyFinished(true, true, commitAndLog);
}
/** Runs {@code testTransactionAlreadyFinished} with a finisher that rolls the transaction back and logs its id. */
@Test
public void testTransactionAlreadyRolledback() {
    BiConsumer<Transaction, UUID> rollbackAndLog = (abortedTx, id) -> {
        abortedTx.rollback();

        log.info("Rolled back transaction {}", id);
    };

    testTransactionAlreadyFinished(false, true, rollbackAndLog);
}
/** Writes a row and reads it back, passing {@code null} as the transaction for both operations. */
@Test
public void testImplicit() {
    var view = accounts.recordView();

    view.upsert(null, makeValue(1, BALANCE_1));

    Tuple stored = view.get(null, makeKey(1));

    assertEquals(BALANCE_1, stored.doubleValue("balance"));
}
/**
 * Checks that a read-only read still returns the committed row when a delete is pending and the transaction meta known
 * to every tx manager has been rewritten so that it no longer points to the original coordinator.
 */
@Test
public void testWriteIntentResolutionFallbackToCommitPartitionPath() {
    var view = accounts.recordView();

    view.upsert(null, makeValue(1, 100.));

    // Leave a delete uncommitted.
    Transaction pendingTx = igniteTransactions.begin();
    view.delete(pendingTx, makeKey(1));

    UUID pendingTxId = ((ReadWriteTransactionImpl) pendingTx).id();

    // Imitate a restart of the client node acting as the tx coordinator, so that its volatile state is unavailable.
    // The coordinator path of the write intent resolution then has no effect, and the commit partition path must be used.
    for (TxManager manager : txManagers()) {
        manager.updateTxMeta(pendingTxId, meta -> {
            if (meta == null) {
                // Nothing is known about the transaction on this manager, keep it that way.
                return null;
            }

            return new TxStateMeta(meta.txState(), "restarted", meta.commitPartitionId(), meta.commitTimestamp());
        });
    }

    // The read-only read must still see the row as it was before the pending delete.
    Transaction reader = igniteTransactions.begin(new TransactionOptions().readOnly(true));
    Tuple row = view.get(reader, makeKey(1));

    assertEquals(100., row.doubleValue("balance"));
}
/**
 * Single-key read scenario: while the first transaction holds an uncommitted update of a key, a {@code get} from a
 * second transaction started later is expected to throw {@link TransactionException}, an implicit read returns the
 * old balance, and after the commit an implicit read returns the new balance.
 */
@Test
public void testSingleGet() {
    var view = accounts.recordView();

    view.upsert(null, makeValue(1, 100.));

    // The start order matters for the scenario: the writer is started first.
    Transaction writer = igniteTransactions.begin();
    Transaction laterTx = igniteTransactions.begin();

    view.upsert(writer, makeValue(1, 200.));

    assertThrows(TransactionException.class, () -> view.get(laterTx, makeKey(1)));

    Tuple beforeCommit = view.get(null, makeKey(1));
    assertEquals(100., beforeCommit.doubleValue("balance"));

    writer.commit();

    Tuple afterCommit = view.get(null, makeKey(1));
    assertEquals(200., afterCommit.doubleValue("balance"));
}
/**
 * Batch counterpart of the single-key read scenario. Collects up to ten keys that map to the same partition as key 0,
 * then checks that {@code getAll} from a transaction started later throws {@link TransactionException} while another
 * transaction holds uncommitted updates of those keys, and that implicit batch reads return the old balances before
 * the commit and the new ones after it.
 *
 * @throws Exception If failed.
 */
@Test
public void testBatchSinglePartitionGet() throws Exception {
    var view = accounts.recordView();

    SchemaRegistry registry = accounts.schemaView();
    var keyMarshaller = new TupleMarshallerImpl(registry.lastKnownSchema());

    int targetPartition = accounts.internalTable().partition(keyMarshaller.marshalKey(makeKey(0)));

    List<Integer> samePartitionKeys = new ArrayList<>(10);
    samePartitionKeys.add(0);

    // Scan candidate keys until ten keys of the target partition are collected or the candidates are exhausted.
    int candidate = 1;

    while (candidate < 10_000 && samePartitionKeys.size() < 10) {
        int candidatePartition = accounts.internalTable().partition(keyMarshaller.marshalKey(makeKey(candidate)));

        if (candidatePartition == targetPartition) {
            samePartitionKeys.add(candidate);
        }

        candidate++;
    }

    log.info("A batch of keys for a single partition is found [partId={}, keys{}]", targetPartition, samePartitionKeys);

    List<Tuple> initialRows = samePartitionKeys.stream().map(k -> makeValue(k, 100.)).collect(toList());
    view.upsertAll(null, initialRows);

    // The start order matters for the scenario: the writer is started first.
    Transaction writer = igniteTransactions.begin();
    Transaction laterTx = igniteTransactions.begin();

    List<Tuple> updatedRows = samePartitionKeys.stream().map(k -> makeValue(k, 200.)).collect(toList());
    view.upsertAll(writer, updatedRows);

    List<Tuple> keyTuples = samePartitionKeys.stream().map(k -> makeKey(k)).collect(toList());

    assertThrows(TransactionException.class, () -> view.getAll(laterTx, keyTuples));

    // Implicit read before the commit: old balances.
    view.getAll(null, keyTuples).forEach(row -> assertEquals(100., row.doubleValue("balance")));

    writer.commit();

    // Implicit read after the commit: new balances.
    view.getAll(null, keyTuples).forEach(row -> assertEquals(200., row.doubleValue("balance")));
}
/**
 * Scenario with two read-write transactions of different priorities touching the same key: the one started first has
 * {@link TxPriority#LOW}, the one started second has {@link TxPriority#NORMAL}. The second transaction issues
 * {@code getAndPut} asynchronously; after the first one commits, the call is expected to return the value written by
 * the first transaction, and after the second commit the key holds the value written by the second transaction.
 */
@Test
public void testYoungerTransactionWithHigherPriorityWaitsForOlderTransactionCommit() {
    IgniteTransactionsImpl txFactory = (IgniteTransactionsImpl) igniteTransactions;
    KeyValueView<Long, String> kvView = customers.keyValueView(Long.class, String.class);

    // Seed the key with an initial value.
    kvView.put(null, 1L, "init");

    // The older transaction runs with low priority and updates the key first.
    Transaction olderLowTx = txFactory.beginWithPriority(false, TxPriority.LOW);
    kvView.put(olderLowTx, 1L, "low");

    // The younger transaction runs with normal priority and touches the same key from another thread.
    Transaction youngerNormalTx = txFactory.beginWithPriority(false, TxPriority.NORMAL);

    CompletableFuture<String> previousValueFut = CompletableFuture.supplyAsync(
            () -> kvView.getAndPut(youngerNormalTx, 1L, "normal")
    );

    // Finish the older transaction.
    olderLowTx.commit();

    // The younger transaction must observe the value committed by the older one.
    assertThat(previousValueFut, willBe("low"));

    youngerNormalTx.commit();

    // The update made by the younger transaction is the final value of the key.
    assertEquals("normal", kvView.get(null, 1L));
}
/**
 * For every {@link TxPriority} value: starts two read-write transactions with that priority, updates a key in the one
 * started first and expects a {@link TransactionException} when the one started second updates the same key.
 *
 * @param priority Priority used for both transactions.
 */
@ParameterizedTest
@EnumSource(TxPriority.class)
public void testYoungerTransactionThrowsExceptionIfKeyLockedByOlderTransactionWithSamePriority(TxPriority priority) {
    IgniteTransactionsImpl txFactory = (IgniteTransactionsImpl) igniteTransactions;
    KeyValueView<Long, String> kvView = customers.keyValueView(Long.class, String.class);

    // Seed the key with an initial value.
    kvView.put(null, 1L, "init");

    // The older transaction updates the key and is left unfinished.
    Transaction olderTx = txFactory.beginWithPriority(false, priority);
    kvView.put(olderTx, 1L, "low");

    // The younger transaction has the same priority and targets the same key.
    Transaction youngerTx = txFactory.beginWithPriority(false, priority);

    assertThrows(TransactionException.class, () -> kvView.put(youngerTx, 1L, "normal"));
}
/**
 * Checks the {@code isReadOnly()} flag and the read timestamp of transactions started through the public API: a default
 * transaction is not read-only and has no read timestamp, a transaction started with {@code readOnly(true)} is read-only
 * and has one. Also commits the read-write transaction and rolls back one more read-only transaction.
 */
@Test
public void testIgniteTransactionsAndReadTimestamp() {
    Transaction rwTx = igniteTransactions.begin();

    assertFalse(rwTx.isReadOnly());

    InternalTransaction rwInternal = (InternalTransaction) rwTx;
    assertNull(rwInternal.readTimestamp());

    Transaction roTx = igniteTransactions.begin(new TransactionOptions().readOnly(true));

    assertTrue(roTx.isReadOnly());

    InternalTransaction roInternal = (InternalTransaction) roTx;
    assertNotNull(roInternal.readTimestamp());

    rwTx.commit();

    // One more read-only transaction is started and rolled back right away.
    Transaction secondRoTx = igniteTransactions.begin(new TransactionOptions().readOnly(true));
    secondRoTx.rollback();
}
/**
 * Checks that operations issued on a transaction after it has been finished (committed or rolled back) fail with
 * the {@code TX_ALREADY_FINISHED_ERR} code, and that the data visible afterwards matches the way it was finished.
 *
 * @param commit {@code true} if the finisher commits the transaction, {@code false} if it rolls the transaction back.
 * @param checkLocks Whether to check that no locks are left for the transaction after it is finished.
 * @param finisher Closure that finishes the transaction; receives the transaction and its id.
 */
protected void testTransactionAlreadyFinished(
        boolean commit,
        boolean checkLocks,
        BiConsumer<Transaction, UUID> finisher
) {
    Transaction tx = igniteTransactions.begin();

    UUID txId = ((ReadWriteTransactionImpl) tx).id();

    log.info("Started transaction {}", txId);

    var view = accounts.recordView();

    view.upsert(tx, makeValue(1, 100.));
    view.upsert(tx, makeValue(2, 200.));

    // The transaction reads its own writes while it is still active.
    Collection<Tuple> ownWrites = view.getAll(tx, List.of(makeKey(1), makeKey(2)));
    validateBalance(ownWrites, 100., 200.);

    finisher.accept(tx, txId);

    // Every further operation on the finished transaction must be rejected.
    String expectedMessage = "Transaction is already finished";

    assertThrowsWithCode(TransactionException.class, TX_ALREADY_FINISHED_ERR,
            () -> view.get(tx, makeKey(1)), expectedMessage);

    assertThrowsWithCode(TransactionException.class, TX_ALREADY_FINISHED_ERR,
            () -> view.delete(tx, makeKey(1)), expectedMessage);

    assertThrowsWithCode(TransactionException.class, TX_ALREADY_FINISHED_ERR,
            () -> view.get(tx, makeKey(2)), expectedMessage);

    assertThrowsWithCode(TransactionException.class, TX_ALREADY_FINISHED_ERR,
            () -> view.upsert(tx, makeValue(2, 300.)), expectedMessage);

    if (checkLocks) {
        assertTrue(CollectionUtils.nullOrEmpty(txManager(accounts).lockManager().locks(txId)));
    }

    // Implicit read: both rows are present after a commit and absent after a rollback.
    Collection<Tuple> visibleRows = view.getAll(null, List.of(makeKey(1), makeKey(2)));

    if (commit) {
        validateBalance(visibleRows, 100., 200.);
    } else {
        assertThat(visibleRows, contains(null, null));
    }
}
}