| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| namespace Apache.Ignite.Core.Tests.Cache |
| { |
| using System; |
| using System.Collections.Generic; |
| using System.Linq; |
| using System.Threading; |
| using System.Threading.Tasks; |
| using System.Transactions; |
| using Apache.Ignite.Core.Cache; |
| using Apache.Ignite.Core.Cache.Configuration; |
| using Apache.Ignite.Core.Impl.Common; |
| using Apache.Ignite.Core.Transactions; |
| using NUnit.Framework; |
| |
| /// <summary> |
| /// Transactional cache tests. |
| /// </summary> |
| public abstract class CacheAbstractTransactionalTest : CacheAbstractTest |
| { |
/// <summary>
/// Basic cache lock checks for a single-key lock and a multi-key lock
/// (used while <see cref="TestLock"/> is ignored).
/// </summary>
[Test]
public void TestLockSimple()
{
    const int key = 7;

    var cache = Cache();

    // Locks are created lazily so that the second one is requested
    // only after the first one has been checked and disposed.
    var lockFactories = new Func<ICacheLock>[]
    {
        () => cache.Lock(key),
        () => cache.LockAll(new[] { key, 1, 2, 3 })
    };

    foreach (var createLock in lockFactories)
    {
        var cacheLock = createLock();

        using (cacheLock)
        {
            // Exit without a preceding Enter must be rejected.
            Assert.Throws<InvalidOperationException>(() => cacheLock.Exit());

            cacheLock.Enter();

            Assert.IsTrue(cache.IsLocalLocked(key, true));
            Assert.IsTrue(cache.IsLocalLocked(key, false));

            cacheLock.Exit();

            Assert.IsFalse(cache.IsLocalLocked(key, true));
            Assert.IsFalse(cache.IsLocalLocked(key, false));

            var entered = cacheLock.TryEnter();

            Assert.IsTrue(entered);

            Assert.IsTrue(cache.IsLocalLocked(key, true));
            Assert.IsTrue(cache.IsLocalLocked(key, false));

            cacheLock.Exit();
        }

        // A disposed lock can no longer be entered.
        Assert.Throws<ObjectDisposedException>(() => cacheLock.Enter());
    }
}
| |
/// <summary>
/// Runs the full lock routine (<c>CheckLock</c>) against a single-key lock
/// and against a multi-key lock. Currently ignored, see IGNITE-835.
/// </summary>
[Test]
[Ignore("IGNITE-835")]
public void TestLock()
{
    const int key = 7;

    var cache = Cache();

    Func<ICacheLock> singleKeyLock = () => cache.Lock(key);
    Func<ICacheLock> multiKeyLock = () => cache.LockAll(new[] { key, 2, 3, 4, 5 });

    CheckLock(cache, key, singleKeyLock);
    CheckLock(cache, key, multiKeyLock);
}
| |
/// <summary>
/// Lock test routine shared by the lock tests: verifies enter/exit pairing, nested entering,
/// visibility of the lock through <c>IsLocalLocked</c>, behavior seen from other tasks,
/// and behavior after disposal.
/// </summary>
/// <param name="cache">Cache that owns the locked key.</param>
/// <param name="key">Key whose lock state is asserted.</param>
/// <param name="getLock">Factory producing a lock that covers <paramref name="key"/>.</param>
private static void CheckLock(ICache<int, int> cache, int key, Func<ICacheLock> getLock)
{
    Action assertLocked = () =>
    {
        Assert.IsTrue(cache.IsLocalLocked(key, true));
        Assert.IsTrue(cache.IsLocalLocked(key, false));
    };

    Action assertUnlocked = () =>
    {
        Assert.IsFalse(cache.IsLocalLocked(key, true));
        Assert.IsFalse(cache.IsLocalLocked(key, false));
    };

    // Obtains a new lock, takes it and releases it; invoked from the tasks below.
    Action enterFreshLock = () =>
    {
        using (var freshLock = getLock())
        {
            Assert.IsTrue(freshLock.TryEnter());
            freshLock.Exit();
        }
    };

    var primaryLock = getLock();

    using (primaryLock)
    {
        // Exit without a preceding Enter must be rejected.
        Assert.Throws<InvalidOperationException>(primaryLock.Exit);

        primaryLock.Enter();

        try
        {
            assertLocked();
            EnsureCannotLock(getLock, primaryLock);

            // Enter a second time from the same thread.
            primaryLock.Enter();

            try
            {
                assertLocked();
                EnsureCannotLock(getLock, primaryLock);
            }
            finally
            {
                primaryLock.Exit();
            }

            // One Enter is still outstanding, so the key must remain locked.
            assertLocked();
            EnsureCannotLock(getLock, primaryLock);

            // Disposing while the lock is held must be rejected.
            Assert.Throws<SynchronizationLockException>(primaryLock.Dispose);
        }
        finally
        {
            primaryLock.Exit();
        }

        assertUnlocked();

        // Once released, both the same lock instance and a new one can be taken from a task.
        Task.Factory.StartNew(() =>
        {
            Assert.IsTrue(primaryLock.TryEnter());
            primaryLock.Exit();

            enterFreshLock();
        }).Wait();
    }

    assertUnlocked();

    // After the primary lock is disposed, a new lock can still be taken from a task.
    Task.Factory.StartNew(enterFreshLock).Wait();

    // A disposed lock can no longer be entered.
    Assert.Throws<ObjectDisposedException>(primaryLock.Enter);
}
| |
/// <summary>
/// Verifies, from within a separate task, that neither the given lock
/// nor a newly obtained one can be entered.
/// </summary>
/// <param name="getLock">Factory producing a new lock instance.</param>
/// <param name="sharedLock">Lock instance that is expected to be unavailable.</param>
private static void EnsureCannotLock(Func<ICacheLock> getLock, ICacheLock sharedLock)
{
    var shortWait = TimeSpan.FromMilliseconds(100);

    Action attemptToEnter = () =>
    {
        // Both the immediate and the timed attempt must fail.
        Assert.IsFalse(sharedLock.TryEnter());
        Assert.IsFalse(sharedLock.TryEnter(shortWait));

        using (var competingLock = getLock())
        {
            Assert.IsFalse(competingLock.TryEnter());
            Assert.IsFalse(competingLock.TryEnter(shortWait));
        }
    };

    Task.Factory.StartNew(attemptToEnter).Wait();
}
| |
/// <summary>
/// Verifies that values written inside a transaction are visible after commit,
/// using either the synchronous or the asynchronous commit call.
/// </summary>
[Test]
public void TestTxCommit([Values(true, false)] bool async)
{
    var cache = Cache();

    Assert.IsNull(Transactions.Tx);

    using (var tx = Transactions.TxStart())
    {
        cache.Put(1, 1);
        cache.Put(2, 2);

        if (!async)
        {
            tx.Commit();
        }
        else
        {
            var commitTask = tx.CommitAsync();

            commitTask.Wait();

            Assert.IsTrue(commitTask.IsCompleted);
        }
    }

    foreach (var key in new[] { 1, 2 })
    {
        Assert.AreEqual(key, cache.Get(key));
    }

    // No transaction may remain attached once the scope is left.
    Assert.IsNull(Transactions.Tx);
}
| |
/// <summary>
/// Verifies that an explicit rollback discards the values written inside the transaction.
/// </summary>
[Test]
public void TestTxRollback()
{
    var keys = new[] { 1, 2 };
    var cache = Cache();

    // Initial state: every key maps to itself.
    foreach (var key in keys)
    {
        cache.Put(key, key);
    }

    Assert.IsNull(Transactions.Tx);

    using (var tx = Transactions.TxStart())
    {
        foreach (var key in keys)
        {
            cache.Put(key, key * 10);
        }

        tx.Rollback();
    }

    // The initial values must still be in place.
    foreach (var key in keys)
    {
        Assert.AreEqual(key, cache.Get(key));
    }

    Assert.IsNull(Transactions.Tx);
}
| |
/// <summary>
/// Verifies that disposing a transaction that was never committed discards its changes.
/// </summary>
[Test]
public void TestTxClose()
{
    var cache = Cache();

    cache.Put(1, 1);
    cache.Put(2, 2);

    Assert.IsNull(Transactions.Tx);

    var uncommittedTx = Transactions.TxStart();

    using (uncommittedTx)
    {
        // Neither Commit nor Rollback is called; the scope exit disposes the transaction.
        cache.Put(1, 10);
        cache.Put(2, 20);
    }

    var first = cache.Get(1);
    Assert.AreEqual(1, first);

    var second = cache.Get(2);
    Assert.AreEqual(2, second);

    Assert.IsNull(Transactions.Tx);
}
| |
/// <summary>
/// Commits one transaction for every combination of concurrency mode and isolation level,
/// started either with an explicit timeout or with the two-argument overload.
/// </summary>
[Test]
public void TestTxAllModes([Values(true, false)] bool withTimeout)
{
    var cache = Cache();

    var concurrencyModes = Enum.GetValues(typeof(TransactionConcurrency))
        .Cast<TransactionConcurrency>()
        .ToArray();

    var isolationLevels = Enum.GetValues(typeof(TransactionIsolation))
        .Cast<TransactionIsolation>()
        .ToArray();

    // A distinct value is written for every combination so that each commit is observable.
    var expectedValue = 0;

    foreach (var concurrencyMode in concurrencyModes)
    {
        foreach (var isolationLevel in isolationLevels)
        {
            Console.WriteLine("Test tx [concurrency={0}, isolation={1}]", concurrencyMode, isolationLevel);

            Assert.IsNull(Transactions.Tx);

            ITransaction tx;

            if (withTimeout)
            {
                tx = Transactions.TxStart(concurrencyMode, isolationLevel,
                    TimeSpan.FromMilliseconds(1100), 10);
            }
            else
            {
                tx = Transactions.TxStart(concurrencyMode, isolationLevel);
            }

            using (tx)
            {
                Assert.AreEqual(concurrencyMode, tx.Concurrency);
                Assert.AreEqual(isolationLevel, tx.Isolation);

                if (withTimeout)
                {
                    Assert.AreEqual(1100, tx.Timeout.TotalMilliseconds);
                }

                cache.Put(1, expectedValue);

                tx.Commit();
            }

            Assert.IsNull(Transactions.Tx);

            Assert.AreEqual(expectedValue, cache.Get(1));

            expectedValue++;
        }
    }
}
| |
/// <summary>
/// Tests that transaction properties are applied and propagated properly.
/// <para />
/// Starts several transactions in sequence with different settings and checks the attributes
/// both while each transaction is active and after it has been committed or rolled back.
/// Every transaction is wrapped in a <c>using</c> block so that a failed assertion
/// does not leave a transaction undisposed for the tests that follow.
/// </summary>
[Test]
public void TestTxAttributes()
{
    DateTime startTime1;
    DateTime startTime2;

    // 1. Optimistic / RepeatableRead, committed.
    using (var tx = Transactions.TxStart(TransactionConcurrency.Optimistic,
        TransactionIsolation.RepeatableRead, TimeSpan.FromMilliseconds(2500), 100))
    {
        Assert.IsFalse(tx.IsRollbackOnly);
        Assert.AreEqual(TransactionConcurrency.Optimistic, tx.Concurrency);
        Assert.AreEqual(TransactionIsolation.RepeatableRead, tx.Isolation);
        Assert.AreEqual(2500, tx.Timeout.TotalMilliseconds);
        Assert.AreEqual(TransactionState.Active, tx.State);
        Assert.IsTrue(tx.StartTime.Ticks > 0);

        // Expected value goes first so that failure messages read correctly.
        Assert.AreEqual(GetIgnite(0).GetCluster().GetLocalNode().Id, tx.NodeId);
        Assert.AreEqual(TimeSpan.Zero, Transactions.DefaultTimeoutOnPartitionMapExchange);

        startTime1 = tx.StartTime;

        tx.Commit();

        // Attributes must be retained after commit.
        Assert.IsFalse(tx.IsRollbackOnly);
        Assert.AreEqual(TransactionState.Committed, tx.State);
        Assert.AreEqual(TransactionConcurrency.Optimistic, tx.Concurrency);
        Assert.AreEqual(TransactionIsolation.RepeatableRead, tx.Isolation);
        Assert.AreEqual(2500, tx.Timeout.TotalMilliseconds);
        Assert.AreEqual(startTime1, tx.StartTime);
    }

    // Pause so that the next transaction gets a strictly later start time.
    Thread.Sleep(100);

    // 2. Pessimistic / ReadCommitted, rolled back.
    using (var tx = Transactions.TxStart(TransactionConcurrency.Pessimistic,
        TransactionIsolation.ReadCommitted, TimeSpan.FromMilliseconds(3500), 200))
    {
        Assert.IsFalse(tx.IsRollbackOnly);
        Assert.AreEqual(TransactionConcurrency.Pessimistic, tx.Concurrency);
        Assert.AreEqual(TransactionIsolation.ReadCommitted, tx.Isolation);
        Assert.AreEqual(3500, tx.Timeout.TotalMilliseconds);
        Assert.AreEqual(TransactionState.Active, tx.State);
        Assert.IsTrue(tx.StartTime.Ticks > 0);
        Assert.IsTrue(tx.StartTime > startTime1);

        startTime2 = tx.StartTime;

        tx.Rollback();

        // Attributes must be retained after rollback.
        Assert.AreEqual(TransactionState.RolledBack, tx.State);
        Assert.AreEqual(TransactionConcurrency.Pessimistic, tx.Concurrency);
        Assert.AreEqual(TransactionIsolation.ReadCommitted, tx.Isolation);
        Assert.AreEqual(3500, tx.Timeout.TotalMilliseconds);
        Assert.AreEqual(startTime2, tx.StartTime);
    }

    Thread.Sleep(100);

    // 3. Optimistic / RepeatableRead again, committed.
    using (var tx = Transactions.TxStart(TransactionConcurrency.Optimistic,
        TransactionIsolation.RepeatableRead, TimeSpan.FromMilliseconds(2500), 100))
    {
        Assert.IsFalse(tx.IsRollbackOnly);
        Assert.AreEqual(TransactionConcurrency.Optimistic, tx.Concurrency);
        Assert.AreEqual(TransactionIsolation.RepeatableRead, tx.Isolation);
        Assert.AreEqual(2500, tx.Timeout.TotalMilliseconds);
        Assert.AreEqual(TransactionState.Active, tx.State);
        Assert.IsTrue(tx.StartTime > startTime2);

        DateTime startTime3 = tx.StartTime;

        tx.Commit();

        Assert.IsFalse(tx.IsRollbackOnly);
        Assert.AreEqual(TransactionState.Committed, tx.State);
        Assert.AreEqual(TransactionConcurrency.Optimistic, tx.Concurrency);
        Assert.AreEqual(TransactionIsolation.RepeatableRead, tx.Isolation);
        Assert.AreEqual(2500, tx.Timeout.TotalMilliseconds);
        Assert.AreEqual(startTime3, tx.StartTime);
    }

    // 4. Check defaults.
    using (var tx = Transactions.TxStart())
    {
        Assert.AreEqual(Transactions.DefaultTransactionConcurrency, tx.Concurrency);
        Assert.AreEqual(Transactions.DefaultTransactionIsolation, tx.Isolation);
        Assert.AreEqual(Transactions.DefaultTimeout, tx.Timeout);

        tx.Commit();
    }
}
| |
/// <summary>
/// Verifies the <see cref="ITransaction.IsRollbackOnly"/> flag: once it is set, commit is rejected
/// and disposing the transaction rolls the changes back.
/// </summary>
[Test]
public void TestTxRollbackOnly()
{
    var cache = Cache();

    cache.Put(1, 1);
    cache.Put(2, 2);

    var rollbackOnlyTx = Transactions.TxStart();

    cache.Put(1, 10);
    cache.Put(2, 20);

    // The flag is off for a newly started transaction.
    Assert.IsFalse(rollbackOnlyTx.IsRollbackOnly);

    rollbackOnlyTx.SetRollbackonly();

    Assert.IsTrue(rollbackOnlyTx.IsRollbackOnly);
    Assert.AreEqual(TransactionState.MarkedRollback, rollbackOnlyTx.State);

    // Commit is not allowed for a transaction marked as rollback-only.
    Assert.Throws<TransactionRollbackException>(rollbackOnlyTx.Commit);

    rollbackOnlyTx.Dispose();

    Assert.AreEqual(TransactionState.RolledBack, rollbackOnlyTx.State);
    Assert.IsTrue(rollbackOnlyTx.IsRollbackOnly);

    // The values written before the transaction are still in place.
    var first = cache.Get(1);
    var second = cache.Get(2);

    Assert.AreEqual(1, first);
    Assert.AreEqual(2, second);

    Assert.IsNull(Transactions.Tx);
}
| |
/// <summary>
/// Verifies transaction metrics: counters are zero after reset, one commit and one rollback
/// are counted, and the reported commit/rollback times fall within the test's time window.
/// </summary>
[Test]
public void TestTxMetrics()
{
    var cache = Cache();

    // Lower bound of the window is taken one second in the past.
    var windowStart = DateTime.UtcNow.AddSeconds(-1);

    Transactions.ResetMetrics();

    var initialMetrics = Transactions.GetMetrics();

    Assert.AreEqual(0, initialMetrics.TxCommits);
    Assert.AreEqual(0, initialMetrics.TxRollbacks);

    // First transaction is disposed without commit.
    using (Transactions.TxStart())
    {
        cache.Put(1, 1);
    }

    // Second transaction is committed.
    using (var committedTx = Transactions.TxStart())
    {
        cache.Put(1, 1);
        committedTx.Commit();
    }

    var finalMetrics = Transactions.GetMetrics();

    Assert.AreEqual(1, finalMetrics.TxCommits);
    Assert.AreEqual(1, finalMetrics.TxRollbacks);

    Assert.LessOrEqual(windowStart, finalMetrics.CommitTime);
    Assert.LessOrEqual(windowStart, finalMetrics.RollbackTime);

    Assert.GreaterOrEqual(DateTime.UtcNow, finalMetrics.CommitTime);
    Assert.GreaterOrEqual(DateTime.UtcNow, finalMetrics.RollbackTime);
}
| |
/// <summary>
/// Verifies transaction state transitions, metadata access, and the errors raised
/// when committing or rolling back a transaction that has already finished.
/// </summary>
[Test]
public void TestTxStateAndExceptions()
{
    const string metaKey = "myMeta";

    // Part 1: rollback, then an illegal commit.
    var rolledBackTx = Transactions.TxStart();

    Assert.AreEqual(TransactionState.Active, rolledBackTx.State);
    Assert.AreEqual(Thread.CurrentThread.ManagedThreadId, rolledBackTx.ThreadId);

    rolledBackTx.AddMeta(metaKey, 42);
    Assert.AreEqual(42, rolledBackTx.Meta<int>(metaKey));
    Assert.AreEqual(42, rolledBackTx.RemoveMeta<int>(metaKey));

    var rollbackTask = rolledBackTx.RollbackAsync();
    rollbackTask.Wait();

    Assert.AreEqual(TransactionState.RolledBack, rolledBackTx.State);

    Assert.Throws<InvalidOperationException>(() => rolledBackTx.Commit());

    // Part 2: commit, then an illegal asynchronous rollback.
    var committedTx = Transactions.TxStart();

    Assert.AreEqual(TransactionState.Active, committedTx.State);

    var commitTask = committedTx.CommitAsync();
    commitTask.Wait();

    Assert.AreEqual(TransactionState.Committed, committedTx.State);

    // Starting the rollback is expected not to throw; the failure surfaces when the task is waited on.
    var illegalRollback = committedTx.RollbackAsync();

    Assert.Throws<AggregateException>(() => illegalRollback.Wait());
}
| |
/// <summary>
/// Tests the transaction deadlock detection.
/// <para />
/// Two tasks increment the same 100 keys inside pessimistic transactions, one in ascending
/// and one in descending order. Both tasks are expected to fail, and at least one of the
/// failures must be a <see cref="TransactionDeadlockException"/> with the expected message prefix.
/// </summary>
[Test]
public void TestTxDeadlockDetection()
{
    var cache = Cache();

    var keys0 = Enumerable.Range(1, 100).ToArray();

    cache.PutAll(keys0.ToDictionary(x => x, x => x));

    var barrier = new Barrier(2);

    Action<int[]> increment = keys =>
    {
        using (var tx = Transactions.TxStart(TransactionConcurrency.Pessimistic,
            TransactionIsolation.RepeatableRead, TimeSpan.FromSeconds(0.5), 0))
        {
            foreach (var key in keys)
            {
                cache[key]++;
            }

            // Bounded wait for the other participant; the result is not checked.
            barrier.SignalAndWait(500);

            tx.Commit();
        }
    };

    // Increment keys within tx in different order to cause a deadlock.
    var aex = Assert.Throws<AggregateException>(() =>
        Task.WaitAll(new[]
            {
                TaskRunner.Run(() => increment(keys0)),
                TaskRunner.Run(() => increment(keys0.Reverse().ToArray()))
            },
            TimeSpan.FromSeconds(40)));

    Assert.AreEqual(2, aex.InnerExceptions.Count);

    var deadlockEx = aex.InnerExceptions.OfType<TransactionDeadlockException>().FirstOrDefault();

    if (deadlockEx == null)
    {
        Assert.Fail("Unexpected exception: " + aex);
    }

    // Ordinal comparison: the prefix is a fixed token and must not depend on the current culture.
    var hasExpectedPrefix = deadlockEx.Message.Trim()
        .StartsWith("Deadlock detected:", StringComparison.Ordinal);

    Assert.IsTrue(hasExpectedPrefix, deadlockEx.Message);
}
| |
/// <summary>
/// Verifies that cache updates made inside an ambient <see cref="TransactionScope"/>
/// are applied when the scope completes and discarded when it does not.
/// </summary>
[Test]
public void TestTransactionScopeSingleCache()
{
    var cache = Cache();

    Action assertCommittedValues = () =>
    {
        Assert.AreEqual(10, cache[1]);
        Assert.AreEqual(20, cache[2]);
    };

    cache[1] = 1;
    cache[2] = 2;

    // Completed scope: changes are committed.
    using (var completedScope = new TransactionScope())
    {
        cache[1] = 10;
        cache[2] = 20;

        // A cache update inside the scope must have started an Ignite transaction.
        var ambientTx = cache.Ignite.GetTransactions().Tx;
        Assert.IsNotNull(ambientTx);

        completedScope.Complete();
    }

    assertCommittedValues();

    // Scope without Complete: changes are rolled back.
    using (new TransactionScope())
    {
        cache[1] = 100;
        cache[2] = 200;
    }

    assertCommittedValues();
}
| |
/// <summary>
/// Verifies ambient <see cref="TransactionScope"/> behavior when two transactional caches
/// are updated within the same scope, using synchronous or asynchronous puts.
/// </summary>
[Test]
public void TestTransactionScopeMultiCache([Values(true, false)] bool async)
{
    var cache1 = Cache();

    // Second transactional cache, named after the first one with a "_" suffix.
    var cache2Cfg = new CacheConfiguration(cache1.Name + "_");
    cache2Cfg.AtomicityMode = CacheAtomicityMode.Transactional;

    var cache2 = GetIgnite(0).GetOrCreateCache<int, int>(cache2Cfg);

    // Writes key 1 in both caches; in async mode the returned tasks are not awaited.
    Action<int, int> putBoth = (val1, val2) =>
    {
        if (async)
        {
            cache1.PutAsync(1, val1);
            cache2.PutAsync(1, val2);
        }
        else
        {
            cache1.Put(1, val1);
            cache2.Put(1, val2);
        }
    };

    Action assertCommittedValues = () =>
    {
        Assert.AreEqual(10, cache1[1]);
        Assert.AreEqual(20, cache2[1]);
    };

    cache1[1] = 1;
    cache2[1] = 2;

    // Completed scope: both updates are committed.
    using (var completedScope = new TransactionScope())
    {
        putBoth(10, 20);

        completedScope.Complete();
    }

    assertCommittedValues();

    // Scope without Complete: both updates are rolled back.
    using (new TransactionScope())
    {
        putBoth(100, 200);
    }

    assertCommittedValues();
}
| |
/// <summary>
/// Verifies that an Ignite transaction started manually is not governed by a nested
/// <see cref="TransactionScope"/>: the update survives even though the scope is not completed.
/// </summary>
[Test]
public void TestTransactionScopeWithManualIgniteTx()
{
    var cache = Cache();
    var igniteTransactions = cache.Ignite.GetTransactions();

    cache[1] = 1;

    // The manually started Ignite tx is not enlisted in the TransactionScope opened inside it.
    using (var manualTx = igniteTransactions.TxStart())
    {
        var uncompletedScope = new TransactionScope();

        using (uncompletedScope)
        {
            cache[1] = 2;
        }

        // The scope above was left without Complete; the manual tx is committed regardless.
        manualTx.Commit();
    }

    Assert.AreEqual(2, cache[1]);
}
| |
/// <summary>
/// Verifies that a cache update made inside a scope created with
/// <see cref="TransactionScopeOption.Suppress"/> is applied even though the scope is not completed.
/// </summary>
[Test]
public void TestSuppressedTransactionScope()
{
    var cache = Cache();

    cache[1] = 1;

    var suppressedScope = new TransactionScope(TransactionScopeOption.Suppress);

    using (suppressedScope)
    {
        cache[1] = 2;
    }

    // The scope was never completed, yet the new value is expected because the tx is suppressed.
    var actual = cache[1];

    Assert.AreEqual(2, actual);
}
| |
/// <summary>
/// Verifies nested <see cref="TransactionScope"/> behavior for the
/// <see cref="TransactionScopeOption.Required"/> and <see cref="TransactionScopeOption.RequiresNew"/>
/// options of the inner scope, for both the commit and the rollback case.
/// </summary>
[Test]
public void TestNestedTransactionScope()
{
    var cache = Cache();

    cache[1] = 1;

    var innerScopeOptions = new[] {TransactionScopeOption.Required, TransactionScopeOption.RequiresNew};

    foreach (var innerOption in innerScopeOptions)
    {
        // Commit: both scopes complete, the last written value wins.
        using (var outerScope = new TransactionScope())
        {
            using (var innerScope = new TransactionScope(innerOption))
            {
                cache[1] = 2;
                innerScope.Complete();
            }

            cache[1] = 3;
            outerScope.Complete();
        }

        Assert.AreEqual(3, cache[1]);

        // Rollback: neither scope completes.
        using (new TransactionScope())
        {
            using (new TransactionScope(innerOption))
            {
                cache[1] = 4;
            }

            cache[1] = 5;
        }

        // With the Required option there is a single tx that gets aborted,
        // so the second put executes outside the tx and its value is kept.
        int expectedAfterRollback;

        if (innerOption == TransactionScopeOption.Required)
        {
            expectedAfterRollback = 5;
        }
        else
        {
            expectedAfterRollback = 3;
        }

        Assert.AreEqual(expectedAfterRollback, cache[1], innerOption.ToString());
    }
}
| |
/// <summary>
/// Verifies that the isolation level of an ambient <see cref="TransactionScope"/> is mapped
/// to the expected Ignite isolation, and that the default concurrency mode is used.
/// </summary>
[Test]
public void TestTransactionScopeOptions()
{
    var cache = Cache();
    var igniteTransactions = cache.Ignite.GetTransactions();

    // Key: System.Transactions isolation level; value: expected Ignite isolation.
    var expectedMappings = new[]
    {
        new KeyValuePair<IsolationLevel, TransactionIsolation>(
            IsolationLevel.Serializable, TransactionIsolation.Serializable),
        new KeyValuePair<IsolationLevel, TransactionIsolation>(
            IsolationLevel.RepeatableRead, TransactionIsolation.RepeatableRead),
        new KeyValuePair<IsolationLevel, TransactionIsolation>(
            IsolationLevel.ReadCommitted, TransactionIsolation.ReadCommitted),
        new KeyValuePair<IsolationLevel, TransactionIsolation>(
            IsolationLevel.ReadUncommitted, TransactionIsolation.ReadCommitted),
        new KeyValuePair<IsolationLevel, TransactionIsolation>(
            IsolationLevel.Snapshot, TransactionIsolation.ReadCommitted),
        new KeyValuePair<IsolationLevel, TransactionIsolation>(
            IsolationLevel.Chaos, TransactionIsolation.ReadCommitted)
    };

    foreach (var mapping in expectedMappings)
    {
        var scopeOptions = new TransactionOptions();
        scopeOptions.IsolationLevel = mapping.Key;

        using (new TransactionScope(TransactionScopeOption.Required, scopeOptions))
        {
            // The Ignite tx is inspected right after a cache update inside the scope.
            cache[1] = 1;

            var igniteTx = igniteTransactions.Tx;

            Assert.AreEqual(mapping.Value, igniteTx.Isolation);
            Assert.AreEqual(igniteTransactions.DefaultTransactionConcurrency, igniteTx.Concurrency);
        }
    }
}
| |
/// <summary>
/// Runs every modifying cache operation, in its synchronous and asynchronous form,
/// through <c>CheckTxOp</c>; the whole list is repeated ten times.
/// </summary>
[Test]
public void TestTransactionScopeAllOperations()
{
    // Each entry receives the cache and the key to modify; results of the calls are discarded.
    var operations = new Action<ICache<int, int>, int>[]
    {
        (cache, key) => cache.Put(key, -5),
        (cache, key) => cache.PutAsync(key, -5),

        (cache, key) => cache.PutAll(new Dictionary<int, int> {{key, -7}}),
        (cache, key) => cache.PutAllAsync(new Dictionary<int, int> {{key, -7}}),

        // The "if absent" variants remove the key first.
        (cache, key) =>
        {
            cache.Remove(key);
            cache.PutIfAbsent(key, -10);
        },
        (cache, key) =>
        {
            cache.Remove(key);
            cache.PutIfAbsentAsync(key, -10);
        },

        (cache, key) => cache.GetAndPut(key, -9),
        (cache, key) => cache.GetAndPutAsync(key, -9),

        (cache, key) =>
        {
            cache.Remove(key);
            cache.GetAndPutIfAbsent(key, -10);
        },
        (cache, key) =>
        {
            cache.Remove(key);
            cache.GetAndPutIfAbsentAsync(key, -10);
        },

        (cache, key) => cache.GetAndRemove(key),
        (cache, key) => cache.GetAndRemoveAsync(key),

        (cache, key) => cache.GetAndReplace(key, -11),
        (cache, key) => cache.GetAndReplaceAsync(key, -11),

        (cache, key) => cache.Invoke(key, new AddProcessor(), 1),
        (cache, key) => cache.InvokeAsync(key, new AddProcessor(), 1),

        (cache, key) => cache.InvokeAll(new[] {key}, new AddProcessor(), 1),
        (cache, key) => cache.InvokeAllAsync(new[] {key}, new AddProcessor(), 1),

        (cache, key) => cache.Remove(key),
        (cache, key) => cache.RemoveAsync(key),

        (cache, key) => cache.RemoveAll(new[] {key}),
        (cache, key) => cache.RemoveAllAsync(new[] {key}),

        (cache, key) => cache.Replace(key, 100),
        (cache, key) => cache.ReplaceAsync(key, 100),

        (cache, key) => cache.Replace(key, cache[key], 100),
        (cache, key) => cache.ReplaceAsync(key, cache[key], 100)
    };

    const int iterations = 10;

    for (var iteration = 0; iteration < iterations; iteration++)
    {
        foreach (var operation in operations)
        {
            CheckTxOp(operation);
        }
    }
}
| |
/// <summary>
/// Tests that read operations lock keys in Serializable mode, running the check once
/// for every supported way of reading a single value from the cache.
/// </summary>
[Test]
public void TestTransactionScopeWithSerializableIsolationLocksKeysOnRead()
{
    // Each entry reads the value for the given key from the given cache.
    var readOps = new Func<ICache<int, int>, int, int>[]
    {
        (cache, key) => cache[key],
        (cache, key) => cache.Get(key),
        (cache, key) => cache.GetAsync(key).Result,
        (cache, key) =>
        {
            int val;
            return cache.TryGet(key, out val) ? val : 0;
        },
        (cache, key) => cache.TryGetAsync(key).Result.Value,
        (cache, key) => cache.GetAll(new[] {key}).Single().Value,
        (cache, key) => cache.GetAllAsync(new[] {key}).Result.Single().Value
    };

    foreach (var readOp in readOps)
    {
        TestTransactionScopeWithSerializableIsolationLocksKeysOnRead(readOp);
    }
}
| |
/// <summary>
/// Checks, for one read operation, that a value read inside a Serializable
/// <see cref="TransactionScope"/> stays unchanged while the scope is open, even though
/// another task has issued an update, and that the update becomes visible afterwards.
/// </summary>
/// <param name="readOp">Operation that reads the value for the given key from the given cache.</param>
private void TestTransactionScopeWithSerializableIsolationLocksKeysOnRead(
    Func<ICache<int, int>, int, int> readOp)
{
    var cache = Cache();
    cache.Put(1, 1);

    var scopeOptions = new TransactionOptions();
    scopeOptions.IsolationLevel = IsolationLevel.Serializable;

    using (var serializableScope = new TransactionScope(TransactionScopeOption.Required, scopeOptions))
    {
        // The read must return the initial value and start an Ignite transaction.
        Assert.AreEqual(1, readOp(cache, 1));
        Assert.IsNotNull(GetIgnite(0).GetTransactions().Tx);

        var updateIssued = new ManualResetEventSlim();

        // The update is started from another task; its returned task is not awaited.
        var updater = Task.Factory.StartNew(() =>
        {
            cache.PutAsync(1, 2);
            updateIssued.Set();
        });

        updateIssued.Wait();

        // The value seen inside the scope must not have changed.
        Assert.AreEqual(1, readOp(cache, 1));

        serializableScope.Complete();
        updater.Wait();
    }

    // After the scope is closed the update must eventually be observed.
    TestUtils.WaitForTrueCondition(() => readOp(cache, 1) == 2);
}
| |
/// <summary>
/// Checks that a cache operation behaves transactionally under every
/// <see cref="IsolationLevel"/>: its effect is discarded when the enclosing
/// <see cref="TransactionScope"/> is not completed and applied when it is.
/// </summary>
/// <param name="act">Operation under test; receives the cache and the key to modify.</param>
private void CheckTxOp(Action<ICache<int, int>, int> act)
{
    const TransactionScopeOption scopeOption = TransactionScopeOption.Required;

    var levels = new[]
    {
        IsolationLevel.Serializable, IsolationLevel.RepeatableRead, IsolationLevel.ReadCommitted,
        IsolationLevel.ReadUncommitted, IsolationLevel.Snapshot, IsolationLevel.Chaos
    };

    foreach (var level in levels)
    {
        var options = new TransactionOptions();
        options.IsolationLevel = level;

        var cache = Cache();

        Action assertInitialValues = () =>
        {
            Assert.AreEqual(1, cache[1]);
            Assert.AreEqual(2, cache[2]);
        };

        // True when the key is gone or no longer holds its initial value.
        Func<int, int, bool> removedOrChanged =
            (key, initial) => !cache.ContainsKey(key) || cache[key] != initial;

        cache[1] = 1;
        cache[2] = 2;

        // Rollback with a single key; the operation must also have started an Ignite tx.
        using (new TransactionScope(scopeOption, options))
        {
            act(cache, 1);

            Assert.IsNotNull(cache.Ignite.GetTransactions().Tx, "Transaction has not started.");
        }

        assertInitialValues();

        // Rollback with two keys.
        using (new TransactionScope(scopeOption, options))
        {
            act(cache, 1);
            act(cache, 2);
        }

        assertInitialValues();

        // Commit with a single key: only key 1 may be affected.
        using (var singleKeyScope = new TransactionScope(scopeOption, options))
        {
            act(cache, 1);
            singleKeyScope.Complete();
        }

        Assert.IsTrue(removedOrChanged(1, 1));
        Assert.AreEqual(2, cache[2]);

        // Commit with two keys: both must be affected.
        using (var twoKeyScope = new TransactionScope(scopeOption, options))
        {
            act(cache, 1);
            act(cache, 2);
            twoKeyScope.Complete();
        }

        Assert.IsTrue(removedOrChanged(1, 1));
        Assert.IsTrue(removedOrChanged(2, 2));
    }
}
| |
/// <summary>
/// Entry processor that adds the argument to the current entry value.
/// </summary>
[Serializable]
private class AddProcessor : ICacheEntryProcessor<int, int, int, int>
{
    /// <summary>
    /// Increases the entry value by <paramref name="arg"/>.
    /// </summary>
    /// <param name="entry">Entry to update.</param>
    /// <param name="arg">Amount to add.</param>
    /// <returns>The argument, unchanged.</returns>
    public int Process(IMutableCacheEntry<int, int> entry, int arg)
    {
        var updated = entry.Value + arg;

        entry.Value = updated;

        return arg;
    }
}
| } |
| } |