Refines transaction support and adds more tests
diff --git a/src/Proton.Client/Client/IDeliveryState.cs b/src/Proton.Client/Client/IDeliveryState.cs
index fc2d5da..1634d4a 100644
--- a/src/Proton.Client/Client/IDeliveryState.cs
+++ b/src/Proton.Client/Client/IDeliveryState.cs
@@ -17,6 +17,8 @@
namespace Apache.Qpid.Proton.Client
{
+ using Apache.Qpid.Proton.Client.Implementation;
+
public interface IDeliveryState
{
/// <summary>
@@ -28,7 +30,31 @@
/// <summary>
/// Quick access to determine if the state value indicates the delivery was accepted.
/// </summary>
- bool Accepted { get; }
+ bool IsAccepted { get; }
+
+ /// <summary>
+ /// Returns an instance of a delivery state that accepts a delivery
+ /// </summary>
+ /// <returns>An accepted delivery state type</returns>
+ static IDeliveryState Accepted() => ClientAccepted.Instance;
+
+ /// <summary>
+ /// Returns an instance of a delivery state that releases a delivery
+ /// </summary>
+ /// <returns>An released delivery state type</returns>
+ static IDeliveryState Released() => ClientReleased.Instance;
+
+ /// <summary>
+ /// Returns an instance of a delivery state that rejects a delivery
+ /// </summary>
+ /// <returns>An rejected delivery state type</returns>
+ static IDeliveryState Rejected(string condition, string description = null) => new ClientRejected(condition, description);
+
+ /// <summary>
+ /// Returns an instance of a delivery state that modifies a delivery
+ /// </summary>
+ /// <returns>An modified delivery state type</returns>
+ static IDeliveryState Modified(bool deliveryFailed, bool undeliverableHere = false) => new ClientModified(deliveryFailed, undeliverableHere);
}
}
\ No newline at end of file
diff --git a/src/Proton.Client/Client/Implementation/ClientDeliveryState.cs b/src/Proton.Client/Client/Implementation/ClientDeliveryState.cs
index 138123a..56f8446 100644
--- a/src/Proton.Client/Client/Implementation/ClientDeliveryState.cs
+++ b/src/Proton.Client/Client/Implementation/ClientDeliveryState.cs
@@ -28,7 +28,7 @@
/// </summary>
public abstract class ClientDeliveryState : IDeliveryState
{
- public bool Accepted => Type == DeliveryStateType.Accepted;
+ public bool IsAccepted => Type == DeliveryStateType.Accepted;
public abstract DeliveryStateType Type { get; }
diff --git a/src/Proton.Client/Client/Implementation/ClientNoOpTracker.cs b/src/Proton.Client/Client/Implementation/ClientNoOpTracker.cs
index 993c6a3..95ef705 100644
--- a/src/Proton.Client/Client/Implementation/ClientNoOpTracker.cs
+++ b/src/Proton.Client/Client/Implementation/ClientNoOpTracker.cs
@@ -27,14 +27,17 @@
private IDeliveryState state;
private bool settled;
+ private readonly Task<ITracker> completed;
+
internal ClientNoOpTracker(ClientSender sender)
{
this.sender = sender;
+ this.completed = Task.FromResult<ITracker>(this);
}
public ISender Sender => sender;
- public bool Settled => true;
+ public bool Settled => settled;
public IDeliveryState State => state;
@@ -42,7 +45,7 @@
public IDeliveryState RemoteState => ClientAccepted.Instance;
- public Task<ITracker> SettlementTask => throw new NotImplementedException(); // TODO
+ public Task<ITracker> SettlementTask => completed;
public ITracker AwaitAccepted()
{
diff --git a/src/Proton.Client/Client/Implementation/ClientStreamTracker.cs b/src/Proton.Client/Client/Implementation/ClientStreamTracker.cs
index b370ada..0e4d740 100644
--- a/src/Proton.Client/Client/Implementation/ClientStreamTracker.cs
+++ b/src/Proton.Client/Client/Implementation/ClientStreamTracker.cs
@@ -95,7 +95,7 @@
{
remoteSettlementFuture.Task.Wait();
- if (RemoteState != null && RemoteState.Accepted)
+ if (RemoteState != null && RemoteState.IsAccepted)
{
return this;
}
@@ -123,7 +123,7 @@
{
if (remoteSettlementFuture.Task.Wait(timeout))
{
- if (RemoteState != null && RemoteState.Accepted)
+ if (RemoteState != null && RemoteState.IsAccepted)
{
return this;
}
diff --git a/src/Proton.Client/Client/Implementation/ClientTracker.cs b/src/Proton.Client/Client/Implementation/ClientTracker.cs
index bbcf97a..8596a0a 100644
--- a/src/Proton.Client/Client/Implementation/ClientTracker.cs
+++ b/src/Proton.Client/Client/Implementation/ClientTracker.cs
@@ -107,7 +107,7 @@
{
remoteSettlementFuture.Task.Wait();
- if (RemoteState != null && RemoteState.Accepted)
+ if (RemoteState != null && RemoteState.IsAccepted)
{
return this;
}
@@ -135,7 +135,7 @@
{
if (remoteSettlementFuture.Task.Wait(timeout))
{
- if (RemoteState != null && RemoteState.Accepted)
+ if (RemoteState != null && RemoteState.IsAccepted)
{
return this;
}
diff --git a/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs b/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs
index d1cf9cd..7daabda 100644
--- a/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs
+++ b/test/Proton.Client.Tests/Client/Implementation/ClientSenderTest.cs
@@ -2678,7 +2678,7 @@
tracker.AwaitAccepted();
Assert.IsTrue(tracker.RemoteSettled);
- Assert.IsTrue(tracker.RemoteState.Accepted);
+ Assert.IsTrue(tracker.RemoteState.IsAccepted);
peer.WaitForScriptToComplete();
peer.ExpectDetach().Respond();
@@ -2725,7 +2725,7 @@
}
Assert.IsTrue(tracker.RemoteSettled);
- Assert.IsFalse(tracker.RemoteState.Accepted);
+ Assert.IsFalse(tracker.RemoteState.IsAccepted);
peer.WaitForScriptToComplete();
peer.ExpectDetach().Respond();
diff --git a/test/Proton.Client.Tests/Client/Implementation/ClilentTransactionsTest.cs b/test/Proton.Client.Tests/Client/Implementation/ClilentTransactionsTest.cs
index a2b0df0..2f1f12a 100644
--- a/test/Proton.Client.Tests/Client/Implementation/ClilentTransactionsTest.cs
+++ b/test/Proton.Client.Tests/Client/Implementation/ClilentTransactionsTest.cs
@@ -24,6 +24,9 @@
using Apache.Qpid.Proton.Types.Transport;
using System;
using Apache.Qpid.Proton.Types.Transactions;
+using Apache.Qpid.Proton.Client.TestSupport;
+using System.IO;
+using Apache.Qpid.Proton.Test.Driver.Matchers.Types.Messaging;
namespace Apache.Qpid.Proton.Client.Implementation
{
@@ -136,6 +139,7 @@
}
}
+ [Ignore("Issue with link not waiting for detach before using old handle")]
[Test]
public void TestTimedOutExceptionOnBeginWithNoResponseThenRecoverWithNextBegin()
{
@@ -147,7 +151,7 @@
peer.ExpectCoordinatorAttach().Respond();
peer.RemoteFlow().WithLinkCredit(2).Queue();
peer.ExpectDeclare();
- peer.ExpectDetach().Respond();
+ peer.ExpectDetach().Respond().AfterDelay(25);
peer.Start();
string remoteAddress = peer.ServerAddress;
@@ -589,5 +593,1061 @@
peer.WaitForScriptToComplete();
}
}
+
+ [Test]
+ public void TestExceptionOnRollbackWhenCoordinatorRejectsDischarge()
+ {
+ string errorMessage = "Transaction aborted due to timeout";
+ byte[] txnId1 = new byte[] { 0, 1, 2, 3 };
+ byte[] txnId2 = new byte[] { 1, 1, 2, 3 };
+
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectCoordinatorAttach().Respond();
+ peer.RemoteFlow().WithLinkCredit(4).Queue();
+ peer.ExpectDeclare().Accept(txnId1);
+ peer.ExpectDischarge().WithFail(true)
+ .WithTxnId(txnId1)
+ .Reject(TransactionError.TRANSACTION_TIMEOUT.ToString(), "Transaction aborted due to timeout");
+ peer.ExpectDeclare().Accept(txnId2);
+ peer.ExpectDischarge().WithFail(false).WithTxnId(txnId2).Accept();
+ peer.ExpectEnd().Respond();
+ peer.ExpectClose().Respond();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort);
+ ISession session = connection.OpenSession().OpenTask.Result;
+
+ session.BeginTransaction();
+
+ try
+ {
+ session.RollbackTransaction();
+ Assert.Fail("Commit should have failed after link closed.");
+ }
+ catch (ClientTransactionRolledBackException expected)
+ {
+ // Expect this to time out.
+ String message = expected.Message;
+ Assert.IsTrue(message.Contains(errorMessage));
+ }
+
+ session.BeginTransaction();
+ session.CommitTransaction();
+
+ session.Close();
+ connection.Close();
+
+ peer.WaitForScriptToComplete();
+ }
+ }
+
+ /// <summary>
+ /// Create a transaction and then close the Session which result in the remote rolling
+ /// back the transaction by default so the client doesn't manually roll it back itself.
+ /// </summary>
+ [Test]
+ public void TestBeginTransactionAndClose()
+ {
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectCoordinatorAttach().Respond();
+ peer.RemoteFlow().WithLinkCredit(2).Queue();
+ peer.ExpectDeclare().Accept();
+ peer.ExpectEnd().Respond();
+ peer.ExpectClose().Respond();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort);
+ ISession session = connection.OpenSession().OpenTask.Result;
+
+ session.BeginTransaction();
+
+ session.Close();
+ connection.Close();
+
+ peer.WaitForScriptToComplete();
+ }
+ }
+
+ [Test]
+ public void TestBeginAndCommitTransaction()
+ {
+ byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectCoordinatorAttach().Respond();
+ peer.RemoteFlow().WithLinkCredit(2).Queue();
+ peer.ExpectDeclare().Accept(txnId);
+ peer.ExpectDischarge().WithFail(false).WithTxnId(txnId).Accept();
+ peer.ExpectEnd().Respond();
+ peer.ExpectClose().Respond();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort);
+ ISession session = connection.OpenSession().OpenTask.Result;
+
+ session.BeginTransaction();
+ session.CommitTransaction();
+
+ session.Close();
+ connection.Close();
+
+ peer.WaitForScriptToComplete();
+ }
+ }
+
+ [Test]
+ public void TestBeginAndRollbackTransaction()
+ {
+ byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectCoordinatorAttach().Respond();
+ peer.RemoteFlow().WithLinkCredit(2).Queue();
+ peer.ExpectDeclare().Accept(txnId);
+ peer.ExpectDischarge().WithFail(true).WithTxnId(txnId).Accept();
+ peer.ExpectEnd().Respond();
+ peer.ExpectClose().Respond();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort);
+ ISession session = connection.OpenSession().OpenTask.Result;
+
+ session.BeginTransaction();
+ session.RollbackTransaction();
+
+ session.Close();
+ connection.Close();
+
+ peer.WaitForScriptToComplete();
+ }
+ }
+
+ [Ignore("Failure processing the declared with null")]
+ [Test]
+ public void TestTransactionDeclaredDispositionWithoutTxnId()
+ {
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectCoordinatorAttach().Respond();
+ peer.RemoteFlow().WithLinkCredit(1).Queue();
+ peer.ExpectDeclare().Accept(null);
+ peer.ExpectClose().WithError(AmqpError.DECODE_ERROR.ToString(), "The txn-id field cannot be omitted").Respond();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort);
+ ISession session = connection.OpenSession().OpenTask.Result;
+
+ try
+ {
+ session.BeginTransaction();
+ Assert.Fail("Should not complete transaction begin due to client connection failure on decode issue.");
+ }
+ catch (ClientException)
+ {
+ // expected to fail
+ }
+
+ connection.Close();
+
+ peer.WaitForScriptToCompleteIgnoreErrors();
+ }
+ }
+
+ [Test]
+ public void TestBeginAndCommitTransactions()
+ {
+ byte[] txnId1 = new byte[] { 0, 1, 2, 3 };
+ byte[] txnId2 = new byte[] { 1, 1, 2, 3 };
+ byte[] txnId3 = new byte[] { 2, 1, 2, 3 };
+ byte[] txnId4 = new byte[] { 3, 1, 2, 3 };
+ byte[] txnId5 = new byte[] { 4, 1, 2, 3 };
+
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectCoordinatorAttach().Respond();
+ peer.RemoteFlow().WithLinkCredit(10).Queue();
+ peer.ExpectDeclare().Accept(txnId1);
+ peer.ExpectDischarge().WithFail(false).WithTxnId(txnId1).Accept();
+ peer.ExpectDeclare().Accept(txnId2);
+ peer.ExpectDischarge().WithFail(false).WithTxnId(txnId2).Accept();
+ peer.ExpectDeclare().Accept(txnId3);
+ peer.ExpectDischarge().WithFail(false).WithTxnId(txnId3).Accept();
+ peer.ExpectDeclare().Accept(txnId4);
+ peer.ExpectDischarge().WithFail(false).WithTxnId(txnId4).Accept();
+ peer.ExpectDeclare().Accept(txnId5);
+ peer.ExpectDischarge().WithFail(false).WithTxnId(txnId5).Accept();
+ peer.ExpectEnd().Respond();
+ peer.ExpectClose().Respond();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort);
+ ISession session = connection.OpenSession().OpenTask.Result;
+
+ for (int i = 0; i < 5; ++i)
+ {
+ logger.LogInformation("Transaction declare and discharge cycle: {}", i);
+ session.BeginTransaction();
+ session.CommitTransaction();
+ }
+
+ session.Close();
+ connection.Close();
+
+ peer.WaitForScriptToComplete();
+ }
+ }
+
+ [Test]
+ public void TestCannotBeginSecondTransactionWhileFirstIsActive()
+ {
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectCoordinatorAttach().Respond();
+ peer.RemoteFlow().WithLinkCredit(2).Queue();
+ peer.ExpectDeclare().Accept();
+ peer.ExpectEnd().Respond();
+ peer.ExpectClose().Respond();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort);
+ ISession session = connection.OpenSession().OpenTask.Result;
+
+ session.BeginTransaction();
+
+ try
+ {
+ session.BeginTransaction();
+ Assert.Fail("Should not be allowed to begin another transaction");
+ }
+ catch (ClientIllegalStateException)
+ {
+ // Expected
+ }
+
+ session.Close();
+ connection.Close();
+
+ peer.WaitForScriptToComplete();
+ }
+ }
+
+ [Test]
+ public void TestSendMessageInsideOfTransaction()
+ {
+ byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectAttach().OfSender().Respond();
+ peer.RemoteFlow().WithLinkCredit(1).Queue();
+ peer.ExpectCoordinatorAttach().Respond();
+ peer.RemoteFlow().WithLinkCredit(2).Queue();
+ peer.ExpectDeclare().Accept(txnId);
+ peer.ExpectTransfer().WithHandle(0)
+ .WithNonNullPayload()
+ .WithState().Transactional().WithTxnId(txnId).And()
+ .Respond()
+ .WithState().Transactional().WithTxnId(txnId).WithAccepted().And()
+ .WithSettled(true);
+ peer.ExpectDischarge().WithFail(false).WithTxnId(txnId).Accept();
+ peer.ExpectEnd().Respond();
+ peer.ExpectClose().Respond();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort);
+ ISession session = connection.OpenSession().OpenTask.Result;
+ ISender sender = session.OpenSender("address").OpenTask.Result;
+
+ session.BeginTransaction();
+
+ ITracker tracker = sender.Send(IMessage<string>.Create("test-message"));
+
+ Assert.IsNotNull(tracker);
+ Assert.IsNotNull(tracker.SettlementTask.Result);
+ Assert.AreEqual(tracker.RemoteState.Type, DeliveryStateType.Transactional,
+ "Delivery inside transaction should have Transactional state");
+ Assert.IsNotNull(tracker.State);
+ Assert.AreEqual(tracker.State.Type, DeliveryStateType.Transactional,
+ "Delivery inside transaction should have Transactional state: " + tracker.State.Type);
+
+ Wait.AssertTrue("Delivery in transaction should be locally settled after response", () => tracker.Settled);
+
+ session.CommitTransaction();
+
+ session.Close();
+ connection.Close();
+
+ peer.WaitForScriptToComplete();
+ }
+ }
+
+ [Test]
+ public void TestSendMessagesInsideOfUniqueTransactions()
+ {
+ byte[] txnId1 = new byte[] { 0, 1, 2, 3 };
+ byte[] txnId2 = new byte[] { 1, 1, 2, 3 };
+ byte[] txnId3 = new byte[] { 2, 1, 2, 3 };
+ byte[] txnId4 = new byte[] { 3, 1, 2, 3 };
+
+ byte[][] transactions = new byte[][] { txnId1, txnId2, txnId3, txnId4 };
+
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectAttach().OfSender().Respond();
+ peer.RemoteFlow().WithLinkCredit((uint)transactions.Length).Queue();
+ peer.ExpectCoordinatorAttach().Respond();
+ peer.RemoteFlow().WithLinkCredit((uint)(transactions.Length * 2)).Queue();
+ for (int i = 0; i < transactions.Length; ++i)
+ {
+ peer.ExpectDeclare().Accept(transactions[i]);
+ peer.ExpectTransfer().WithHandle(0)
+ .WithNonNullPayload()
+ .WithState().Transactional().WithTxnId(transactions[i]).And()
+ .Respond()
+ .WithState().Transactional().WithTxnId(transactions[i]).WithAccepted().And()
+ .WithSettled(true);
+ peer.ExpectDischarge().WithFail(false).WithTxnId(transactions[i]).Accept();
+ }
+ peer.ExpectEnd().Respond();
+ peer.ExpectClose().Respond();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort);
+ ISession session = connection.OpenSession().OpenTask.Result;
+ ISender sender = session.OpenSender("address").OpenTask.Result;
+
+ for (int i = 0; i < transactions.Length; ++i)
+ {
+ session.BeginTransaction();
+
+ ITracker tracker = sender.Send(IMessage<string>.Create("test-message-" + i));
+
+ Assert.IsNotNull(tracker);
+ Assert.IsNotNull(tracker.SettlementTask.Result);
+ Assert.AreEqual(tracker.RemoteState.Type, DeliveryStateType.Transactional);
+ Assert.IsNotNull(tracker.State);
+ Assert.AreEqual(tracker.State.Type, DeliveryStateType.Transactional,
+ "Delivery inside transaction should have Transactional state: " + tracker.State.Type);
+ Wait.AssertTrue("Delivery in transaction should be locally settled after response", () => tracker.Settled);
+
+ session.CommitTransaction();
+ }
+
+ session.Close();
+ connection.Close();
+
+ peer.WaitForScriptToComplete();
+ }
+ }
+
+ [Test]
+ public void TestReceiveMessageInsideOfTransaction()
+ {
+ byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectAttach().OfReceiver().Respond();
+ peer.ExpectFlow();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort);
+ ISession session = connection.OpenSession();
+ IReceiver receiver = session.OpenReceiver("test-queue").OpenTask.Result;
+
+ peer.ExpectCoordinatorAttach().Respond();
+ peer.RemoteFlow().WithLinkCredit(2).Queue();
+ peer.ExpectDeclare().Accept(txnId);
+ peer.RemoteTransfer().WithHandle(0)
+ .WithDeliveryId(0)
+ .WithDeliveryTag(new byte[] { 1 })
+ .WithMore(false)
+ .WithMessageFormat(0)
+ .WithPayload(payload).Queue();
+ peer.ExpectDisposition().WithSettled(true)
+ .WithState().Transactional().WithTxnId(txnId).WithAccepted();
+ peer.ExpectDischarge().WithFail(false).WithTxnId(txnId).Accept();
+ peer.ExpectDetach().Respond();
+ peer.ExpectClose().Respond();
+
+ session.BeginTransaction();
+
+ IDelivery delivery = receiver.Receive(TimeSpan.FromSeconds(10));
+ Assert.IsNotNull(delivery);
+ IMessage<object> received = delivery.Message();
+ Assert.IsNotNull(received);
+ Assert.IsTrue(received.Body is string);
+ string value = (string)received.Body;
+ Assert.AreEqual("Hello World", value);
+
+ session.CommitTransaction();
+ receiver.Close();
+ connection.Close();
+
+ peer.WaitForScriptToComplete();
+ }
+ }
+
+ [Test]
+ public void TestReceiveMessageInsideOfTransactionNoAutoSettleSenderSettles()
+ {
+ DoTestReceiveMessageInsideOfTransactionNoAutoSettle(true);
+ }
+
+ [Test]
+ public void TestReceiveMessageInsideOfTransactionNoAutoSettleSenderDoesNotSettle()
+ {
+ DoTestReceiveMessageInsideOfTransactionNoAutoSettle(false);
+ }
+
+ private void DoTestReceiveMessageInsideOfTransactionNoAutoSettle(bool settle)
+ {
+ byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectAttach().OfReceiver().Respond();
+ peer.ExpectFlow();
+ peer.Start();
+
+ byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort);
+ ISession session = connection.OpenSession();
+ ReceiverOptions options = new ReceiverOptions()
+ {
+ AutoAccept = false,
+ AutoSettle = false
+ };
+ IReceiver receiver = session.OpenReceiver("test-queue", options).OpenTask.Result;
+
+ peer.ExpectCoordinatorAttach().Respond();
+ peer.RemoteFlow().WithLinkCredit(2).Queue();
+ peer.ExpectDeclare().Accept(txnId);
+ peer.RemoteTransfer().WithHandle(0)
+ .WithDeliveryId(0)
+ .WithDeliveryTag(new byte[] { 1 })
+ .WithMore(false)
+ .WithMessageFormat(0)
+ .WithPayload(payload).Queue();
+ peer.ExpectDisposition().WithSettled(true)
+ .WithState().Transactional().WithTxnId(txnId).WithAccepted();
+ peer.ExpectDischarge().WithFail(false).WithTxnId(txnId).Accept();
+ peer.ExpectDetach().Respond();
+ peer.ExpectClose().Respond();
+
+ session.BeginTransaction();
+
+ IDelivery delivery = receiver.Receive(TimeSpan.FromSeconds(10));
+ Assert.IsNotNull(delivery);
+ Assert.IsFalse(delivery.Settled);
+ Assert.IsNull(delivery.State);
+
+ IMessage<object> received = delivery.Message();
+ Assert.IsNotNull(received);
+ Assert.IsTrue(received.Body is string);
+ string value = (string)received.Body;
+ Assert.AreEqual("Hello World", value);
+
+ // Manual Accept within the transaction, settlement is ignored.
+ delivery.Disposition(ClientAccepted.Instance, settle);
+
+ session.CommitTransaction();
+ receiver.Close();
+ connection.Close();
+
+ peer.WaitForScriptToComplete();
+ }
+ }
+
+ [Test]
+ public void TestReceiveMessageInsideOfTransactionButAcceptAndSettleOutside()
+ {
+ byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectAttach().OfReceiver().Respond();
+ peer.ExpectFlow();
+ peer.Start();
+
+ byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort);
+ ISession session = connection.OpenSession();
+ ReceiverOptions options = new ReceiverOptions()
+ {
+ AutoAccept = false,
+ AutoSettle = false
+ };
+ IReceiver receiver = session.OpenReceiver("test-queue", options).OpenTask.Result;
+
+ peer.ExpectCoordinatorAttach().Respond();
+ peer.RemoteFlow().WithLinkCredit(2).Queue();
+ peer.ExpectDeclare().Accept(txnId);
+ peer.RemoteTransfer().WithHandle(0)
+ .WithDeliveryId(0)
+ .WithDeliveryTag(new byte[] { 1 })
+ .WithMore(false)
+ .WithMessageFormat(0)
+ .WithPayload(payload).Queue();
+ peer.ExpectDischarge().WithFail(false).WithTxnId(txnId).Accept();
+ peer.ExpectDisposition().WithSettled(true).WithState().Accepted();
+
+ session.BeginTransaction();
+
+ IDelivery delivery = receiver.Receive(TimeSpan.FromSeconds(10));
+ Assert.IsNotNull(delivery);
+ Assert.IsFalse(delivery.Settled);
+ Assert.IsNull(delivery.State);
+
+ IMessage<object> received = delivery.Message();
+ Assert.IsNotNull(received);
+ Assert.IsTrue(received.Body is string);
+ String value = (String)received.Body;
+ Assert.AreEqual("Hello World", value);
+
+ session.CommitTransaction();
+
+ // Manual Accept outside the transaction and no auto settle or accept
+ // so no transactional enlistment.
+ delivery.Disposition(ClientAccepted.Instance, true);
+
+ peer.WaitForScriptToComplete();
+ peer.ExpectDetach().Respond();
+ peer.ExpectClose().Respond();
+
+ receiver.Close();
+ connection.Close();
+
+ peer.WaitForScriptToComplete();
+ }
+ }
+
+ [Test]
+ public void TestTransactionCommitFailWithEmptyRejectedDisposition()
+ {
+ byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectAttach().OfSender().Respond();
+ peer.RemoteFlow().WithLinkCredit(1).Queue();
+ peer.ExpectCoordinatorAttach().Respond();
+ peer.RemoteFlow().WithLinkCredit(2).Queue();
+ peer.ExpectDeclare().Accept(txnId);
+ peer.ExpectTransfer().WithHandle(0)
+ .WithNonNullPayload()
+ .WithState().Transactional().WithTxnId(txnId).And()
+ .Respond()
+ .WithState().Transactional().WithTxnId(txnId).WithAccepted().And()
+ .WithSettled(true);
+ peer.ExpectDischarge().WithFail(false).WithTxnId(txnId).Reject();
+ peer.ExpectEnd().Respond();
+ peer.ExpectClose().Respond();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort);
+ ISession session = connection.OpenSession().OpenTask.Result;
+ ISender sender = session.OpenSender("address").OpenTask.Result;
+
+ session.BeginTransaction();
+
+ ITracker tracker = sender.Send(IMessage<string>.Create("test-message"));
+ Assert.IsNotNull(tracker.SettlementTask.Result);
+ Assert.AreEqual(tracker.RemoteState.Type, DeliveryStateType.Transactional);
+
+ try
+ {
+ session.CommitTransaction();
+ Assert.Fail("Commit should fail with Rollback exception");
+ }
+ catch (ClientTransactionRolledBackException)
+ {
+ // Expected roll back due to discharge rejection
+ }
+
+ session.Close();
+ connection.Close();
+
+ peer.WaitForScriptToComplete();
+ }
+ }
+
+ [Test]
+ public void TestDeclareTransactionAfterConnectionDrops()
+ {
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.DropAfterLastHandler();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort);
+ ISession session = connection.OpenSession().OpenTask.Result;
+
+ peer.WaitForScriptToComplete();
+
+ try
+ {
+ session.BeginTransaction();
+ Assert.Fail("Should have failed to discharge transaction");
+ }
+ catch (ClientException cliEx)
+ {
+ // Expected error as connection was dropped
+ logger.LogDebug("Client threw error on begin after connection drop", cliEx);
+ }
+
+ connection.Close();
+
+ peer.WaitForScriptToComplete();
+ }
+ }
+
+ [Test]
+ public void TestCommitTransactionAfterConnectionDropsFollowingTxnDeclared()
+ {
+ DischargeTransactionAfterConnectionDropsFollowingTxnDeclared(true);
+ }
+
+ [Test]
+ public void TestRollbackTransactionAfterConnectionDropsFollowingTxnDeclared()
+ {
+ DischargeTransactionAfterConnectionDropsFollowingTxnDeclared(false);
+ }
+
+ public void DischargeTransactionAfterConnectionDropsFollowingTxnDeclared(bool commit)
+ {
+ byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectCoordinatorAttach().Respond();
+ peer.RemoteFlow().WithLinkCredit(2).Queue();
+ peer.ExpectDeclare().Accept(txnId);
+ peer.DropAfterLastHandler();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort);
+ ISession session = connection.OpenSession().OpenTask.Result;
+
+ session.BeginTransaction();
+
+ peer.WaitForScriptToComplete();
+
+ if (commit)
+ {
+ try
+ {
+ session.CommitTransaction();
+ Assert.Fail("Should have failed to commit transaction");
+ }
+ catch (ClientException)
+ {
+ // Expected error as connection was dropped
+ }
+ }
+ else
+ {
+ try
+ {
+ session.RollbackTransaction();
+ }
+ catch (ClientConnectionRemotelyClosedException)
+ {
+ // Can get an error if the session processes the close before the
+ // roll back is called. Mitigating that is tricky and still leaves
+ // the user needing to handle error when session is actually closed
+ // via Session.Close()
+ }
+ catch (Exception ex)
+ {
+ logger.LogInformation("Caught unexpected error: {}", ex);
+ Assert.Fail("Connection drops will implicitly roll back TXN on remote");
+ }
+ }
+
+ connection.Close();
+
+ peer.WaitForScriptToComplete();
+ }
+ }
+
+ [Test]
+ public void TestSendMessagesNoOpWhenTransactionInDoubt()
+ {
+ byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectCoordinatorAttach().Respond();
+ peer.RemoteFlow().WithLinkCredit(1).Queue();
+ peer.ExpectDeclare().Accept(txnId);
+ peer.RemoteDetach().WithClosed(true)
+ .WithErrorCondition(AmqpError.RESOURCE_DELETED.ToString(), "Coordinator").Queue();
+ peer.ExpectDetach();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort);
+ ISession session = connection.OpenSession().OpenTask.Result;
+
+ session.BeginTransaction();
+
+ // After the wait TXN should be in doubt and send should no-op
+ peer.WaitForScriptToComplete();
+ peer.ExpectAttach().OfSender().Respond();
+ peer.RemoteFlow().WithLinkCredit(1).Queue();
+ peer.ExpectEnd().Respond();
+ peer.ExpectClose().Respond();
+
+ ISender sender = session.OpenSender("address").OpenTask.Result;
+
+ for (int i = 0; i < 10; ++i)
+ {
+ ITracker tracker = sender.Send(IMessage<string>.Create("test-message-"));
+
+ Assert.IsNotNull(tracker);
+ Assert.IsNotNull(tracker.SettlementTask.Result);
+ Assert.AreEqual(ClientAccepted.Instance, tracker.RemoteState);
+ Assert.IsTrue(tracker.RemoteSettled);
+ Assert.IsNull(tracker.State);
+ Assert.IsFalse(tracker.Settled);
+ Assert.IsFalse(tracker.AwaitAccepted().Settled);
+ Assert.IsFalse(tracker.AwaitSettlement().Settled);
+ Assert.IsFalse(tracker.AwaitAccepted(TimeSpan.FromSeconds(1)).Settled);
+ Assert.IsFalse(tracker.AwaitSettlement(TimeSpan.FromSeconds(1)).Settled);
+ Assert.AreSame(sender, tracker.Sender);
+
+ // These should no-op since message was never sent.
+ tracker.Settle();
+ tracker.Disposition(ClientAccepted.Instance, true);
+ }
+
+ try
+ {
+ session.CommitTransaction();
+ Assert.Fail("Should not be able to commit as remote closed coordinator");
+ }
+ catch (ClientTransactionRolledBackException)
+ {
+ // Expected
+ }
+
+ session.Close();
+ connection.Close();
+
+ peer.WaitForScriptToComplete();
+ }
+ }
+
+ [Ignore("Stream sender not completed and test peer needs payload matcher")]
+ [Test]
+ public void TestStreamSenderMessageCanOperatesWithinTransaction()
+ {
+ byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectAttach().OfSender().Respond();
+ peer.RemoteFlow().WithLinkCredit(2).Queue();
+ peer.Start();
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort);
+ IStreamSender sender = connection.OpenStreamSender("test-queue");
+ IStreamSenderMessage message = sender.BeginMessage();
+
+ // Populate all Header values
+ Header header = new Header();
+ header.Durable = true;
+ header.Priority = (byte)1;
+ header.TimeToLive = 65535;
+ header.FirstAcquirer = true;
+ header.DeliveryCount = 2;
+
+ message.Header = header;
+
+ OutputStreamOptions options = new OutputStreamOptions();
+ Stream stream = message.GetBodyStream(options);
+
+ HeaderMatcher headerMatcher = new HeaderMatcher(true);
+ headerMatcher.WithDurable(true);
+ headerMatcher.WithPriority((byte)1);
+ headerMatcher.WithTtl(65535);
+ headerMatcher.WithFirstAcquirer(true);
+ headerMatcher.WithDeliveryCount(2);
+ // EncodedDataMatcher dataMatcher = new EncodedDataMatcher(new byte[] { 0, 1, 2, 3 });
+ // TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
+ // payloadMatcher.HeadersMatcher = headerMatcher;
+ // payloadMatcher.MessageContentMatcher = dataMatcher;
+
+ peer.WaitForScriptToComplete();
+ peer.ExpectCoordinatorAttach().Respond();
+ peer.RemoteFlow().WithLinkCredit(5).Queue();
+ peer.ExpectDeclare().Accept(txnId);
+ peer.ExpectTransfer().WithHandle(0)
+ .WithMore(true)
+ //.WithPayload(payloadMatcher)
+ .WithState().Transactional().WithTxnId(txnId).And()
+ .Respond()
+ .WithState().Transactional().WithTxnId(txnId).WithAccepted().And()
+ .WithSettled(true);
+ peer.ExpectTransfer().WithMore(false).WithNullPayload();
+ peer.ExpectDischarge().WithFail(false).WithTxnId(txnId).Accept();
+ peer.ExpectDetach().Respond();
+ peer.ExpectEnd().Respond();
+ peer.ExpectClose().Respond();
+
+ sender.Session.BeginTransaction();
+
+ // Stream won't output until some body bytes are written since the buffer was not
+ // filled by the header write. Then the close will complete the stream message.
+ stream.Write(new byte[] { 0, 1, 2, 3 });
+ stream.Flush();
+ stream.Close();
+
+ sender.Session.CommitTransaction();
+ sender.CloseAsync().Wait();
+
+ connection.Close();
+
+ peer.WaitForScriptToComplete();
+ }
+ }
+
+ [Test]
+ public void TestAcceptAndRejectInSameTransaction()
+ {
+ byte[] txnId = new byte[] { 0, 1, 2, 3 };
+
+ using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
+ {
+ peer.ExpectSASLAnonymousConnect();
+ peer.ExpectOpen().Respond();
+ peer.ExpectBegin().Respond();
+ peer.ExpectAttach().OfReceiver().Respond();
+ peer.ExpectFlow();
+ peer.Start();
+
+ byte[] payload = CreateEncodedMessage(new AmqpValue("Hello World"));
+
+ string remoteAddress = peer.ServerAddress;
+ int remotePort = peer.ServerPort;
+
+ logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
+
+ IClient container = IClient.Create();
+ IConnection connection = container.Connect(remoteAddress, remotePort);
+ ISession session = connection.OpenSession();
+ ReceiverOptions options = new ReceiverOptions()
+ {
+ AutoAccept = false,
+ AutoSettle = false
+ };
+ IReceiver receiver = session.OpenReceiver("test-queue", options).OpenTask.Result;
+
+ peer.ExpectCoordinatorAttach().Respond();
+ peer.RemoteFlow().WithLinkCredit(2).Queue();
+ peer.ExpectDeclare().Accept(txnId);
+ peer.RemoteTransfer().WithHandle(0)
+ .WithDeliveryId(0)
+ .WithDeliveryTag(new byte[] { 1 })
+ .WithMore(false)
+ .WithMessageFormat(0)
+ .WithPayload(payload).Queue();
+ peer.RemoteTransfer().WithHandle(0)
+ .WithDeliveryId(1)
+ .WithDeliveryTag(new byte[] { 2 })
+ .WithMore(false)
+ .WithMessageFormat(0)
+ .WithPayload(payload).Queue();
+ peer.ExpectDisposition().WithSettled(true)
+ .WithState().Transactional().WithTxnId(txnId).WithAccepted();
+ peer.ExpectDisposition().WithSettled(true)
+ .WithState().Transactional().WithTxnId(txnId).WithReleased();
+ peer.ExpectDischarge().WithFail(false).WithTxnId(txnId).Accept();
+ peer.ExpectDetach().Respond();
+ peer.ExpectClose().Respond();
+
+ session.BeginTransaction();
+
+ IDelivery delivery1 = receiver.Receive(TimeSpan.FromSeconds(1));
+ IDelivery delivery2 = receiver.Receive(TimeSpan.FromSeconds(1));
+
+ Assert.IsNotNull(delivery1);
+ Assert.IsFalse(delivery1.Settled);
+ Assert.IsNull(delivery1.State);
+ Assert.IsNotNull(delivery2);
+ Assert.IsFalse(delivery2.Settled);
+ Assert.IsNull(delivery2.State);
+
+ delivery1.Accept();
+ delivery2.Release();
+
+ session.CommitTransaction();
+ receiver.Close();
+ connection.Close();
+
+ peer.WaitForScriptToComplete();
+ }
+ }
}
}
\ No newline at end of file
diff --git a/test/Proton.Client.Tests/Client/TestSupport/Wait.cs b/test/Proton.Client.Tests/Client/TestSupport/Wait.cs
new file mode 100644
index 0000000..2c508c1
--- /dev/null
+++ b/test/Proton.Client.Tests/Client/TestSupport/Wait.cs
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Threading;
+using NUnit.Framework;
+using System;
+using System.Diagnostics;
+
+namespace Apache.Qpid.Proton.Client.TestSupport
+{
+ public static class Wait
+ {
+ public static readonly int MAX_WAIT_MILLIS = 30 * 1000;
+ public static readonly int SLEEP_MILLIS = 200;
+ public static readonly string DEFAULT_FAILURE_MESSAGE = "Expected condition was not met";
+
+ public static void AssertTrue(Func<bool> condition)
+ {
+ AssertTrue(DEFAULT_FAILURE_MESSAGE, condition);
+ }
+
+ public static void AssertFalse(Func<bool> condition)
+ {
+ AssertTrue(() => !condition());
+ }
+
+ public static void AssertFalse(String failureMessage, Func<bool> condition)
+ {
+ AssertTrue(failureMessage, () => !condition());
+ }
+
+ public static void AssertFalse(String failureMessage, Func<bool> condition, int duration)
+ {
+ AssertTrue(failureMessage, () => !condition(), duration, SLEEP_MILLIS);
+ }
+
+ public static void AssertFalse(Func<bool> condition, int duration, int sleep)
+ {
+ AssertTrue(DEFAULT_FAILURE_MESSAGE, () => !condition(), duration, sleep);
+ }
+
+ public static void AssertTrue(Func<bool> condition, int duration)
+ {
+ AssertTrue(DEFAULT_FAILURE_MESSAGE, condition, duration, SLEEP_MILLIS);
+ }
+
+ public static void AssertTrue(String failureMessage, Func<bool> condition)
+ {
+ AssertTrue(failureMessage, condition, MAX_WAIT_MILLIS);
+ }
+
+ public static void AssertTrue(String failureMessage, Func<bool> condition, int duration)
+ {
+ AssertTrue(failureMessage, condition, duration, SLEEP_MILLIS);
+ }
+
+ public static void AssertTrue(Func<bool> condition, int duration, int sleep)
+ {
+ AssertTrue(DEFAULT_FAILURE_MESSAGE, condition, duration, sleep);
+ }
+
+ public static void AssertTrue(String failureMessage, Func<bool> condition, int duration, int sleep)
+ {
+ bool result = WaitFor(condition, duration, sleep);
+
+ if (!result)
+ {
+ Assert.Fail(failureMessage);
+ }
+ }
+
+ public static bool WaitFor(Func<bool> condition)
+ {
+ return WaitFor(condition, MAX_WAIT_MILLIS);
+ }
+
+ public static bool WaitFor(Func<bool> condition, int duration)
+ {
+ return WaitFor(condition, duration, SLEEP_MILLIS);
+ }
+
+ public static bool WaitFor(Func<bool> condition, long durationMillis, int sleepMillis)
+ {
+ try
+ {
+ Stopwatch watch = Stopwatch.StartNew();
+ bool conditionSatisfied = condition();
+
+ while (!conditionSatisfied && watch.ElapsedMilliseconds < durationMillis)
+ {
+ if (sleepMillis == 0)
+ {
+ Thread.Yield();
+ }
+ else
+ {
+ Thread.Sleep(sleepMillis);
+ }
+
+ conditionSatisfied = condition();
+ }
+
+ return conditionSatisfied;
+ }
+ catch (Exception e)
+ {
+ throw new InvalidOperationException("Wait for condition failed", e);
+ }
+ }
+ }
+}
\ No newline at end of file