blob: f11136941fae8d515f8c02920ed249348cabc2fe [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Apache.Ignite.Core.Tests.Cache
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Transactions;
using Apache.Ignite.Core.Cache;
using Apache.Ignite.Core.Cache.Configuration;
using Apache.Ignite.Core.Impl.Common;
using Apache.Ignite.Core.Transactions;
using NUnit.Framework;
/// <summary>
/// Transactional cache tests.
/// </summary>
public abstract class CacheAbstractTransactionalTest : CacheAbstractTest
{
/// <summary>
/// Simple cache lock test (while <see cref="TestLock"/> is ignored).
/// </summary>
[Test]
public void TestLockSimple()
{
var cache = Cache();
const int key = 7;
Action<ICacheLock> checkLock = lck =>
{
using (lck)
{
Assert.Throws<InvalidOperationException>(lck.Exit); // can't exit if not entered
lck.Enter();
Assert.IsTrue(cache.IsLocalLocked(key, true));
Assert.IsTrue(cache.IsLocalLocked(key, false));
lck.Exit();
Assert.IsFalse(cache.IsLocalLocked(key, true));
Assert.IsFalse(cache.IsLocalLocked(key, false));
Assert.IsTrue(lck.TryEnter());
Assert.IsTrue(cache.IsLocalLocked(key, true));
Assert.IsTrue(cache.IsLocalLocked(key, false));
lck.Exit();
}
Assert.Throws<ObjectDisposedException>(lck.Enter); // Can't enter disposed lock
};
checkLock(cache.Lock(key));
checkLock(cache.LockAll(new[] { key, 1, 2, 3 }));
}
/// <summary>
/// Tests cache locks.
/// </summary>
[Test]
[Ignore("IGNITE-835")]
public void TestLock()
{
var cache = Cache();
const int key = 7;
// Lock
CheckLock(cache, key, () => cache.Lock(key));
// LockAll
CheckLock(cache, key, () => cache.LockAll(new[] { key, 2, 3, 4, 5 }));
}
/// <summary>
/// Internal lock test routine.
/// </summary>
/// <param name="cache">Cache.</param>
/// <param name="key">Key.</param>
/// <param name="getLock">Function to get the lock.</param>
private static void CheckLock(ICache<int, int> cache, int key, Func<ICacheLock> getLock)
{
var sharedLock = getLock();
using (sharedLock)
{
Assert.Throws<InvalidOperationException>(() => sharedLock.Exit()); // can't exit if not entered
sharedLock.Enter();
try
{
Assert.IsTrue(cache.IsLocalLocked(key, true));
Assert.IsTrue(cache.IsLocalLocked(key, false));
EnsureCannotLock(getLock, sharedLock);
sharedLock.Enter();
try
{
Assert.IsTrue(cache.IsLocalLocked(key, true));
Assert.IsTrue(cache.IsLocalLocked(key, false));
EnsureCannotLock(getLock, sharedLock);
}
finally
{
sharedLock.Exit();
}
Assert.IsTrue(cache.IsLocalLocked(key, true));
Assert.IsTrue(cache.IsLocalLocked(key, false));
EnsureCannotLock(getLock, sharedLock);
Assert.Throws<SynchronizationLockException>(() => sharedLock.Dispose()); // can't dispose while locked
}
finally
{
sharedLock.Exit();
}
Assert.IsFalse(cache.IsLocalLocked(key, true));
Assert.IsFalse(cache.IsLocalLocked(key, false));
var innerTask = new Task(() =>
{
Assert.IsTrue(sharedLock.TryEnter());
sharedLock.Exit();
using (var otherLock = getLock())
{
Assert.IsTrue(otherLock.TryEnter());
otherLock.Exit();
}
});
innerTask.Start();
innerTask.Wait();
}
Assert.IsFalse(cache.IsLocalLocked(key, true));
Assert.IsFalse(cache.IsLocalLocked(key, false));
var outerTask = new Task(() =>
{
using (var otherLock = getLock())
{
Assert.IsTrue(otherLock.TryEnter());
otherLock.Exit();
}
});
outerTask.Start();
outerTask.Wait();
Assert.Throws<ObjectDisposedException>(() => sharedLock.Enter()); // Can't enter disposed lock
}
/// <summary>
/// Ensure that lock cannot be obtained by other threads.
/// </summary>
/// <param name="getLock">Get lock function.</param>
/// <param name="sharedLock">Shared lock.</param>
private static void EnsureCannotLock(Func<ICacheLock> getLock, ICacheLock sharedLock)
{
var task = new Task(() =>
{
Assert.IsFalse(sharedLock.TryEnter());
Assert.IsFalse(sharedLock.TryEnter(TimeSpan.FromMilliseconds(100)));
using (var otherLock = getLock())
{
Assert.IsFalse(otherLock.TryEnter());
Assert.IsFalse(otherLock.TryEnter(TimeSpan.FromMilliseconds(100)));
}
});
task.Start();
task.Wait();
}
/// <summary>
/// Tests that commit applies cache changes.
/// </summary>
[Test]
public void TestTxCommit([Values(true, false)] bool async)
{
var cache = Cache();
Assert.IsNull(Transactions.Tx);
using (var tx = Transactions.TxStart())
{
cache.Put(1, 1);
cache.Put(2, 2);
if (async)
{
var task = tx.CommitAsync();
task.Wait();
Assert.IsTrue(task.IsCompleted);
}
else
tx.Commit();
}
Assert.AreEqual(1, cache.Get(1));
Assert.AreEqual(2, cache.Get(2));
Assert.IsNull(Transactions.Tx);
}
/// <summary>
/// Tests that rollback reverts cache changes.
/// </summary>
[Test]
public void TestTxRollback()
{
var cache = Cache();
cache.Put(1, 1);
cache.Put(2, 2);
Assert.IsNull(Transactions.Tx);
using (var tx = Transactions.TxStart())
{
cache.Put(1, 10);
cache.Put(2, 20);
tx.Rollback();
}
Assert.AreEqual(1, cache.Get(1));
Assert.AreEqual(2, cache.Get(2));
Assert.IsNull(Transactions.Tx);
}
/// <summary>
/// Tests that Dispose without Commit reverts changes.
/// </summary>
[Test]
public void TestTxClose()
{
var cache = Cache();
cache.Put(1, 1);
cache.Put(2, 2);
Assert.IsNull(Transactions.Tx);
using (Transactions.TxStart())
{
cache.Put(1, 10);
cache.Put(2, 20);
}
Assert.AreEqual(1, cache.Get(1));
Assert.AreEqual(2, cache.Get(2));
Assert.IsNull(Transactions.Tx);
}
/// <summary>
/// Tests all concurrency and isolation modes with and without timeout.
/// </summary>
[Test]
public void TestTxAllModes([Values(true, false)] bool withTimeout)
{
var cache = Cache();
int cntr = 0;
foreach (TransactionConcurrency concurrency in Enum.GetValues(typeof(TransactionConcurrency)))
{
foreach (TransactionIsolation isolation in Enum.GetValues(typeof(TransactionIsolation)))
{
Console.WriteLine("Test tx [concurrency=" + concurrency + ", isolation=" + isolation + "]");
Assert.IsNull(Transactions.Tx);
using (var tx = withTimeout
? Transactions.TxStart(concurrency, isolation, TimeSpan.FromMilliseconds(1100), 10)
: Transactions.TxStart(concurrency, isolation))
{
Assert.AreEqual(concurrency, tx.Concurrency);
Assert.AreEqual(isolation, tx.Isolation);
if (withTimeout)
Assert.AreEqual(1100, tx.Timeout.TotalMilliseconds);
cache.Put(1, cntr);
tx.Commit();
}
Assert.IsNull(Transactions.Tx);
Assert.AreEqual(cntr, cache.Get(1));
cntr++;
}
}
}
/// <summary>
/// Tests that transaction properties are applied and propagated properly.
/// </summary>
[Test]
public void TestTxAttributes()
{
ITransaction tx = Transactions.TxStart(TransactionConcurrency.Optimistic,
TransactionIsolation.RepeatableRead, TimeSpan.FromMilliseconds(2500), 100);
Assert.IsFalse(tx.IsRollbackOnly);
Assert.AreEqual(TransactionConcurrency.Optimistic, tx.Concurrency);
Assert.AreEqual(TransactionIsolation.RepeatableRead, tx.Isolation);
Assert.AreEqual(2500, tx.Timeout.TotalMilliseconds);
Assert.AreEqual(TransactionState.Active, tx.State);
Assert.IsTrue(tx.StartTime.Ticks > 0);
Assert.AreEqual(tx.NodeId, GetIgnite(0).GetCluster().GetLocalNode().Id);
Assert.AreEqual(Transactions.DefaultTimeoutOnPartitionMapExchange, TimeSpan.Zero);
DateTime startTime1 = tx.StartTime;
tx.Commit();
Assert.IsFalse(tx.IsRollbackOnly);
Assert.AreEqual(TransactionState.Committed, tx.State);
Assert.AreEqual(TransactionConcurrency.Optimistic, tx.Concurrency);
Assert.AreEqual(TransactionIsolation.RepeatableRead, tx.Isolation);
Assert.AreEqual(2500, tx.Timeout.TotalMilliseconds);
Assert.AreEqual(startTime1, tx.StartTime);
Thread.Sleep(100);
tx = Transactions.TxStart(TransactionConcurrency.Pessimistic, TransactionIsolation.ReadCommitted,
TimeSpan.FromMilliseconds(3500), 200);
Assert.IsFalse(tx.IsRollbackOnly);
Assert.AreEqual(TransactionConcurrency.Pessimistic, tx.Concurrency);
Assert.AreEqual(TransactionIsolation.ReadCommitted, tx.Isolation);
Assert.AreEqual(3500, tx.Timeout.TotalMilliseconds);
Assert.AreEqual(TransactionState.Active, tx.State);
Assert.IsTrue(tx.StartTime.Ticks > 0);
Assert.IsTrue(tx.StartTime > startTime1);
DateTime startTime2 = tx.StartTime;
tx.Rollback();
Assert.AreEqual(TransactionState.RolledBack, tx.State);
Assert.AreEqual(TransactionConcurrency.Pessimistic, tx.Concurrency);
Assert.AreEqual(TransactionIsolation.ReadCommitted, tx.Isolation);
Assert.AreEqual(3500, tx.Timeout.TotalMilliseconds);
Assert.AreEqual(startTime2, tx.StartTime);
Thread.Sleep(100);
tx = Transactions.TxStart(TransactionConcurrency.Optimistic, TransactionIsolation.RepeatableRead,
TimeSpan.FromMilliseconds(2500), 100);
Assert.IsFalse(tx.IsRollbackOnly);
Assert.AreEqual(TransactionConcurrency.Optimistic, tx.Concurrency);
Assert.AreEqual(TransactionIsolation.RepeatableRead, tx.Isolation);
Assert.AreEqual(2500, tx.Timeout.TotalMilliseconds);
Assert.AreEqual(TransactionState.Active, tx.State);
Assert.IsTrue(tx.StartTime > startTime2);
DateTime startTime3 = tx.StartTime;
tx.Commit();
Assert.IsFalse(tx.IsRollbackOnly);
Assert.AreEqual(TransactionState.Committed, tx.State);
Assert.AreEqual(TransactionConcurrency.Optimistic, tx.Concurrency);
Assert.AreEqual(TransactionIsolation.RepeatableRead, tx.Isolation);
Assert.AreEqual(2500, tx.Timeout.TotalMilliseconds);
Assert.AreEqual(startTime3, tx.StartTime);
// Check defaults.
tx = Transactions.TxStart();
Assert.AreEqual(Transactions.DefaultTransactionConcurrency, tx.Concurrency);
Assert.AreEqual(Transactions.DefaultTransactionIsolation, tx.Isolation);
Assert.AreEqual(Transactions.DefaultTimeout, tx.Timeout);
tx.Commit();
}
/// <summary>
/// Tests <see cref="ITransaction.IsRollbackOnly"/> flag.
/// </summary>
[Test]
public void TestTxRollbackOnly()
{
var cache = Cache();
cache.Put(1, 1);
cache.Put(2, 2);
var tx = Transactions.TxStart();
cache.Put(1, 10);
cache.Put(2, 20);
Assert.IsFalse(tx.IsRollbackOnly);
tx.SetRollbackonly();
Assert.IsTrue(tx.IsRollbackOnly);
Assert.AreEqual(TransactionState.MarkedRollback, tx.State);
Assert.Throws<TransactionRollbackException>(() => tx.Commit());
tx.Dispose();
Assert.AreEqual(TransactionState.RolledBack, tx.State);
Assert.IsTrue(tx.IsRollbackOnly);
Assert.AreEqual(1, cache.Get(1));
Assert.AreEqual(2, cache.Get(2));
Assert.IsNull(Transactions.Tx);
}
/// <summary>
/// Tests transaction metrics.
/// </summary>
[Test]
public void TestTxMetrics()
{
var cache = Cache();
var startTime = DateTime.UtcNow.AddSeconds(-1);
Transactions.ResetMetrics();
var metrics = Transactions.GetMetrics();
Assert.AreEqual(0, metrics.TxCommits);
Assert.AreEqual(0, metrics.TxRollbacks);
using (Transactions.TxStart())
{
cache.Put(1, 1);
}
using (var tx = Transactions.TxStart())
{
cache.Put(1, 1);
tx.Commit();
}
metrics = Transactions.GetMetrics();
Assert.AreEqual(1, metrics.TxCommits);
Assert.AreEqual(1, metrics.TxRollbacks);
Assert.LessOrEqual(startTime, metrics.CommitTime);
Assert.LessOrEqual(startTime, metrics.RollbackTime);
Assert.GreaterOrEqual(DateTime.UtcNow, metrics.CommitTime);
Assert.GreaterOrEqual(DateTime.UtcNow, metrics.RollbackTime);
}
/// <summary>
/// Tests transaction state transitions.
/// </summary>
[Test]
public void TestTxStateAndExceptions()
{
var tx = Transactions.TxStart();
Assert.AreEqual(TransactionState.Active, tx.State);
Assert.AreEqual(Thread.CurrentThread.ManagedThreadId, tx.ThreadId);
tx.AddMeta("myMeta", 42);
Assert.AreEqual(42, tx.Meta<int>("myMeta"));
Assert.AreEqual(42, tx.RemoveMeta<int>("myMeta"));
tx.RollbackAsync().Wait();
Assert.AreEqual(TransactionState.RolledBack, tx.State);
Assert.Throws<InvalidOperationException>(() => tx.Commit());
tx = Transactions.TxStart();
Assert.AreEqual(TransactionState.Active, tx.State);
tx.CommitAsync().Wait();
Assert.AreEqual(TransactionState.Committed, tx.State);
var task = tx.RollbackAsync(); // Illegal, but should not fail here; will fail in task
Assert.Throws<AggregateException>(() => task.Wait());
}
/// <summary>
/// Tests the transaction deadlock detection.
/// </summary>
[Test]
public void TestTxDeadlockDetection()
{
var cache = Cache();
var keys0 = Enumerable.Range(1, 100).ToArray();
cache.PutAll(keys0.ToDictionary(x => x, x => x));
var barrier = new Barrier(2);
Action<int[]> increment = keys =>
{
using (var tx = Transactions.TxStart(TransactionConcurrency.Pessimistic,
TransactionIsolation.RepeatableRead, TimeSpan.FromSeconds(0.5), 0))
{
foreach (var key in keys)
cache[key]++;
barrier.SignalAndWait(500);
tx.Commit();
}
};
// Increment keys within tx in different order to cause a deadlock.
var aex = Assert.Throws<AggregateException>(() =>
Task.WaitAll(new[]
{
TaskRunner.Run(() => increment(keys0)),
TaskRunner.Run(() => increment(keys0.Reverse().ToArray()))
},
TimeSpan.FromSeconds(40)));
Assert.AreEqual(2, aex.InnerExceptions.Count);
var deadlockEx = aex.InnerExceptions.OfType<TransactionDeadlockException>().FirstOrDefault();
if (deadlockEx != null)
{
Assert.IsTrue(deadlockEx.Message.Trim().StartsWith("Deadlock detected:"), deadlockEx.Message);
}
else
{
Assert.Fail("Unexpected exception: " + aex);
}
}
/// <summary>
/// Test Ignite transaction enlistment in ambient <see cref="TransactionScope"/>.
/// </summary>
[Test]
public void TestTransactionScopeSingleCache()
{
var cache = Cache();
cache[1] = 1;
cache[2] = 2;
// Commit.
using (var ts = new TransactionScope())
{
cache[1] = 10;
cache[2] = 20;
Assert.IsNotNull(cache.Ignite.GetTransactions().Tx);
ts.Complete();
}
Assert.AreEqual(10, cache[1]);
Assert.AreEqual(20, cache[2]);
// Rollback.
using (new TransactionScope())
{
cache[1] = 100;
cache[2] = 200;
}
Assert.AreEqual(10, cache[1]);
Assert.AreEqual(20, cache[2]);
}
/// <summary>
/// Test Ignite transaction enlistment in ambient <see cref="TransactionScope"/>
/// with multiple participating caches.
/// </summary>
[Test]
public void TestTransactionScopeMultiCache([Values(true, false)] bool async)
{
var cache1 = Cache();
var cache2 = GetIgnite(0).GetOrCreateCache<int, int>(new CacheConfiguration(cache1.Name + "_")
{
AtomicityMode = CacheAtomicityMode.Transactional
});
cache1[1] = 1;
cache2[1] = 2;
// Commit.
using (var ts = new TransactionScope())
{
if (async)
{
cache1.PutAsync(1, 10);
cache2.PutAsync(1, 20);
}
else
{
cache1.Put(1, 10);
cache2.Put(1, 20);
}
ts.Complete();
}
Assert.AreEqual(10, cache1[1]);
Assert.AreEqual(20, cache2[1]);
// Rollback.
using (new TransactionScope())
{
if (async)
{
cache1.PutAsync(1, 100);
cache2.PutAsync(1, 200);
}
else
{
cache1.Put(1, 100);
cache2.Put(1, 200);
}
}
Assert.AreEqual(10, cache1[1]);
Assert.AreEqual(20, cache2[1]);
}
/// <summary>
/// Test Ignite transaction enlistment in ambient <see cref="TransactionScope"/>
/// when Ignite tx is started manually.
/// </summary>
[Test]
public void TestTransactionScopeWithManualIgniteTx()
{
var cache = Cache();
var transactions = cache.Ignite.GetTransactions();
cache[1] = 1;
// When Ignite tx is started manually, it won't be enlisted in TransactionScope.
using (var tx = transactions.TxStart())
{
using (new TransactionScope())
{
cache[1] = 2;
} // Revert transaction scope.
tx.Commit(); // Commit manual tx.
}
Assert.AreEqual(2, cache[1]);
}
/// <summary>
/// Test Ignite transaction with <see cref="TransactionScopeOption.Suppress"/> option.
/// </summary>
[Test]
public void TestSuppressedTransactionScope()
{
var cache = Cache();
cache[1] = 1;
using (new TransactionScope(TransactionScopeOption.Suppress))
{
cache[1] = 2;
}
// Even though transaction is not completed, the value is updated, because tx is suppressed.
Assert.AreEqual(2, cache[1]);
}
/// <summary>
/// Test Ignite transaction enlistment in ambient <see cref="TransactionScope"/> with nested scopes.
/// </summary>
[Test]
public void TestNestedTransactionScope()
{
var cache = Cache();
cache[1] = 1;
foreach (var option in new[] {TransactionScopeOption.Required, TransactionScopeOption.RequiresNew})
{
// Commit.
using (var ts1 = new TransactionScope())
{
using (var ts2 = new TransactionScope(option))
{
cache[1] = 2;
ts2.Complete();
}
cache[1] = 3;
ts1.Complete();
}
Assert.AreEqual(3, cache[1]);
// Rollback.
using (new TransactionScope())
{
using (new TransactionScope(option))
cache[1] = 4;
cache[1] = 5;
}
// In case with Required option there is a single tx
// that gets aborted, second put executes outside the tx.
Assert.AreEqual(option == TransactionScopeOption.Required ? 5 : 3, cache[1], option.ToString());
}
}
/// <summary>
/// Test that ambient <see cref="TransactionScope"/> options propagate to Ignite transaction.
/// </summary>
[Test]
public void TestTransactionScopeOptions()
{
var cache = Cache();
var transactions = cache.Ignite.GetTransactions();
var modes = new[]
{
Tuple.Create(IsolationLevel.Serializable, TransactionIsolation.Serializable),
Tuple.Create(IsolationLevel.RepeatableRead, TransactionIsolation.RepeatableRead),
Tuple.Create(IsolationLevel.ReadCommitted, TransactionIsolation.ReadCommitted),
Tuple.Create(IsolationLevel.ReadUncommitted, TransactionIsolation.ReadCommitted),
Tuple.Create(IsolationLevel.Snapshot, TransactionIsolation.ReadCommitted),
Tuple.Create(IsolationLevel.Chaos, TransactionIsolation.ReadCommitted),
};
foreach (var mode in modes)
{
using (new TransactionScope(TransactionScopeOption.Required, new TransactionOptions
{
IsolationLevel = mode.Item1
}))
{
cache[1] = 1;
var tx = transactions.Tx;
Assert.AreEqual(mode.Item2, tx.Isolation);
Assert.AreEqual(transactions.DefaultTransactionConcurrency, tx.Concurrency);
}
}
}
/// <summary>
/// Tests all transactional operations with <see cref="TransactionScope"/>.
/// </summary>
[Test]
public void TestTransactionScopeAllOperations()
{
for (var i = 0; i < 10; i++)
{
CheckTxOp((cache, key) => cache.Put(key, -5));
CheckTxOp((cache, key) => cache.PutAsync(key, -5));
CheckTxOp((cache, key) => cache.PutAll(new Dictionary<int, int> {{key, -7}}));
CheckTxOp((cache, key) => cache.PutAllAsync(new Dictionary<int, int> {{key, -7}}));
CheckTxOp((cache, key) =>
{
cache.Remove(key);
cache.PutIfAbsent(key, -10);
});
CheckTxOp((cache, key) =>
{
cache.Remove(key);
cache.PutIfAbsentAsync(key, -10);
});
CheckTxOp((cache, key) => cache.GetAndPut(key, -9));
CheckTxOp((cache, key) => cache.GetAndPutAsync(key, -9));
CheckTxOp((cache, key) =>
{
cache.Remove(key);
cache.GetAndPutIfAbsent(key, -10);
});
CheckTxOp((cache, key) =>
{
cache.Remove(key);
cache.GetAndPutIfAbsentAsync(key, -10);
});
CheckTxOp((cache, key) => cache.GetAndRemove(key));
CheckTxOp((cache, key) => cache.GetAndRemoveAsync(key));
CheckTxOp((cache, key) => cache.GetAndReplace(key, -11));
CheckTxOp((cache, key) => cache.GetAndReplaceAsync(key, -11));
CheckTxOp((cache, key) => cache.Invoke(key, new AddProcessor(), 1));
CheckTxOp((cache, key) => cache.InvokeAsync(key, new AddProcessor(), 1));
CheckTxOp((cache, key) => cache.InvokeAll(new[] {key}, new AddProcessor(), 1));
CheckTxOp((cache, key) => cache.InvokeAllAsync(new[] {key}, new AddProcessor(), 1));
CheckTxOp((cache, key) => cache.Remove(key));
CheckTxOp((cache, key) => cache.RemoveAsync(key));
CheckTxOp((cache, key) => cache.RemoveAll(new[] {key}));
CheckTxOp((cache, key) => cache.RemoveAllAsync(new[] {key}));
CheckTxOp((cache, key) => cache.Replace(key, 100));
CheckTxOp((cache, key) => cache.ReplaceAsync(key, 100));
CheckTxOp((cache, key) => cache.Replace(key, cache[key], 100));
CheckTxOp((cache, key) => cache.ReplaceAsync(key, cache[key], 100));
}
}
/// <summary>
/// Tests that read operations lock keys in Serializable mode.
/// </summary>
[Test]
public void TestTransactionScopeWithSerializableIsolationLocksKeysOnRead()
{
Action<Func<ICache<int, int>, int, int>>
test = TestTransactionScopeWithSerializableIsolationLocksKeysOnRead;
test((cache, key) => cache[key]);
test((cache, key) => cache.Get(key));
test((cache, key) => cache.GetAsync(key).Result);
test((cache, key) => { int val; return cache.TryGet(key, out val) ? val : 0; });
test((cache, key) => cache.TryGetAsync(key).Result.Value);
test((cache, key) => cache.GetAll(new[] {key}).Single().Value);
test((cache, key) => cache.GetAllAsync(new[] {key}).Result.Single().Value);
}
/// <summary>
/// Tests that read operations lock keys in Serializable mode.
/// </summary>
private void TestTransactionScopeWithSerializableIsolationLocksKeysOnRead(
Func<ICache<int, int>, int, int> readOp)
{
var cache = Cache();
cache.Put(1, 1);
var options = new TransactionOptions {IsolationLevel = IsolationLevel.Serializable};
using (var scope = new TransactionScope(TransactionScopeOption.Required, options))
{
Assert.AreEqual(1, readOp(cache, 1));
Assert.IsNotNull(GetIgnite(0).GetTransactions().Tx);
var evt = new ManualResetEventSlim();
var task = Task.Factory.StartNew(() =>
{
cache.PutAsync(1, 2);
evt.Set();
});
evt.Wait();
Assert.AreEqual(1, readOp(cache, 1));
scope.Complete();
task.Wait();
}
TestUtils.WaitForTrueCondition(() => 2 == readOp(cache, 1));
}
/// <summary>
/// Checks that cache operation behaves transactionally.
/// </summary>
private void CheckTxOp(Action<ICache<int, int>, int> act)
{
var isolationLevels = new[]
{
IsolationLevel.Serializable, IsolationLevel.RepeatableRead, IsolationLevel.ReadCommitted,
IsolationLevel.ReadUncommitted, IsolationLevel.Snapshot, IsolationLevel.Chaos
};
foreach (var isolationLevel in isolationLevels)
{
var txOpts = new TransactionOptions {IsolationLevel = isolationLevel};
const TransactionScopeOption scope = TransactionScopeOption.Required;
var cache = Cache();
cache[1] = 1;
cache[2] = 2;
// Rollback.
using (new TransactionScope(scope, txOpts))
{
act(cache, 1);
Assert.IsNotNull(cache.Ignite.GetTransactions().Tx, "Transaction has not started.");
}
Assert.AreEqual(1, cache[1]);
Assert.AreEqual(2, cache[2]);
using (new TransactionScope(scope, txOpts))
{
act(cache, 1);
act(cache, 2);
}
Assert.AreEqual(1, cache[1]);
Assert.AreEqual(2, cache[2]);
// Commit.
using (var ts = new TransactionScope(scope, txOpts))
{
act(cache, 1);
ts.Complete();
}
Assert.IsTrue(!cache.ContainsKey(1) || cache[1] != 1);
Assert.AreEqual(2, cache[2]);
using (var ts = new TransactionScope(scope, txOpts))
{
act(cache, 1);
act(cache, 2);
ts.Complete();
}
Assert.IsTrue(!cache.ContainsKey(1) || cache[1] != 1);
Assert.IsTrue(!cache.ContainsKey(2) || cache[2] != 2);
}
}
[Serializable]
private class AddProcessor : ICacheEntryProcessor<int, int, int, int>
{
public int Process(IMutableCacheEntry<int, int> entry, int arg)
{
entry.Value += arg;
return arg;
}
}
}
}