blob: 08f1b50043ecbc7c8a076a3765510b02367305bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System.Threading;
using Apache.Qpid.Proton.Test.Driver;
using NUnit.Framework;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using Apache.Qpid.Proton.Test.Driver.Matchers.Types.Messaging;
using Apache.Qpid.Proton.Test.Driver.Matchers.Types.Transport;
using Apache.Qpid.Proton.Types.Messaging;
using Apache.Qpid.Proton.Client.Exceptions;
using System;
using System.Linq;
using System.IO;
using System.Threading.Tasks;
using System.Text;
using Apache.Qpid.Proton.Engine;
using Apache.Qpid.Proton.Types;
namespace Apache.Qpid.Proton.Client.Implementation
{
[TestFixture, Timeout(20000)]
public class ClientStreamSenderTest : ClientBaseTestFixture
{
[Test]
public void TestSendWhenCreditIsAvailable()
{
DoTestSendWhenCreditIsAvailable(false, false);
}
[Test]
public void TestTrySendWhenCreditIsAvailable()
{
DoTestSendWhenCreditIsAvailable(true, false);
}
[Test]
public void TestSendWhenCreditIsAvailableWithDeliveryAnnotations()
{
DoTestSendWhenCreditIsAvailable(false, true);
}
[Test]
public void TestTrySendWhenCreditIsAvailableWithDeliveryAnnotations()
{
DoTestSendWhenCreditIsAvailable(true, true);
}
private void DoTestSendWhenCreditIsAvailable(bool trySend, bool addDeliveryAnnotations)
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithDeliveryCount(0)
.WithLinkCredit(10)
.WithIncomingWindow(1024)
.WithOutgoingWindow(10)
.WithNextIncomingId(0)
.WithNextOutgoingId(1).Queue();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfReceiver().Respond();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-queue");
sender.OpenTask.Wait();
// This ensures that the flow to sender is processed before we try-send
IReceiver receiver = connection.OpenReceiver("test-queue", new ReceiverOptions()
{
CreditWindow = 0
});
receiver.OpenTask.Wait();
IDictionary<string, object> deliveryAnnotations = new Dictionary<string, object>();
deliveryAnnotations.Add("da1", 1);
deliveryAnnotations.Add("da2", 2);
deliveryAnnotations.Add("da3", 3);
DeliveryAnnotationsMatcher daMatcher = new DeliveryAnnotationsMatcher(true);
daMatcher.WithEntry("da1", Test.Driver.Matchers.Is.EqualTo(1));
daMatcher.WithEntry("da2", Test.Driver.Matchers.Is.EqualTo(2));
daMatcher.WithEntry("da3", Test.Driver.Matchers.Is.EqualTo(3));
AmqpValueMatcher bodyMatcher = new AmqpValueMatcher("Hello World");
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
if (addDeliveryAnnotations)
{
payloadMatcher.DeliveryAnnotationsMatcher = daMatcher;
}
payloadMatcher.MessageContentMatcher = bodyMatcher;
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithNonNullPayload();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
IMessage<string> message = IMessage<string>.Create("Hello World");
IStreamTracker tracker;
if (trySend)
{
if (addDeliveryAnnotations)
{
tracker = sender.TrySend(message, deliveryAnnotations);
}
else
{
tracker = sender.TrySend(message);
}
}
else
{
if (addDeliveryAnnotations)
{
tracker = sender.Send(message, deliveryAnnotations);
}
else
{
tracker = sender.Send(message);
}
}
Assert.IsNotNull(tracker);
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestOpenStreamSenderWithLinCapabilities()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender()
.WithTarget().WithAddress("test-queue").WithCapabilities("queue").And()
.WithSource().WithAddress("test-queue").And()
.Respond();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
StreamSenderOptions senderOptions = new StreamSenderOptions();
senderOptions.TargetOptions.Capabilities = new string[] { "queue" };
IStreamSender sender = connection.OpenStreamSender("test-queue", senderOptions);
sender.OpenTask.Wait();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestOpenStreamSenderAppliesDefaultSessionOutgoingWindow()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender()
.WithTarget().WithCapabilities("queue").And()
.Respond();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
StreamSenderOptions senderOptions = new StreamSenderOptions();
senderOptions.TargetOptions.Capabilities = new string[] { "queue" };
ClientStreamSender sender = (ClientStreamSender)connection.OpenStreamSender("test-queue", senderOptions);
Assert.AreEqual(StreamSenderOptions.DEFAULT_PENDING_WRITES_BUFFER_SIZE,
sender.ProtonSender.Session.OutgoingCapacity);
sender.OpenTask.Wait();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestOpenStreamSenderAppliesConfiguredSessionOutgoingWindow()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender()
.WithTarget().WithCapabilities("queue").And()
.Respond();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
peer.Start();
uint PENDING_WRITES_BUFFER_SIZE = StreamSenderOptions.DEFAULT_PENDING_WRITES_BUFFER_SIZE / 2;
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
StreamSenderOptions senderOptions = new StreamSenderOptions()
{
PendingWriteBufferSize = PENDING_WRITES_BUFFER_SIZE
};
senderOptions.TargetOptions.Capabilities = new string[] { "queue" };
ClientStreamSender sender = (ClientStreamSender)connection.OpenStreamSender("test-queue", senderOptions);
Assert.AreEqual(PENDING_WRITES_BUFFER_SIZE, sender.ProtonSender.Session.OutgoingCapacity);
sender.OpenTask.Wait();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestSendCustomMessageWithMultipleAmqpValueSections()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectBegin().Respond(); // Hidden session for stream sender
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(10).Queue();
peer.ExpectAttach().Respond(); // Open a receiver to ensure sender link has processed
peer.ExpectFlow(); // the inbound flow frame we sent previously before send.
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
ISession session = connection.OpenSession().OpenTask.Result;
StreamSenderOptions options = new StreamSenderOptions();
options.DeliveryMode = DeliveryMode.AtMostOnce;
options.WriteBufferSize = uint.MaxValue;
IStreamSender sender = connection.OpenStreamSender("test-qos", options);
// Create a custom message format send context and ensure that no early buffer writes take place
IStreamSenderMessage message = sender.BeginMessage();
Assert.AreEqual(sender, message.Sender);
Assert.IsNull(message.Tracker);
Assert.AreEqual(Header.DEFAULT_PRIORITY, message.Priority);
Assert.AreEqual(Header.DEFAULT_DELIVERY_COUNT, message.DeliveryCount);
Assert.AreEqual(Header.DEFAULT_FIRST_ACQUIRER, message.FirstAcquirer);
Assert.AreEqual(Header.DEFAULT_TIME_TO_LIVE, message.TimeToLive);
Assert.AreEqual(Header.DEFAULT_DURABILITY, message.Durable);
message.MessageFormat = 17;
// Gates send on remote flow having been sent and received
session.OpenReceiver("dummy").OpenTask.Wait();
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.WithDurable(true);
headerMatcher.WithPriority((byte)1);
headerMatcher.WithTtl(65535);
headerMatcher.WithFirstAcquirer(true);
headerMatcher.WithDeliveryCount(2);
// Note: This is a specification violation but could be used by other message formats
// and we don't attempt to enforce at the Send Context what users write
AmqpValueMatcher bodyMatcher1 = new AmqpValueMatcher("one", true);
AmqpValueMatcher bodyMatcher2 = new AmqpValueMatcher("two", true);
AmqpValueMatcher bodyMatcher3 = new AmqpValueMatcher("three", false);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.HeaderMatcher = headerMatcher;
payloadMatcher.AddMessageContentMatcher(bodyMatcher1);
payloadMatcher.AddMessageContentMatcher(bodyMatcher2);
payloadMatcher.AddMessageContentMatcher(bodyMatcher3);
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithMore(false).WithMessageFormat(17).WithPayload(payloadMatcher).Accept();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
// Populate all Header values
Header header = new Header();
header.Durable = true;
header.Priority = (byte)1;
header.TimeToLive = 65535;
header.FirstAcquirer = true;
header.DeliveryCount = 2;
message.Header = header;
message.AddBodySection(new AmqpValue("one"));
message.AddBodySection(new AmqpValue("two"));
message.AddBodySection(new AmqpValue("three"));
message.Complete();
Assert.IsNotNull(message.Tracker);
Assert.AreEqual(17, message.MessageFormat);
Assert.IsTrue(message.Tracker.SettlementTask.IsCompleted);
Assert.IsTrue(message.Tracker.SettlementTask.Result.Settled);
Assert.Throws<ClientIllegalStateException>(() => message.AddBodySection(new AmqpValue("three")));
Assert.Throws<ClientIllegalStateException>(() => _ = message.Body);
Assert.Throws<ClientIllegalStateException>(() => message.RawOutputStream());
Assert.Throws<ClientIllegalStateException>(() => message.Abort());
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestClearBodySectionsIsNoOpForStreamSenderMessage()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectBegin().Respond(); // Hidden session for stream sender
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(10).Queue();
peer.ExpectAttach().Respond(); // Open a receiver to ensure sender link has processed
peer.ExpectFlow(); // the inbound flow frame we sent previously before send.
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
ISession session = connection.OpenSession().OpenTask.Result;
StreamSenderOptions options = new StreamSenderOptions();
options.DeliveryMode = DeliveryMode.AtMostOnce;
options.WriteBufferSize = uint.MaxValue;
IStreamSender sender = connection.OpenStreamSender("test-qos", options);
// Create a custom message format send context and ensure that no early buffer writes take place
IStreamSenderMessage message = sender.BeginMessage();
message.MessageFormat = 17;
// Gates send on remote flow having been sent and received
session.OpenReceiver("dummy").OpenTask.Wait();
AmqpValueMatcher bodyMatcher1 = new AmqpValueMatcher("one", true);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.AddMessageContentMatcher(bodyMatcher1);
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithMore(false).WithMessageFormat(17).WithPayload(payloadMatcher).Accept();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
message.AddBodySection(new AmqpValue("one"));
message.ClearBodySections();
message.ForEachBodySection((section) =>
{
// No sections retained so this should never run.
throw new InvalidOperationException();
});
Assert.IsNotNull(message.GetBodySections());
Assert.IsTrue(message.GetBodySections().Count() == 0);
message.Complete();
Assert.IsTrue(message.Tracker.SettlementTask.IsCompleted);
Assert.IsTrue(message.Tracker.SettlementTask.Result.Settled);
Assert.Throws<ClientIllegalStateException>(() => _ = message.Body);
Assert.Throws<ClientIllegalStateException>(() => message.RawOutputStream());
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestMessageFormatCannotBeModifiedAfterBodyWritesStart()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond(); // Hidden session for stream sender
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(10).Queue();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
IStreamSender sender = connection.OpenStreamSender("test-qos");
IStreamSenderMessage message = sender.BeginMessage();
sender.OpenTask.Wait();
message.Durable = true;
message.MessageFormat = 17;
_ = message.Body; // Alters message state to exclude future message format changes
try
{
message.MessageFormat = 16;
Assert.Fail("Should not be able to modify message format after body writes started");
}
catch (ClientIllegalStateException)
{
// Expected
}
catch (Exception unexpected)
{
Assert.Fail("Failed test due to message format set throwing unexpected error: " + unexpected);
}
message.Abort();
Assert.Throws<ClientIllegalStateException>(() => message.Complete());
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestCannotCreateNewStreamingMessageWhileCurrentInstanceIsIncomplete()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond(); // Hidden session for stream sender
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(10).Queue();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
IStreamSender sender = (IStreamSender)connection.OpenStreamSender("test-qos").OpenTask.Result;
IStreamSenderMessage message = sender.BeginMessage();
try
{
sender.BeginMessage();
Assert.Fail("Should not be able create a new streaming sender message before last one is completed.");
}
catch (ClientIllegalStateException)
{
// Expected
}
try
{
sender.BeginMessageAsync().GetAwaiter().GetResult();
Assert.Fail("Should not be able create a new streaming sender message before last one is completed.");
}
catch (ClientIllegalStateException)
{
// Expected
}
message.Abort();
Assert.Throws<ClientIllegalStateException>(() => message.Complete());
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestCannotAssignAnOutputStreamToTheMessageBody()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond(); // Hidden session for stream sender
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(10).Queue();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
IStreamSender sender = (IStreamSender)connection.OpenStreamSender("test-qos").OpenTask.Result;
IStreamSenderMessage message = sender.BeginMessage();
try
{
message.Body = new MemoryStream();
Assert.Fail("Should not be able assign an output stream to the message body");
}
catch (ClientUnsupportedOperationException)
{
// Expected
}
message.Abort();
Assert.Throws<ClientIllegalStateException>(() => message.Complete());
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestCannotModifyMessagePreambleAfterWritesHaveStarted()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond(); // Hidden session for stream sender
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(10).Queue();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
IStreamSender sender = (IStreamSender)connection.OpenStreamSender("test-qos").OpenTask.Result;
IStreamSenderMessage message = sender.BeginMessage();
message.Durable = true;
message.MessageId = "test";
message.SetAnnotation("key", "value");
message.SetProperty("key", "value");
_ = message.Body;
try
{
message.Durable = false;
Assert.Fail("Should not be able to modify message preamble after body writes started");
}
catch (ClientIllegalStateException)
{
// Expected
}
try
{
message.MessageId = "test1";
Assert.Fail("Should not be able to modify message preamble after body writes started");
}
catch (ClientIllegalStateException)
{
// Expected
}
try
{
message.SetAnnotation("key1", "value");
Assert.Fail("Should not be able to modify message preamble after body writes started");
}
catch (ClientIllegalStateException)
{
// Expected
}
try
{
message.SetProperty("key", "value");
Assert.Fail("Should not be able to modify message preamble after body writes started");
}
catch (ClientIllegalStateException)
{
// Expected
}
message.Abort();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestCreateStream()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(1).Queue();
peer.ExpectTransfer().WithMore(false).WithNullPayload();
peer.ExpectDetach().WithClosed(true).Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-qos");
IStreamSenderMessage tracker = sender.BeginMessage();
OutputStreamOptions options = new OutputStreamOptions();
Stream stream = tracker.GetBodyStream(options);
Assert.IsNotNull(stream);
sender.OpenTask.Wait();
stream.Close();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestCreateStreamFromAsyncBegin()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(1).Queue();
peer.ExpectTransfer().WithMore(false).WithNullPayload();
peer.ExpectDetach().WithClosed(true).Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-qos");
IStreamSenderMessage tracker = sender.BeginMessageAsync().GetAwaiter().GetResult();
OutputStreamOptions options = new OutputStreamOptions();
Stream stream = tracker.GetBodyStream(options);
Assert.IsNotNull(stream);
sender.OpenTask.Wait();
stream.Close();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestOutputStreamOptionsEnforcesValidBodySizeValues()
{
OutputStreamOptions options = new OutputStreamOptions();
options.BodyLength = 1024;
options.BodyLength = int.MaxValue;
Assert.Throws<ArgumentOutOfRangeException>(() => options.BodyLength = -1);
}
[Test]
public void TestFlushWithSetNonBodySectionsThenClose()
{
DoTestNonBodySectionWrittenWhenNoWritesToStream(true);
}
[Test]
public void TestCloseWithSetNonBodySections()
{
DoTestNonBodySectionWrittenWhenNoWritesToStream(false);
}
private void DoTestNonBodySectionWrittenWhenNoWritesToStream(bool flushBeforeClose)
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(1).Queue();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-queue");
IStreamSenderMessage message = sender.BeginMessage();
// Populate all Header values
Header header = new Header();
header.Durable = true;
header.Priority = (byte)1;
header.TimeToLive = 65535;
header.FirstAcquirer = true;
header.DeliveryCount = 2;
message.Header = header;
OutputStreamOptions options = new OutputStreamOptions();
Stream stream = message.GetBodyStream(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.WithDurable(true);
headerMatcher.WithPriority((byte)1);
headerMatcher.WithTtl(65535);
headerMatcher.WithFirstAcquirer(true);
headerMatcher.WithDeliveryCount(2);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.HeaderMatcher = headerMatcher;
peer.WaitForScriptToComplete();
if (flushBeforeClose)
{
peer.ExpectTransfer().WithMore(true).WithPayload(payloadMatcher);
peer.ExpectTransfer().WithMore(false).WithNullPayload()
.Respond()
.WithSettled(true).WithState().Accepted();
}
else
{
peer.ExpectTransfer().WithMore(false).WithPayload(payloadMatcher)
.Respond()
.WithSettled(true).WithState().Accepted();
}
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
// Once flush is called than anything in the buffer is written regardless of
// there being any actual stream writes. Default close action is to complete
// the delivery.
if (flushBeforeClose)
{
stream.Flush();
}
stream.Close();
message.Tracker.AwaitSettlement(TimeSpan.FromSeconds(5));
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestFlushAfterFirstWriteEncodesAMQPHeaderAndMessageBuffer()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(1).Queue();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-queue");
IStreamSenderMessage message = sender.BeginMessage();
// Populate all Header values
Header header = new Header();
header.Durable = true;
header.Priority = (byte)1;
header.TimeToLive = 65535;
header.FirstAcquirer = true;
header.DeliveryCount = 2;
message.Header = header;
OutputStreamOptions options = new OutputStreamOptions();
Stream stream = message.GetBodyStream(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.WithDurable(true);
headerMatcher.WithPriority((byte)1);
headerMatcher.WithTtl(65535);
headerMatcher.WithFirstAcquirer(true);
headerMatcher.WithDeliveryCount(2);
DataMatcher dataMatcher = new DataMatcher(new byte[] { 0, 1, 2, 3 });
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.HeaderMatcher = headerMatcher;
payloadMatcher.MessageContentMatcher = dataMatcher;
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithMore(true).WithPayload(payloadMatcher);
peer.ExpectTransfer().WithMore(false).WithNullPayload();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
// Stream won't output until some body bytes are written since the buffer was not
// filled by the header write. Then the close will complete the stream message.
stream.Write(new byte[] { 0, 1, 2, 3 });
stream.Flush();
stream.Close();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestAutoFlushAfterSingleWriteExceedsConfiguredBufferLimit()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(1).Queue();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-queue", new StreamSenderOptions()
{
WriteBufferSize = 512
});
IStreamSenderMessage tracker = sender.BeginMessage();
byte[] payload = new byte[512];
Array.Fill(payload, (byte)16);
// Populate all Header values
Header header = new Header();
header.Durable = true;
header.Priority = (byte)1;
header.TimeToLive = 65535;
header.FirstAcquirer = true;
header.DeliveryCount = 2;
tracker.Header = header;
OutputStreamOptions options = new OutputStreamOptions();
Stream stream = tracker.GetBodyStream(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.WithDurable(true);
headerMatcher.WithPriority((byte)1);
headerMatcher.WithTtl(65535);
headerMatcher.WithFirstAcquirer(true);
headerMatcher.WithDeliveryCount(2);
DataMatcher dataMatcher = new DataMatcher(payload);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.HeaderMatcher = headerMatcher;
payloadMatcher.MessageContentMatcher = dataMatcher;
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithPayload(payloadMatcher).WithMore(true);
// Stream won't output until some body bytes are written.
stream.Write(payload);
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithNullPayload().WithMore(false).Accept();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
stream.Close();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestAutoFlushDuringWriteThatExceedConfiguredBufferLimit()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(1).Queue();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-queue", new StreamSenderOptions()
{
WriteBufferSize = 256
});
IStreamSenderMessage tracker = sender.BeginMessage();
byte[] payload1 = new byte[256];
Array.Fill(payload1, (byte)1);
byte[] payload2 = new byte[256];
Array.Fill(payload2, (byte)2);
byte[] payload3 = new byte[256];
Array.Fill(payload3, (byte)3);
byte[] payload4 = new byte[256];
Array.Fill(payload4, (byte)4);
byte[] payload = new byte[1024];
Array.Copy(payload1, 0, payload, 0, 256);
Array.Copy(payload2, 0, payload, 256, 256);
Array.Copy(payload3, 0, payload, 512, 256);
Array.Copy(payload4, 0, payload, 768, 256);
// Populate all Header values
Header header = new Header();
header.Durable = true;
header.Priority = (byte)1;
header.TimeToLive = 65535;
header.FirstAcquirer = true;
header.DeliveryCount = 2;
tracker.Header = header;
OutputStreamOptions options = new OutputStreamOptions();
Stream stream = tracker.GetBodyStream(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.WithDurable(true);
headerMatcher.WithPriority((byte)1);
headerMatcher.WithTtl(65535);
headerMatcher.WithFirstAcquirer(true);
headerMatcher.WithDeliveryCount(2);
DataMatcher dataMatcher1 = new DataMatcher(payload1);
TransferPayloadCompositeMatcher payloadMatcher1 = new TransferPayloadCompositeMatcher();
payloadMatcher1.HeaderMatcher = headerMatcher;
payloadMatcher1.MessageContentMatcher = dataMatcher1;
DataMatcher dataMatcher2 = new DataMatcher(payload2);
TransferPayloadCompositeMatcher payloadMatcher2 = new TransferPayloadCompositeMatcher();
payloadMatcher2.MessageContentMatcher = dataMatcher2;
DataMatcher dataMatcher3 = new DataMatcher(payload3);
TransferPayloadCompositeMatcher payloadMatcher3 = new TransferPayloadCompositeMatcher();
payloadMatcher3.MessageContentMatcher = dataMatcher3;
DataMatcher dataMatcher4 = new DataMatcher(payload4);
TransferPayloadCompositeMatcher payloadMatcher4 = new TransferPayloadCompositeMatcher();
payloadMatcher4.MessageContentMatcher = dataMatcher4;
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithPayload(payloadMatcher1).WithMore(true);
peer.ExpectTransfer().WithPayload(payloadMatcher2).WithMore(true);
peer.ExpectTransfer().WithPayload(payloadMatcher3).WithMore(true);
peer.ExpectTransfer().WithPayload(payloadMatcher4).WithMore(true);
// Stream won't output until some body bytes are written.
stream.Write(payload);
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithNullPayload().WithMore(false).Accept();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
stream.Close();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestAutoFlushDuringWriteThatExceedConfiguredBufferLimitSessionCreditLimitOnTransfer()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-queue", new StreamSenderOptions()
{
WriteBufferSize = 256
});
IStreamSenderMessage tracker = sender.BeginMessage();
byte[] payload1 = new byte[256];
Array.Fill(payload1, (byte)1);
byte[] payload2 = new byte[256];
Array.Fill(payload2, (byte)2);
byte[] payload3 = new byte[256];
Array.Fill(payload3, (byte)3);
byte[] payload4 = new byte[256];
Array.Fill(payload4, (byte)4);
byte[] payload = new byte[1024];
Array.Copy(payload1, 0, payload, 0, 256);
Array.Copy(payload2, 0, payload, 256, 256);
Array.Copy(payload3, 0, payload, 512, 256);
Array.Copy(payload4, 0, payload, 768, 256);
// Populate all Header values
Header header = new Header();
header.Durable = true;
header.Priority = (byte)1;
header.TimeToLive = 65535;
header.FirstAcquirer = true;
header.DeliveryCount = 2;
tracker.Header = header;
OutputStreamOptions options = new OutputStreamOptions();
Stream stream = tracker.GetBodyStream(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.WithDurable(true);
headerMatcher.WithPriority((byte)1);
headerMatcher.WithTtl(65535);
headerMatcher.WithFirstAcquirer(true);
headerMatcher.WithDeliveryCount(2);
DataMatcher dataMatcher1 = new DataMatcher(payload1);
TransferPayloadCompositeMatcher payloadMatcher1 = new TransferPayloadCompositeMatcher();
payloadMatcher1.HeaderMatcher = headerMatcher;
payloadMatcher1.MessageContentMatcher = dataMatcher1;
DataMatcher dataMatcher2 = new DataMatcher(payload2);
TransferPayloadCompositeMatcher payloadMatcher2 = new TransferPayloadCompositeMatcher();
payloadMatcher2.MessageContentMatcher = dataMatcher2;
DataMatcher dataMatcher3 = new DataMatcher(payload3);
TransferPayloadCompositeMatcher payloadMatcher3 = new TransferPayloadCompositeMatcher();
payloadMatcher3.MessageContentMatcher = dataMatcher3;
DataMatcher dataMatcher4 = new DataMatcher(payload4);
TransferPayloadCompositeMatcher payloadMatcher4 = new TransferPayloadCompositeMatcher();
payloadMatcher4.MessageContentMatcher = dataMatcher4;
CountdownEvent sendComplete = new CountdownEvent(1);
bool sendFailed = false;
// Stream won't output until some body bytes are written.
Task.Run(() =>
{
try
{
stream.Write(payload);
}
catch (IOException e)
{
logger.LogInformation("send failed with error: {0}", e.Message);
sendFailed = true;
}
finally
{
sendComplete.Signal();
}
});
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithPayload(payloadMatcher1).WithMore(true);
peer.RemoteFlow().WithIncomingWindow(1).WithNextIncomingId(2).WithLinkCredit(10).Queue();
peer.ExpectTransfer().WithPayload(payloadMatcher2).WithMore(true);
peer.RemoteFlow().WithIncomingWindow(1).WithNextIncomingId(3).WithLinkCredit(10).Queue();
peer.ExpectTransfer().WithPayload(payloadMatcher3).WithMore(true);
peer.RemoteFlow().WithIncomingWindow(1).WithNextIncomingId(4).WithLinkCredit(10).Queue();
peer.ExpectTransfer().WithPayload(payloadMatcher4).WithMore(true);
peer.RemoteFlow().WithIncomingWindow(1).WithNextIncomingId(5).WithLinkCredit(10).Queue();
peer.ExpectTransfer().WithNullPayload().WithMore(false).Accept();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
// Initiate the above script of transfers and flows
peer.RemoteFlow().WithIncomingWindow(1).WithNextIncomingId(1).WithLinkCredit(10).Now();
Assert.IsTrue(sendComplete.Wait(TimeSpan.FromSeconds(10)));
stream.Close();
Assert.IsFalse(sendFailed);
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestCloseAfterSingleWriteEncodesAndCompletesTransferWhenNoStreamSizeConfigured()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(1).Queue();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-queue");
IStreamSenderMessage tracker = sender.BeginMessage();
// Populate all Header values
Header header = new Header();
header.Durable = true;
header.Priority = (byte)1;
header.TimeToLive = 65535;
header.FirstAcquirer = true;
header.DeliveryCount = 2;
tracker.Header = header;
OutputStreamOptions options = new OutputStreamOptions();
Stream stream = tracker.GetBodyStream(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.WithDurable(true);
headerMatcher.WithPriority((byte)1);
headerMatcher.WithTtl(65535);
headerMatcher.WithFirstAcquirer(true);
headerMatcher.WithDeliveryCount(2);
DataMatcher dataMatcher = new DataMatcher(new byte[] { 0, 1, 2, 3 });
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.HeaderMatcher = headerMatcher;
payloadMatcher.MessageContentMatcher = dataMatcher;
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithPayload(payloadMatcher).WithMore(false).Accept();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
// Stream won't output until some body bytes are written.
stream.Write(new byte[] { 0, 1, 2, 3 });
stream.Close();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestFlushAfterSecondWriteDoesNotEncodeAMQPHeaderFromConfiguration()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(1).Queue();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-queue");
IStreamSenderMessage tracker = sender.BeginMessage();
// Populate all Header values
Header header = new Header();
header.Durable = true;
header.Priority = (byte)1;
header.TimeToLive = 65535;
header.FirstAcquirer = true;
header.DeliveryCount = 2;
tracker.Header = header;
OutputStreamOptions options = new OutputStreamOptions();
Stream stream = tracker.GetBodyStream(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.WithDurable(true);
headerMatcher.WithPriority((byte)1);
headerMatcher.WithTtl(65535);
headerMatcher.WithFirstAcquirer(true);
headerMatcher.WithDeliveryCount(2);
DataMatcher dataMatcher1 = new DataMatcher(new byte[] { 0, 1, 2, 3 });
TransferPayloadCompositeMatcher payloadMatcher1 = new TransferPayloadCompositeMatcher();
payloadMatcher1.HeaderMatcher = headerMatcher;
payloadMatcher1.MessageContentMatcher = dataMatcher1;
// Second flush expectation
DataMatcher dataMatcher2 = new DataMatcher(new byte[] { 4, 5, 6, 7 });
TransferPayloadCompositeMatcher payloadMatcher2 = new TransferPayloadCompositeMatcher();
payloadMatcher2.MessageContentMatcher = dataMatcher2;
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithPayload(payloadMatcher1).WithMore(true);
peer.ExpectTransfer().WithPayload(payloadMatcher2).WithMore(true);
peer.ExpectTransfer().WithNullPayload().WithMore(false).Accept();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
// Stream won't output until some body bytes are written.
stream.Write(new byte[] { 0, 1, 2, 3 });
stream.Flush();
// Next write should only be a single Data section
stream.Write(new byte[] { 4, 5, 6, 7 });
stream.Flush();
// Final Transfer that completes the Delivery
stream.Close();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestIncompleteStreamClosureCausesTransferAbort()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(1).Queue();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-queue");
IStreamSenderMessage tracker = sender.BeginMessage();
byte[] payload = new byte[] { 0, 1, 2, 3 };
// Populate all Header values
Header header = new Header();
header.Durable = true;
header.Priority = (byte)1;
header.DeliveryCount = 1;
tracker.Header = header;
OutputStreamOptions options = new OutputStreamOptions()
{
BodyLength = 8192
};
Stream stream = tracker.GetBodyStream(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.WithDurable(true);
headerMatcher.WithPriority((byte)1);
headerMatcher.WithDeliveryCount(1);
PartialDataSectionMatcher partialDataMatcher = new PartialDataSectionMatcher(8192, payload);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.HeaderMatcher = headerMatcher;
payloadMatcher.MessageContentMatcher = partialDataMatcher;
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithPayload(payloadMatcher);
peer.ExpectTransfer().WithAborted(true).WithNullPayload();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
stream.Write(payload);
stream.Flush();
// Stream should abort the send now since the configured size wasn't sent.
stream.Close();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestIncompleteStreamClosureWithNoWritesAbortsTransfer()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(1).Queue();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-queue");
IStreamSenderMessage message = sender.BeginMessage();
// Populate all Header values
Header header = new Header();
header.Durable = true;
header.Priority = (byte)1;
header.DeliveryCount = 1;
message.Header = header;
OutputStreamOptions options = new OutputStreamOptions()
{
BodyLength = 8192,
CompleteSendOnClose = false
};
Stream stream = message.GetBodyStream(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.WithDurable(true);
headerMatcher.WithPriority((byte)1);
headerMatcher.WithDeliveryCount(1);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.HeaderMatcher = headerMatcher;
peer.WaitForScriptToComplete();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
// This should abort the transfer as we might have triggered output upon create when the
// preamble was written.
stream.Close();
Assert.IsTrue(message.Aborted);
// Should have no affect.
message.Abort();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestCompleteStreamClosureCausesTransferCompleted()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(3).Queue();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-queue");
IStreamSenderMessage tracker = sender.BeginMessage();
byte[] payload1 = new byte[] { 0, 1, 2, 3, 4, 5 };
byte[] payload2 = new byte[] { 6, 7, 8, 9, 10, 11, 12, 13, 14 };
byte[] payload3 = new byte[] { 15 };
int payloadSize = payload1.Length + payload2.Length + payload3.Length;
// Populate all Header values
Header header = new Header();
header.Durable = true;
header.Priority = (byte)1;
header.DeliveryCount = 1;
tracker.Header = header;
// Populate message application properties
tracker.SetProperty("ap1", 1);
tracker.SetProperty("ap2", 2);
tracker.SetProperty("ap3", 3);
OutputStreamOptions options = new OutputStreamOptions()
{
BodyLength = payloadSize
};
Stream stream = tracker.GetBodyStream(options);
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.WithDurable(true);
headerMatcher.WithPriority((byte)1);
headerMatcher.WithDeliveryCount(1);
ApplicationPropertiesMatcher apMatcher = new ApplicationPropertiesMatcher(true);
apMatcher.WithEntry("ap1", Test.Driver.Matchers.Is.EqualTo(1));
apMatcher.WithEntry("ap2", Test.Driver.Matchers.Is.EqualTo(2));
apMatcher.WithEntry("ap3", Test.Driver.Matchers.Is.EqualTo(3));
PartialDataSectionMatcher partialDataMatcher = new PartialDataSectionMatcher(payloadSize, payload1);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.HeaderMatcher = headerMatcher;
payloadMatcher.MessageContentMatcher = partialDataMatcher;
payloadMatcher.ApplicationPropertiesMatcher = apMatcher;
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithPayload(payloadMatcher);
stream.Write(payload1);
stream.Flush();
peer.WaitForScriptToComplete();
partialDataMatcher = new PartialDataSectionMatcher(payload2);
payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.MessageContentMatcher = partialDataMatcher;
peer.ExpectTransfer().WithMore(true).WithPayload(partialDataMatcher);
stream.Write(payload2);
stream.Flush();
peer.WaitForScriptToComplete();
partialDataMatcher = new PartialDataSectionMatcher(payload3);
payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.MessageContentMatcher = partialDataMatcher;
peer.ExpectTransfer().WithMore(false).WithPayload(partialDataMatcher).Accept();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
stream.Write(payload3);
stream.Flush();
// Stream should already be completed so no additional frames should be written.
stream.Close();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestRawOutputStreamFromMessageWritesUnmodifiedBytes()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(1).Queue();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-queue");
IStreamSenderMessage message = sender.BeginMessage();
Stream stream = message.RawOutputStream();
// Only one writer at a time can exist
Assert.Throws<ClientIllegalStateException>(() => message.RawOutputStream());
Assert.Throws<ClientIllegalStateException>(() => message.GetBodyStream());
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithMore(true).WithPayload(new byte[] { 0, 1, 2, 3 });
peer.ExpectTransfer().WithMore(false).WithNullPayload();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
stream.Write(new byte[] { 0, 1, 2, 3 });
stream.Flush();
stream.Close();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestStreamSenderMessageWithDeliveryAnnotations()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(10).Queue();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
// Populate delivery annotations
IDictionary<string, object> deliveryAnnotations = new Dictionary<string, object>();
deliveryAnnotations.Add("da1", 1);
deliveryAnnotations.Add("da2", 2);
deliveryAnnotations.Add("da3", 3);
IStreamSender sender = connection.OpenStreamSender("test-queue");
IStreamSenderMessage message = sender.BeginMessage(deliveryAnnotations);
byte[] payload = new byte[] { 0, 1, 2, 3, 4, 5 };
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.WithDurable(true);
headerMatcher.WithPriority((byte)1);
headerMatcher.WithTtl(65535);
headerMatcher.WithFirstAcquirer(true);
headerMatcher.WithDeliveryCount(2);
PropertiesMatcher propertiesMatcher = new PropertiesMatcher(true);
propertiesMatcher.WithMessageId("ID:12345");
propertiesMatcher.WithUserId(Encoding.UTF8.GetBytes("user"));
propertiesMatcher.WithTo("the-management");
propertiesMatcher.WithSubject("amqp");
propertiesMatcher.WithReplyTo("the-minions");
propertiesMatcher.WithCorrelationId("abc");
propertiesMatcher.WithContentEncoding("application/json");
propertiesMatcher.WithContentType("gzip");
propertiesMatcher.WithAbsoluteExpiryTime(123);
propertiesMatcher.WithCreationTime(1);
propertiesMatcher.WithGroupId("disgruntled");
propertiesMatcher.WithGroupSequence(8192);
propertiesMatcher.WithReplyToGroupId("/dev/null");
DeliveryAnnotationsMatcher daMatcher = new DeliveryAnnotationsMatcher(true);
daMatcher.WithEntry("da1", Test.Driver.Matchers.Is.EqualTo(1));
daMatcher.WithEntry("da2", Test.Driver.Matchers.Is.EqualTo(2));
daMatcher.WithEntry("da3", Test.Driver.Matchers.Is.EqualTo(3));
MessageAnnotationsMatcher maMatcher = new MessageAnnotationsMatcher(true);
maMatcher.WithEntry("ma1", Test.Driver.Matchers.Is.EqualTo(1));
maMatcher.WithEntry("ma2", Test.Driver.Matchers.Is.EqualTo(2));
maMatcher.WithEntry("ma3", Test.Driver.Matchers.Is.EqualTo(3));
ApplicationPropertiesMatcher apMatcher = new ApplicationPropertiesMatcher(true);
apMatcher.WithEntry("ap1", Test.Driver.Matchers.Is.EqualTo(1));
apMatcher.WithEntry("ap2", Test.Driver.Matchers.Is.EqualTo(2));
apMatcher.WithEntry("ap3", Test.Driver.Matchers.Is.EqualTo(3));
DataMatcher bodyMatcher = new DataMatcher(payload);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.HeaderMatcher = headerMatcher;
payloadMatcher.DeliveryAnnotationsMatcher = daMatcher;
payloadMatcher.MessageAnnotationsMatcher = maMatcher;
payloadMatcher.PropertiesMatcher = propertiesMatcher;
payloadMatcher.ApplicationPropertiesMatcher = apMatcher;
payloadMatcher.MessageContentMatcher = bodyMatcher;
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithPayload(payloadMatcher).WithMore(false).Accept();
// Populate all Header values
message.Durable = true;
Assert.AreEqual(true, message.Durable);
message.Priority = (byte)1;
Assert.AreEqual(1, message.Priority);
message.TimeToLive = 65535;
Assert.AreEqual(65535, message.TimeToLive);
message.FirstAcquirer = true;
Assert.IsTrue(message.FirstAcquirer);
message.DeliveryCount = 2;
Assert.AreEqual(2, message.DeliveryCount);
// Populate message annotations
Assert.IsFalse(message.HasAnnotations);
Assert.IsFalse(message.HasAnnotation("ma1"));
message.SetAnnotation("ma1", 1);
Assert.IsTrue(message.HasAnnotation("ma1"));
Assert.AreEqual(1, message.GetAnnotation("ma1"));
message.SetAnnotation("ma2", 2);
Assert.AreEqual(2, message.GetAnnotation("ma2"));
message.SetAnnotation("ma3", 3);
Assert.AreEqual(3, message.GetAnnotation("ma3"));
Assert.IsTrue(message.HasAnnotations);
// Populate all Properties values
message.MessageId = "ID:12345";
Assert.AreEqual("ID:12345", message.MessageId);
message.UserId = Encoding.UTF8.GetBytes("user");
Assert.AreEqual(Encoding.UTF8.GetBytes("user"), message.UserId);
message.To = "the-management";
Assert.AreEqual("the-management", message.To);
message.Subject = "amqp";
Assert.AreEqual("amqp", message.Subject);
message.ReplyTo = "the-minions";
Assert.AreEqual("the-minions", message.ReplyTo);
message.CorrelationId = "abc";
Assert.AreEqual("abc", message.CorrelationId);
message.ContentEncoding = "application/json";
Assert.AreEqual("application/json", message.ContentEncoding);
message.ContentType = "gzip";
Assert.AreEqual("gzip", message.ContentType);
message.AbsoluteExpiryTime = 123;
Assert.AreEqual(123, message.AbsoluteExpiryTime);
message.CreationTime = 1;
Assert.AreEqual(1, message.CreationTime);
message.GroupId = "disgruntled";
Assert.AreEqual("disgruntled", message.GroupId);
message.GroupSequence = 8192;
Assert.AreEqual(8192, message.GroupSequence);
message.ReplyToGroupId = "/dev/null";
Assert.AreEqual("/dev/null", message.ReplyToGroupId);
// Populate message application properties
Assert.IsFalse(message.HasProperties);
Assert.IsFalse(message.HasProperty("ma1"));
message.SetProperty("ap1", 1);
Assert.AreEqual(1, message.GetProperty("ap1"));
Assert.IsTrue(message.HasProperty("ap1"));
message.SetProperty("ap2", 2);
Assert.IsTrue(message.HasProperty("ap2"));
Assert.AreEqual(2, message.GetProperty("ap2"));
message.SetProperty("ap3", 3);
Assert.IsTrue(message.HasProperty("ap3"));
Assert.AreEqual(3, message.GetProperty("ap3"));
Assert.IsTrue(message.HasProperties);
Stream stream = message.Body;
stream.Write(payload);
stream.Close();
peer.WaitForScriptToComplete();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
Assert.IsNotNull(message.Tracker);
Assert.IsTrue(message.Tracker.SettlementTask.Result.RemoteSettled);
Assert.IsTrue(message.Tracker.SettlementTask.Result.Settled);
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestStreamSenderWritesFooterAfterStreamClosed()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(10).Queue();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
IStreamSender sender = connection.OpenStreamSender("test-queue");
IStreamSenderMessage message = sender.BeginMessage();
byte[] payload = new byte[] { 0, 1, 2, 3, 4, 5 };
// First frame should include only the bits up to the body
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.WithDurable(true);
headerMatcher.WithPriority((byte)1);
headerMatcher.WithTtl(65535);
headerMatcher.WithFirstAcquirer(true);
headerMatcher.WithDeliveryCount(2);
ApplicationPropertiesMatcher apMatcher = new ApplicationPropertiesMatcher(true);
apMatcher.WithEntry("ap1", Test.Driver.Matchers.Is.EqualTo(1));
apMatcher.WithEntry("ap2", Test.Driver.Matchers.Is.EqualTo(2));
apMatcher.WithEntry("ap3", Test.Driver.Matchers.Is.EqualTo(3));
FooterMatcher footerMatcher = new FooterMatcher(false);
footerMatcher.WithEntry("f1", Test.Driver.Matchers.Is.EqualTo(1));
footerMatcher.WithEntry("f2", Test.Driver.Matchers.Is.EqualTo(2));
footerMatcher.WithEntry("f3", Test.Driver.Matchers.Is.EqualTo(3));
DataMatcher bodyMatcher = new DataMatcher(payload, true);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.HeaderMatcher = headerMatcher;
payloadMatcher.ApplicationPropertiesMatcher = apMatcher;
payloadMatcher.MessageContentMatcher = bodyMatcher;
payloadMatcher.FooterMatcher = footerMatcher;
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithPayload(payloadMatcher).WithMore(false).Accept();
// Populate all Header values
message.Durable = true;
message.Priority = (byte)1;
message.TimeToLive = 65535;
message.FirstAcquirer = true;
message.DeliveryCount = 2;
// Populate message application properties
message.SetProperty("ap1", 1);
message.SetProperty("ap2", 2);
message.SetProperty("ap3", 3);
// Populate message footers
Assert.IsFalse(message.HasFooters);
Assert.IsFalse(message.HasFooter("f1"));
message.SetFooter("f1", 1);
message.SetFooter("f2", 2);
message.SetFooter("f3", 3);
Assert.IsTrue(message.HasFooter("f1"));
Assert.IsTrue(message.HasFooters);
OutputStreamOptions bodyOptions = new OutputStreamOptions()
{
CompleteSendOnClose = true
};
Stream stream = message.GetBodyStream(bodyOptions);
Assert.Throws<ClientUnsupportedOperationException>(() => message.Encode(new Dictionary<string, object>()));
stream.Write(payload);
stream.Close();
Assert.Throws<ClientIllegalStateException>(() => message.Footer = new Footer());
peer.WaitForScriptToComplete();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
Assert.IsNotNull(message.Tracker);
Assert.IsTrue(message.Tracker.SettlementTask.Result.RemoteSettled);
Assert.IsTrue(message.Tracker.SettlementTask.Result.Settled);
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestStreamSenderWritesFooterAfterMessageCompleted()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(10).Queue();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
IStreamSender sender = connection.OpenStreamSender("test-queue");
IStreamSenderMessage message = sender.BeginMessage();
byte[] payload = new byte[] { 0, 1, 2, 3, 4, 5 };
// First frame should include only the bits up to the body
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.WithDurable(true);
headerMatcher.WithPriority((byte)1);
headerMatcher.WithTtl(65535);
headerMatcher.WithFirstAcquirer(true);
headerMatcher.WithDeliveryCount(2);
ApplicationPropertiesMatcher apMatcher = new ApplicationPropertiesMatcher(true);
apMatcher.WithEntry("ap1", Test.Driver.Matchers.Is.EqualTo(1));
apMatcher.WithEntry("ap2", Test.Driver.Matchers.Is.EqualTo(2));
apMatcher.WithEntry("ap3", Test.Driver.Matchers.Is.EqualTo(3));
DataMatcher bodyMatcher = new DataMatcher(payload);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.HeaderMatcher = headerMatcher;
payloadMatcher.ApplicationPropertiesMatcher = apMatcher;
payloadMatcher.MessageContentMatcher = bodyMatcher;
// Second Frame should contains the appended footers
FooterMatcher footerMatcher = new FooterMatcher(false);
footerMatcher.WithEntry("f1", Test.Driver.Matchers.Is.EqualTo(1));
footerMatcher.WithEntry("f2", Test.Driver.Matchers.Is.EqualTo(2));
footerMatcher.WithEntry("f3", Test.Driver.Matchers.Is.EqualTo(3));
TransferPayloadCompositeMatcher payloadFooterMatcher = new TransferPayloadCompositeMatcher();
payloadFooterMatcher.FooterMatcher = footerMatcher;
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithPayload(payloadMatcher).WithMore(true);
peer.ExpectTransfer().WithPayload(payloadFooterMatcher).WithMore(false).Accept();
// Populate all Header values
message.Durable = true;
message.Priority = (byte)1;
message.TimeToLive = 65535;
message.FirstAcquirer = true;
message.DeliveryCount = 2;
// Populate message application properties
message.SetProperty("ap1", 1);
message.SetProperty("ap2", 2);
message.SetProperty("ap3", 3);
OutputStreamOptions bodyOptions = new OutputStreamOptions()
{
CompleteSendOnClose = false
};
Stream stream = message.GetBodyStream(bodyOptions);
stream.Write(payload);
stream.Close();
// Populate message footers
message.SetFooter("f1", 1);
message.SetFooter("f2", 2);
message.SetFooter("f3", 3);
message.Complete();
peer.WaitForScriptToComplete();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
Assert.IsNotNull(message.Tracker);
Assert.IsTrue(message.Tracker.SettlementTask.Result.RemoteSettled);
Assert.IsTrue(message.Tracker.SettlementTask.Result.Settled);
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestAutoFlushDuringMessageSendThatExceedConfiguredBufferLimitSessionCreditLimitOnTransfer()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
ConnectionOptions options = new ConnectionOptions()
{
MaxFrameSize = 1024
};
IConnection connection = container.Connect(remoteAddress, remotePort, options);
IStreamSender sender = connection.OpenStreamSender("test-queue");
byte[] payload = new byte[4800];
Array.Fill(payload, (byte)1);
bool sendFailed = false;
Task.Run(() =>
{
try
{
sender.Send(IMessage<byte[]>.Create(payload));
}
catch (Exception e)
{
logger.LogInformation("send failed with error: {0}", e.Message);
sendFailed = true;
}
});
peer.WaitForScriptToComplete();
peer.RemoteFlow().WithIncomingWindow(1).WithNextIncomingId(1).WithLinkCredit(10).Now();
peer.ExpectTransfer().WithNonNullPayload().WithMore(true);
peer.RemoteFlow().WithIncomingWindow(1).WithNextIncomingId(2).WithLinkCredit(10).Queue();
peer.ExpectTransfer().WithNonNullPayload().WithMore(true);
peer.RemoteFlow().WithIncomingWindow(1).WithNextIncomingId(3).WithLinkCredit(10).Queue();
peer.ExpectTransfer().WithNonNullPayload().WithMore(true);
peer.RemoteFlow().WithIncomingWindow(1).WithNextIncomingId(4).WithLinkCredit(10).Queue();
peer.ExpectTransfer().WithNonNullPayload().WithMore(true);
peer.RemoteFlow().WithIncomingWindow(1).WithNextIncomingId(5).WithLinkCredit(10).Queue();
peer.ExpectTransfer().WithNonNullPayload().WithMore(false).Accept();
peer.WaitForScriptToComplete();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
Assert.IsFalse(sendFailed);
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestConcurrentMessageSendOnlyBlocksForInitialSendInProgress()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(2).Queue();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfReceiver().Respond();
peer.ExpectFlow();
peer.ExpectTransfer().WithNonNullPayload().WithMore(false).Respond().WithSettled(true).WithState().Accepted();
peer.ExpectTransfer().WithNonNullPayload().WithMore(false).Respond().WithSettled(true).WithState().Accepted();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = (IStreamSender)connection.OpenStreamSender("test-queue").OpenTask.Result;
// Ensure that sender gets its flow before the sends are triggered.
connection.OpenReceiver("test-queue").OpenTask.Wait();
byte[] payload = new byte[1024];
Array.Fill(payload, (byte)1);
// One should block on the send waiting for the others send to finish
// otherwise they should not care about concurrency of sends.
bool sendFailed = false;
Task.Run(() =>
{
try
{
logger.LogInformation("Test send 1 is preparing to fire:");
IStreamTracker tracker = sender.Send(IMessage<byte[]>.Create(payload));
tracker.AwaitSettlement(TimeSpan.FromSeconds(10));
}
catch (Exception e)
{
logger.LogInformation("Test send 1 failed with error: {0}", e.Message);
sendFailed = true;
}
});
Task.Run(() =>
{
try
{
logger.LogInformation("Test send 2 is preparing to fire:");
IStreamTracker tracker = sender.Send(IMessage<byte[]>.Create(payload));
tracker.AwaitSettlement(TimeSpan.FromSeconds(10));
}
catch (Exception e)
{
logger.LogInformation("Test send 2 failed with error: {0}", e.Message);
sendFailed = true;
}
});
peer.WaitForScriptToComplete();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
Assert.IsFalse(sendFailed);
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestConcurrentMessageSendsBlocksBehindSendWaitingForCredit()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-queue");
byte[] payload = new byte[1024];
Array.Fill(payload, (byte)1);
CountdownEvent send1Started = new CountdownEvent(1);
CountdownEvent send2Completed = new CountdownEvent(1);
bool sendFailed = false;
Task.Run(() =>
{
try
{
logger.LogInformation("Test send 1 is preparing to fire:");
Task.Run(() => send1Started.Signal());
sender.Send(IMessage<byte[]>.Create(payload));
}
catch (Exception e)
{
logger.LogInformation("Test send 1 failed with error: {0}", e.Message);
sendFailed = true;
}
});
Task.Run(() =>
{
try
{
Assert.IsTrue(send1Started.Wait(TimeSpan.FromSeconds(10)));
logger.LogInformation("Test send 2 is preparing to fire:");
IStreamTracker tracker = sender.Send(IMessage<byte[]>.Create(payload));
tracker.AwaitSettlement(TimeSpan.FromSeconds(10));
send2Completed.Signal();
}
catch (Exception e)
{
logger.LogInformation("Test send 2 failed with error: {0}", e.Message);
sendFailed = true;
}
});
peer.WaitForScriptToComplete();
peer.RemoteFlow().WithIncomingWindow(1).WithDeliveryCount(0).WithNextIncomingId(1).WithLinkCredit(1).Now();
peer.ExpectTransfer().WithNonNullPayload().WithMore(false).Respond().WithSettled(true).WithState().Accepted();
peer.RemoteFlow().WithIncomingWindow(1).WithDeliveryCount(1).WithNextIncomingId(2).WithLinkCredit(1).Queue();
peer.ExpectTransfer().WithNonNullPayload().WithMore(false).Respond().WithSettled(true).WithState().Accepted();
Assert.IsTrue(send2Completed.Wait(TimeSpan.FromSeconds(10)));
peer.WaitForScriptToComplete();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
Assert.IsFalse(sendFailed);
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestConcurrentMessageSendWaitingOnSplitFramedSendToCompleteIsSentAfterCreditUpdated()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
ConnectionOptions options = new ConnectionOptions()
{
MaxFrameSize = 1024
};
IConnection connection = container.Connect(remoteAddress, remotePort, options);
IStreamSender sender = connection.OpenStreamSender("test-queue");
byte[] payload = new byte[1536];
Array.Fill(payload, (byte)1);
CountdownEvent send1Started = new CountdownEvent(1);
CountdownEvent send2Completed = new CountdownEvent(1);
bool sendFailed = false;
Task.Run(() =>
{
try
{
logger.LogInformation("Test send 1 is preparing to fire:");
Task.Run(() => send1Started.Signal());
sender.Send(IMessage<byte[]>.Create(payload));
}
catch (Exception e)
{
logger.LogInformation("Test send 1 failed with error: {0}", e.Message);
sendFailed = true;
}
});
Task.Run(() =>
{
try
{
Assert.IsTrue(send1Started.Wait(TimeSpan.FromSeconds(10)));
logger.LogInformation("Test send 2 is preparing to fire:");
IStreamTracker tracker = sender.Send(IMessage<byte[]>.Create(payload));
tracker.AwaitSettlement(TimeSpan.FromSeconds(10));
send2Completed.Signal();
}
catch (Exception e)
{
logger.LogInformation("Test send 2 failed with error: {0}", e.Message);
sendFailed = true;
}
});
peer.WaitForScriptToComplete(TimeSpan.FromSeconds(15));
peer.RemoteFlow().WithIncomingWindow(1).WithDeliveryCount(0).WithNextIncomingId(1).WithLinkCredit(1).Now();
peer.ExpectTransfer().WithNonNullPayload().WithMore(true);
peer.RemoteFlow().WithIncomingWindow(1).WithDeliveryCount(0).WithNextIncomingId(2).WithLinkCredit(1).Queue();
peer.ExpectTransfer().WithNonNullPayload().WithMore(false).Respond().WithSettled(true).WithState().Accepted();
peer.RemoteFlow().WithIncomingWindow(1).WithDeliveryCount(1).WithNextIncomingId(3).WithLinkCredit(1).Queue();
peer.ExpectTransfer().WithNonNullPayload().WithMore(true);
peer.RemoteFlow().WithIncomingWindow(1).WithDeliveryCount(1).WithNextIncomingId(4).WithLinkCredit(1).Queue();
peer.ExpectTransfer().WithNonNullPayload().WithMore(false).Respond().WithSettled(true).WithState().Accepted();
Assert.IsTrue(send2Completed.Wait(TimeSpan.FromSeconds(10)));
peer.WaitForScriptToComplete(TimeSpan.FromSeconds(15));
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
Assert.IsFalse(sendFailed);
sender.Close();
connection.Close();
peer.WaitForScriptToComplete(TimeSpan.FromSeconds(15));
}
}
[Test]
public void TestMessageSendWhileStreamSendIsOpenShouldBlock()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(1).Queue();
peer.Start();
byte[] payload = new byte[1536];
Array.Fill(payload, (byte)1);
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-queue");
IStreamSenderMessage message = sender.BeginMessage();
OutputStreamOptions options = new OutputStreamOptions()
{
BodyLength = 8192,
CompleteSendOnClose = false
};
Stream stream = message.GetBodyStream(options);
CountdownEvent sendStarted = new CountdownEvent(1);
CountdownEvent sendCompleted = new CountdownEvent(1);
bool sendFailed = false;
Task.Run(() =>
{
try
{
logger.LogInformation("Test send 1 is preparing to fire:");
sendStarted.Signal();
sender.Send(IMessage<byte[]>.Create(payload));
sendCompleted.Signal();
}
catch (Exception e)
{
logger.LogInformation("Test send 1 failed with error: {0}", e.Message);
sendFailed = true;
}
});
DataMatcher bodyMatcher = new DataMatcher(payload);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.MessageContentMatcher = bodyMatcher;
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithPayload(payloadMatcher).Accept();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
Assert.IsTrue(sendStarted.Wait(TimeSpan.FromSeconds(10)));
// This should abort the streamed send as we provided a size for the body.
stream.Close();
Assert.IsTrue(message.Aborted);
Assert.IsTrue(sendCompleted.Wait(TimeSpan.FromSeconds(10)));
Assert.Throws<ClientIllegalStateException>(() => message.RawOutputStream());
Assert.Throws<ClientIllegalStateException>(() => _ = message.Body);
Assert.IsFalse(sendFailed);
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestStreamSenderSessionCannotCreateNewResources()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-queue");
Assert.Throws<ClientUnsupportedOperationException>(() => sender.Session.OpenReceiver("test"));
Assert.Throws<ClientUnsupportedOperationException>(() => sender.Session.OpenReceiver("test", new ReceiverOptions()));
Assert.Throws<ClientUnsupportedOperationException>(() => sender.Session.OpenDurableReceiver("test", "test"));
Assert.Throws<ClientUnsupportedOperationException>(() => sender.Session.OpenDurableReceiver("test", "test", new ReceiverOptions()));
Assert.Throws<ClientUnsupportedOperationException>(() => sender.Session.OpenDynamicReceiver());
Assert.Throws<ClientUnsupportedOperationException>(() => sender.Session.OpenDynamicReceiver(new ReceiverOptions()));
Assert.Throws<ClientUnsupportedOperationException>(() => sender.Session.OpenDynamicReceiver(new ReceiverOptions(), new Dictionary<string, object>()));
Assert.Throws<ClientUnsupportedOperationException>(() => sender.Session.OpenSender("test"));
Assert.Throws<ClientUnsupportedOperationException>(() => sender.Session.OpenSender("test", new SenderOptions()));
Assert.Throws<ClientUnsupportedOperationException>(() => sender.Session.OpenAnonymousSender());
Assert.Throws<ClientUnsupportedOperationException>(() => sender.Session.OpenAnonymousSender(new SenderOptions()));
peer.WaitForScriptToComplete();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestStreamMessageWaitingOnCreditWritesWhileCompleteSendWaitsInQueue()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-queue");
IStreamSenderMessage tracker = sender.BeginMessage();
Stream stream = tracker.Body;
byte[] payload1 = new byte[256];
Array.Fill(payload1, (byte)1);
byte[] payload2 = new byte[256];
Array.Fill(payload2, (byte)2);
byte[] payload3 = new byte[256];
Array.Fill(payload3, (byte)3);
DataMatcher dataMatcher1 = new DataMatcher(payload1);
TransferPayloadCompositeMatcher payloadMatcher1 = new TransferPayloadCompositeMatcher();
payloadMatcher1.MessageContentMatcher = dataMatcher1;
DataMatcher dataMatcher2 = new DataMatcher(payload2);
TransferPayloadCompositeMatcher payloadMatcher2 = new TransferPayloadCompositeMatcher();
payloadMatcher2.MessageContentMatcher = dataMatcher2;
DataMatcher dataMatcher3 = new DataMatcher(payload3);
TransferPayloadCompositeMatcher payloadMatcher3 = new TransferPayloadCompositeMatcher();
payloadMatcher3.MessageContentMatcher = dataMatcher3;
bool sendFailed = false;
CountdownEvent streamSend1Complete = new CountdownEvent(1);
// Stream won't output until some body bytes are written.
Task.Run(() =>
{
try
{
stream.Write(payload1);
stream.Flush();
}
catch (IOException e)
{
logger.LogInformation("send failed with error: {0}", e.Message);
sendFailed = true;
}
finally
{
streamSend1Complete.Signal();
}
});
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithPayload(payloadMatcher1).WithMore(true);
// Now trigger the next send by granting credit for payload 1
peer.RemoteFlow().WithIncomingWindow(1).WithNextIncomingId(1).WithLinkCredit(10).Now();
Assert.IsTrue(streamSend1Complete.Wait(TimeSpan.FromSeconds(5)), "Stream sender completed first send");
Assert.IsFalse(sendFailed);
CountdownEvent sendStarted = new CountdownEvent(1);
CountdownEvent sendCompleted = new CountdownEvent(1);
Task.Run(() =>
{
try
{
logger.LogInformation("Test send 1 is preparing to fire:");
sendStarted.Signal();
sender.Send(IMessage<byte[]>.Create(payload3));
}
catch (Exception e)
{
logger.LogInformation("Test send 1 failed with error: {0}", e.Message);
sendFailed = true;
}
finally
{
sendCompleted.Signal();
}
});
Assert.IsTrue(sendStarted.Wait(TimeSpan.FromSeconds(10)));
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithPayload(payloadMatcher2).WithMore(true);
// Queue a flow that will allow send by granting credit for payload 3 via sender.send
peer.RemoteFlow().WithIncomingWindow(1).WithNextIncomingId(3).WithLinkCredit(10).Queue();
// Now trigger the next send by granting credit for payload 2
peer.RemoteFlow().WithIncomingWindow(1).WithNextIncomingId(2).WithLinkCredit(10).Now();
stream.Write(payload2);
stream.Flush();
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithNullPayload().WithMore(false).Accept();
peer.RemoteFlow().WithIncomingWindow(1).WithNextIncomingId(4).WithLinkCredit(10).Queue();
peer.ExpectTransfer().WithPayload(payloadMatcher3).WithMore(false);
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
stream.Close();
Assert.IsTrue(sendCompleted.Wait(TimeSpan.FromSeconds(10)));
Assert.IsFalse(sendFailed);
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestWriteToCreditLimitFramesOfMessagePayloadOneBytePerWrite()
{
uint WRITE_COUNT = 10;
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithIncomingWindow(WRITE_COUNT).WithNextIncomingId(0).WithLinkCredit(WRITE_COUNT).Queue();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-queue");
IStreamSenderMessage tracker = sender.BeginMessage();
Stream stream = tracker.Body;
peer.WaitForScriptToComplete();
byte[][] payloads = new byte[WRITE_COUNT][];
for (int i = 0; i < WRITE_COUNT; ++i)
{
payloads[i] = new byte[256];
Array.Fill(payloads[i], (byte)(i + 1));
}
for (int i = 0; i < WRITE_COUNT; ++i)
{
DataMatcher dataMatcher = new DataMatcher(payloads[i]);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.MessageContentMatcher = dataMatcher;
peer.ExpectTransfer().WithPayload(payloadMatcher).WithMore(true);
}
for (int i = 0; i < WRITE_COUNT; ++i)
{
foreach (byte value in payloads[i])
{
stream.WriteByte(value);
}
stream.Flush();
}
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithNullPayload().WithMore(false).Accept();
// grant one more credit for the complete to arrive.
peer.RemoteFlow().WithIncomingWindow(1).WithNextIncomingId(WRITE_COUNT).WithLinkCredit(1).Later(10);
stream.Close();
peer.WaitForScriptToComplete();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestWriteToCreditLimitFramesOfMessagePayload()
{
uint WRITE_COUNT = 10;
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithIncomingWindow(WRITE_COUNT).WithNextIncomingId(0).WithLinkCredit(WRITE_COUNT).Queue();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
IStreamSender sender = connection.OpenStreamSender("test-queue");
IStreamSenderMessage tracker = sender.BeginMessage();
Stream stream = tracker.Body;
peer.WaitForScriptToComplete();
byte[][] payloads = new byte[WRITE_COUNT][];
for (int i = 0; i < WRITE_COUNT; ++i)
{
payloads[i] = new byte[256];
Array.Fill(payloads[i], (byte)(i + 1));
}
for (int i = 0; i < WRITE_COUNT; ++i)
{
DataMatcher dataMatcher = new DataMatcher(payloads[i]);
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.MessageContentMatcher = dataMatcher;
peer.ExpectTransfer().WithPayload(payloadMatcher).WithMore(true);
}
for (int i = 0; i < WRITE_COUNT; ++i)
{
stream.Write(payloads[i]);
stream.Flush();
}
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithNullPayload().WithMore(false).Accept();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
// grant one more credit for the complete to arrive.
peer.RemoteFlow().WithIncomingWindow(1).WithNextIncomingId(WRITE_COUNT).WithLinkCredit(1).Now();
stream.Close();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestStreamMessageFlushFailsAfterConnectionDropped()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(1).Queue();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
CountdownEvent disconnected = new CountdownEvent(1);
IClient container = IClient.Create();
ConnectionOptions options = new ConnectionOptions();
options.DisconnectedHandler = (connection, eventArgs) =>
{
disconnected.Signal();
};
IConnection connection = container.Connect(remoteAddress, remotePort, options);
IStreamSender sender = connection.OpenStreamSender("test-queue");
IStreamSenderMessage message = sender.BeginMessage();
Stream stream = message.Body;
DataMatcher dataMatcher1 = new DataMatcher(new byte[] { 0, 1, 2, 3 });
TransferPayloadCompositeMatcher payloadMatcher1 = new TransferPayloadCompositeMatcher();
payloadMatcher1.MessageContentMatcher = dataMatcher1;
DataMatcher dataMatcher2 = new DataMatcher(new byte[] { 4, 5, 6, 7 });
TransferPayloadCompositeMatcher payloadMatcher2 = new TransferPayloadCompositeMatcher();
payloadMatcher2.MessageContentMatcher = dataMatcher2;
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithPayload(payloadMatcher1).WithMore(true);
peer.ExpectTransfer().WithPayload(payloadMatcher2).WithMore(true);
peer.DropAfterLastHandler();
// Write two then after connection drops the message should fail on future writes
stream.Write(new byte[] { 0, 1, 2, 3 });
stream.Flush();
stream.Write(new byte[] { 4, 5, 6, 7 });
stream.Flush();
peer.WaitForScriptToComplete();
// Next write should fail as connection should have dropped.
stream.Write(new byte[] { 8, 9, 10, 11 });
logger.LogInformation("Waiting for connection to report it was disconnected");
Assert.IsTrue(disconnected.Wait(TimeSpan.FromSeconds(5)));
try
{
stream.Flush();
Assert.Fail("Should not be able to flush after connection drop");
}
catch (IOException ioe)
{
Assert.IsTrue(ioe.InnerException is ClientException);
}
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestStreamMessageCloseThatFlushesFailsAfterConnectionDropped()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(1).Queue();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
CountdownEvent disconnected = new CountdownEvent(1);
IClient container = IClient.Create();
ConnectionOptions options = new ConnectionOptions();
options.DisconnectedHandler = (conn, ev) => disconnected.Signal();
IConnection connection = container.Connect(remoteAddress, remotePort, options);
IStreamSender sender = connection.OpenStreamSender("test-queue");
IStreamSenderMessage message = sender.BeginMessage();
Stream stream = message.Body;
DataMatcher dataMatcher1 = new DataMatcher(new byte[] { 0, 1, 2, 3 });
TransferPayloadCompositeMatcher payloadMatcher1 = new TransferPayloadCompositeMatcher();
payloadMatcher1.MessageContentMatcher = dataMatcher1;
DataMatcher dataMatcher2 = new DataMatcher(new byte[] { 4, 5, 6, 7 });
TransferPayloadCompositeMatcher payloadMatcher2 = new TransferPayloadCompositeMatcher();
payloadMatcher2.MessageContentMatcher = dataMatcher2;
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithPayload(payloadMatcher1).WithMore(true);
peer.ExpectTransfer().WithPayload(payloadMatcher2).WithMore(true);
peer.DropAfterLastHandler();
// Write two then after connection drops the message should fail on future writes
stream.Write(new byte[] { 0, 1, 2, 3 });
stream.Flush();
stream.Write(new byte[] { 4, 5, 6, 7 });
stream.Flush();
peer.WaitForScriptToComplete();
// Next write should fail as connection should have dropped.
stream.Write(new byte[] { 8, 9, 10, 11 });
Assert.IsTrue(disconnected.Wait(TimeSpan.FromSeconds(10)));
try
{
stream.Close();
Assert.Fail("Should not be able to close after connection drop");
}
catch (IOException ioe)
{
Assert.IsTrue(ioe.InnerException is ClientException);
}
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestStreamMessageWriteThatFlushesFailsAfterConnectionDropped()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(1).Queue();
peer.DropAfterLastHandler();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
CountdownEvent disconnected = new CountdownEvent(1);
IClient container = IClient.Create();
ConnectionOptions connectionOptions = new ConnectionOptions();
connectionOptions.DisconnectedHandler = (connection, eventArgs) => disconnected.Signal();
IConnection connection = container.Connect(remoteAddress, remotePort, connectionOptions);
StreamSenderOptions options = new StreamSenderOptions()
{
WriteBufferSize = 1024
};
IStreamSender sender = connection.OpenStreamSender("test-queue", options);
IStreamSenderMessage message = sender.BeginMessage();
byte[] payload = new byte[65535];
Array.Fill(payload, (byte)65);
OutputStreamOptions streamOptions = new OutputStreamOptions()
{
BodyLength = 65535
};
Stream stream = message.GetBodyStream(streamOptions);
peer.WaitForScriptToComplete();
Assert.IsTrue(disconnected.Wait(TimeSpan.FromSeconds(10)));
try
{
stream.Write(payload);
Assert.Fail("Should not be able to write section after connection drop");
}
catch (IOException ioe)
{
Assert.IsTrue(ioe.InnerException is ClientException);
}
connection.CloseAsync().GetAwaiter().GetResult();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestStreamMessageSendFromByteArrayInputStreamWithoutBodySizeSet()
{
DoTestStreamMessageSendFromByteArrayInputStream(false);
}
[Test]
public void TestStreamMessageSendFromByteArrayInputStreamWithBodySizeSet()
{
DoTestStreamMessageSendFromByteArrayInputStream(false);
}
private void DoTestStreamMessageSendFromByteArrayInputStream(bool setBodySize)
{
Random random = new Random(Environment.TickCount);
byte[] array = new byte[4096];
MemoryStream bytesIn = new MemoryStream(array);
// Populate the array with something other than zeros.
random.NextBytes(array);
CompositingDataSectionMatcher matcher = new CompositingDataSectionMatcher(array);
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(100).Queue();
for (int i = 0; i < (array.Length / 1023); ++i)
{
peer.ExpectTransfer().WithDeliveryId(0)
.WithMore(true)
.WithPayload(matcher);
}
// A small number of trailing bytes will be transmitted in the frame.
peer.ExpectTransfer().WithDeliveryId(0)
.WithMore(false)
.WithPayload(matcher);
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
StreamSenderOptions options = new StreamSenderOptions()
{
WriteBufferSize = 1023
};
IStreamSender sender = connection.OpenStreamSender("test-queue", options);
IStreamSenderMessage tracker = sender.BeginMessage();
Stream stream;
if (setBodySize)
{
stream = tracker.GetBodyStream(new OutputStreamOptions()
{
BodyLength = array.Length
});
}
else
{
stream = tracker.Body;
}
try
{
bytesIn.WriteTo(stream);
}
finally
{
// Ensure any trailing bytes get written and transfer marked as done.
stream.Close();
}
peer.WaitForScriptToComplete();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestBatchAddBodySectionsWritesEach()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectBegin().Respond(); // Hidden session for stream sender
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(10).Queue();
peer.ExpectAttach().Respond(); // Open a receiver to ensure sender link has processed
peer.ExpectFlow(); // the inbound flow frame we sent previously before send.
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
ISession session = connection.OpenSession().OpenTask.Result;
StreamSenderOptions options = new StreamSenderOptions();
options.DeliveryMode = DeliveryMode.AtMostOnce;
options.WriteBufferSize = int.MaxValue;
IStreamSender sender = connection.OpenStreamSender("test-qos", options);
// Create a custom message format send context and ensure that no early buffer writes take place
IStreamSenderMessage message = sender.BeginMessage();
Assert.AreEqual(Header.DEFAULT_PRIORITY, message.Priority);
Assert.AreEqual(Header.DEFAULT_DELIVERY_COUNT, message.DeliveryCount);
Assert.AreEqual(Header.DEFAULT_FIRST_ACQUIRER, message.FirstAcquirer);
Assert.AreEqual(Header.DEFAULT_TIME_TO_LIVE, message.TimeToLive);
Assert.AreEqual(Header.DEFAULT_DURABILITY, message.Durable);
// Gates send on remote flow having been sent and received
session.OpenReceiver("dummy").OpenTask.Wait();
HeaderMatcher headerMatcher = new HeaderMatcher(true);
headerMatcher.WithDurable(true);
headerMatcher.WithPriority((byte)1);
headerMatcher.WithTtl(65535);
headerMatcher.WithFirstAcquirer(true);
headerMatcher.WithDeliveryCount(2);
DataMatcher data1Matcher = new DataMatcher(new byte[] { 0, 1, 2, 3 }, true);
DataMatcher data2Matcher = new DataMatcher(new byte[] { 4, 5, 6, 7 }, true);
DataMatcher data3Matcher = new DataMatcher(new byte[] { 8, 9, 0, 1 });
TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
payloadMatcher.HeaderMatcher = headerMatcher;
payloadMatcher.AddMessageContentMatcher(data1Matcher);
payloadMatcher.AddMessageContentMatcher(data2Matcher);
payloadMatcher.AddMessageContentMatcher(data3Matcher);
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithMore(false).WithPayload(payloadMatcher).Accept();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
// Populate all Header values
Header header = new Header();
header.Durable = true;
header.Priority = (byte)1;
header.TimeToLive = 65535;
header.FirstAcquirer = true;
header.DeliveryCount = 2;
IList<ISection> sections = new List<ISection>();
sections.Add(new Data(new byte[] { 0, 1, 2, 3 }));
sections.Add(new Data(new byte[] { 4, 5, 6, 7 }));
sections.Add(new Data(new byte[] { 8, 9, 0, 1 }));
message.Header = header;
message.SetBodySections(sections);
message.Complete();
Assert.IsNotNull(message.Tracker);
Assert.IsTrue(message.Tracker.SettlementTask.IsCompleted);
Assert.IsTrue(message.Tracker.SettlementTask.Result.Settled);
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestSendAndApplyDisposition()
{
DoTestSendAndApplyDisposition(false);
}
[Test]
public void TestSendAndApplyDispositionAsync()
{
DoTestSendAndApplyDisposition(true);
}
private void DoTestSendAndApplyDisposition(bool dispositionAsync)
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectBegin().Respond(); // Hidden session for stream sender
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithDeliveryCount(0)
.WithLinkCredit(10)
.WithIncomingWindow(1024)
.WithOutgoingWindow(10)
.WithNextIncomingId(0)
.WithNextOutgoingId(1).Queue();
peer.ExpectAttach().OfReceiver().Respond();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort);
ISession session = connection.OpenSession();
IStreamSender sender = (IStreamSender)connection.OpenStreamSender("test-queue").OpenTask.Result;
// This ensures that the flow to sender is processed before we try-send
IReceiver receiver = session.OpenReceiver("test-queue", new ReceiverOptions()
{
CreditWindow = 0
}
).OpenTask.Result;
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithNonNullPayload();
peer.ExpectDisposition().WithSettled(true).WithState().Accepted();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
IMessage<string> message = IMessage<string>.Create("Hello World");
IStreamTracker tracker = sender.Send(message);
if (dispositionAsync)
{
Assert.DoesNotThrowAsync(async () => await tracker.DispositionAsync(IDeliveryState.Accepted(), true));
}
else
{
tracker.Disposition(IDeliveryState.Accepted(), true);
}
Assert.IsNotNull(tracker);
sender.Close();
connection.Close();
peer.WaitForScriptToComplete();
}
}
private static IDeliveryTagGenerator CustomTagGenerator()
{
return new CustomDeliveryTagGenerator();
}
private static IDeliveryTagGenerator CustomNullTagGenerator()
{
return null;
}
private class CustomDeliveryTagGenerator : IDeliveryTagGenerator
{
private int count = 1;
IDeliveryTag IDeliveryTagGenerator.NextTag()
{
switch (count++)
{
case 1:
return new DeliveryTag(new byte[] { 1, 1, 1 });
case 2:
return new DeliveryTag(new byte[] { 2, 2, 2 });
case 3:
return new DeliveryTag(new byte[] { 3, 3, 3 });
default:
throw new InvalidOperationException("Only supports creating three tags");
}
}
}
[Test]
public void TestSenderUsesCustomDeliveryTagGeneratorConfiguration()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.RemoteFlow().WithLinkCredit(10).Queue();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
StreamSenderOptions options = new StreamSenderOptions()
{
DeliveryMode = DeliveryMode.AtLeastOnce,
AutoSettle = true,
DeliveryTagGeneratorSupplier = CustomTagGenerator
};
IStreamSender sender = connection.OpenStreamSender("test-tags", options).OpenTask.Result;
peer.WaitForScriptToComplete();
peer.ExpectTransfer().WithNonNullPayload()
.WithDeliveryTag(new byte[] { 1, 1, 1 }).Respond().WithSettled(true).WithState().Accepted();
peer.ExpectTransfer().WithNonNullPayload()
.WithDeliveryTag(new byte[] { 2, 2, 2 }).Respond().WithSettled(true).WithState().Accepted();
peer.ExpectTransfer().WithNonNullPayload()
.WithDeliveryTag(new byte[] { 3, 3, 3 }).Respond().WithSettled(true).WithState().Accepted();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
IMessage<string> message = IMessage<string>.Create("Hello World");
IStreamTracker tracker1 = sender.Send(message);
IStreamTracker tracker2 = sender.Send(message);
IStreamTracker tracker3 = sender.Send(message);
Assert.IsNotNull(tracker1);
Assert.IsNotNull(tracker1.SettlementTask.Result);
Assert.IsNotNull(tracker2);
Assert.IsNotNull(tracker2.SettlementTask.Result);
Assert.IsNotNull(tracker3);
Assert.IsNotNull(tracker3.SettlementTask.Result);
sender.CloseAsync().Wait(TimeSpan.FromSeconds(10));
connection.CloseAsync().Wait(TimeSpan.FromSeconds(10));
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestCannotCreateSenderWhenTagGeneratorReturnsNull()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectClose().Respond();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
IConnection connection = container.Connect(remoteAddress, remotePort).OpenTask.Result;
StreamSenderOptions options = new StreamSenderOptions()
{
DeliveryMode = DeliveryMode.AtLeastOnce,
AutoSettle = true,
DeliveryTagGeneratorSupplier = CustomNullTagGenerator
};
try
{
_ = connection.OpenStreamSender("test-tags", options).OpenTask.Result;
Assert.Fail("Should not create a sender if the tag generator is not supplied");
}
catch (ClientException)
{
// Expected
}
connection.CloseAsync().Wait(TimeSpan.FromSeconds(10));
peer.WaitForScriptToComplete();
}
}
[Test]
public void TestSendTimesOutWhenNoCreditIssuedAndThenIssueCredit()
{
using (ProtonTestServer peer = new ProtonTestServer(loggerFactory))
{
peer.ExpectSASLAnonymousConnect();
peer.ExpectOpen().Respond();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.Start();
string remoteAddress = peer.ServerAddress;
int remotePort = peer.ServerPort;
logger.LogInformation("Test started, peer listening on: {0}:{1}", remoteAddress, remotePort);
IClient container = IClient.Create();
ConnectionOptions options = new ConnectionOptions()
{
SendTimeout = 10
};
IConnection connection = container.Connect(remoteAddress, remotePort, options);
IStreamSender sender = connection.OpenStreamSender("test-queue").OpenTask.Result;
IMessage<string> message = IMessage<string>.Create("Hello World");
try
{
sender.Send(message);
Assert.Fail("Should throw a send timed out exception");
}
catch (ClientSendTimedOutException)
{
// Expected error, ignore
}
peer.WaitForScriptToComplete();
peer.RemoteFlow().WithLinkCredit(1).Now();
peer.ExpectBegin().Respond();
peer.ExpectAttach().OfSender().Respond();
peer.ExpectTransfer().WithNonNullPayload();
peer.ExpectDetach().Respond();
peer.ExpectEnd().Respond();
peer.ExpectClose().Respond();
// Ensure the send happens after the remote has sent a flow with credit
_ = connection.OpenSender("test-queue-2").OpenTask.Result;
try
{
sender.Send(IMessage<string>.Create("Hello World 2"));
}
catch (ClientException ex)
{
logger.LogTrace("Error on second send: {0}", ex);
Assert.Fail("Should not throw an exception");
}
sender.CloseAsync().Wait(TimeSpan.FromSeconds(10));
connection.CloseAsync().Wait(TimeSpan.FromSeconds(10));
peer.WaitForScriptToComplete();
}
}
}
}