| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using Amqp.Framing; |
| using Amqp.Types; |
| using Apache.NMS; |
| using Apache.NMS.AMQP.Util; |
| using NMS.AMQP.Test.TestAmqp; |
| using NUnit.Framework; |
| |
| namespace NMS.AMQP.Test.Integration |
| { |
| // Adapted from ProducerIntegrationTest to use NMSContext |
| [TestFixture] |
| public class NMSProducerIntegrationTest : IntegrationTestFixture |
| { |
| private const long TICKS_PER_MILLISECOND = 10000; |
| |
| [Test, Timeout(20_000)] |
| public void TestCloseSender() |
| { |
| using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| { |
| var context = base.EstablishNMSContext(testPeer); |
| testPeer.ExpectBegin(); |
| testPeer.ExpectSenderAttach(); |
| |
| IQueue queue = context.GetQueue("myQueue"); |
| var producer = context.CreateProducer(); |
| |
| testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true); |
| testPeer.ExpectEnd(); |
| testPeer.ExpectClose(); |
| |
| producer.Close(); |
| context.Close(); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| } |
| } |
| |
| [Test, Timeout(20_000)] |
| public void TestSentTextMessageCanBeModified() |
| { |
| using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| { |
| var context = base.EstablishNMSContext(testPeer); |
| testPeer.ExpectBegin(); |
| testPeer.ExpectSenderAttach(); |
| |
| IQueue queue = context.GetQueue("myQueue"); |
| var producer = context.CreateProducer(); |
| |
| // Create and transfer a new message |
| String text = "myMessage"; |
| testPeer.ExpectTransfer(x => Assert.AreEqual(text, (x.BodySection as AmqpValue).Value)); |
| testPeer.ExpectEnd(); |
| testPeer.ExpectClose(); |
| |
| ITextMessage message = context.CreateTextMessage(text); |
| producer.Send(queue, message); |
| |
| Assert.AreEqual(text, message.Text); |
| message.Text = text + text; |
| Assert.AreEqual(text + text, message.Text); |
| |
| context.Close(); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| } |
| } |
| |
| [Test, Timeout(20_000)] |
| public void TestDefaultDeliveryModeProducesDurableMessages() |
| { |
| using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| { |
| var context = base.EstablishNMSContext(testPeer); |
| testPeer.ExpectBegin(); |
| testPeer.ExpectSenderAttach(); |
| |
| IQueue queue = context.GetQueue("myQueue"); |
| var producer = context.CreateProducer(); |
| |
| // Create and transfer a new message |
| testPeer.ExpectTransfer(message => Assert.IsTrue(message.Header.Durable)); |
| testPeer.ExpectEnd(); |
| testPeer.ExpectClose(); |
| |
| ITextMessage textMessage = context.CreateTextMessage(); |
| |
| producer.Send(queue, textMessage); |
| Assert.AreEqual(MsgDeliveryMode.Persistent, textMessage.NMSDeliveryMode); |
| |
| context.Close(); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| } |
| } |
| |
| [Test, Timeout(20_000)] |
| public void TestProducerOverridesMessageDeliveryMode() |
| { |
| using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| { |
| var context = base.EstablishNMSContext(testPeer); |
| testPeer.ExpectBegin(); |
| testPeer.ExpectSenderAttach(); |
| |
| IQueue queue = context.GetQueue("myQueue"); |
| var producer = context.CreateProducer(); |
| |
| // Create and transfer a new message, explicitly setting the deliveryMode on the |
| // message (which applications shouldn't) to NON_PERSISTENT and sending it to check |
| // that the producer ignores this value and sends the message as PERSISTENT(/durable) |
| testPeer.ExpectTransfer(message => Assert.IsTrue(message.Header.Durable)); |
| testPeer.ExpectEnd(); |
| testPeer.ExpectClose(); |
| |
| ITextMessage textMessage = context.CreateTextMessage(); |
| textMessage.NMSDeliveryMode = MsgDeliveryMode.NonPersistent; |
| Assert.AreEqual(MsgDeliveryMode.NonPersistent, textMessage.NMSDeliveryMode); |
| |
| producer.Send(queue, textMessage); |
| |
| Assert.AreEqual(MsgDeliveryMode.Persistent, textMessage.NMSDeliveryMode); |
| |
| context.Close(); |
| testPeer.WaitForAllMatchersToComplete(1000); |
| } |
| } |
| |
| [Test, Timeout(20_000)] |
| public void TestSendingMessageNonPersistentProducerSetDurableFalse() |
| { |
| DoSendingMessageNonPersistentTestImpl(true); |
| } |
| |
| [Test, Timeout(20_000)] |
| public void TestSendingMessageNonPersistentProducerOmitsHeader() |
| { |
| DoSendingMessageNonPersistentTestImpl(false); |
| } |
| |
| private void DoSendingMessageNonPersistentTestImpl(bool setPriority) |
| { |
| using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| { |
| //Add capability to indicate support for ANONYMOUS-RELAY |
| Symbol[] serverCapabilities = {SymbolUtil.OPEN_CAPABILITY_ANONYMOUS_RELAY}; |
| var context = EstablishNMSContext(testPeer, serverCapabilities: serverCapabilities); |
| testPeer.ExpectBegin(); |
| |
| string queueName = "myQueue"; |
| Action<object> targetMatcher = t => |
| { |
| var target = t as Target; |
| Assert.IsNotNull(target); |
| }; |
| |
| |
| testPeer.ExpectSenderAttach(targetMatcher: targetMatcher, sourceMatcher: Assert.NotNull, senderSettled: false); |
| |
| IQueue queue = context.GetQueue(queueName); |
| INMSProducer producer = context.CreateProducer(); |
| |
| byte priority = 5; |
| String text = "myMessage"; |
| testPeer.ExpectTransfer(messageMatcher: message => |
| { |
| if (setPriority) |
| { |
| Assert.IsFalse(message.Header.Durable); |
| Assert.AreEqual(priority, message.Header.Priority); |
| } |
| |
| Assert.AreEqual(text, (message.BodySection as AmqpValue).Value); |
| }, stateMatcher: Assert.IsNull, |
| settled: false, |
| sendResponseDisposition: true, |
| responseState: new Accepted(), |
| responseSettled: true); |
| |
| ITextMessage textMessage = context.CreateTextMessage(text); |
| |
| producer.DeliveryMode = MsgDeliveryMode.NonPersistent; |
| if (setPriority) |
| producer.Priority = (MsgPriority) priority; |
| |
| producer.Send(queue, textMessage); |
| |
| Assert.AreEqual(MsgDeliveryMode.NonPersistent, textMessage.NMSDeliveryMode, "Should have NonPersistent delivery mode set"); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| |
| testPeer.ExpectEnd(); |
| testPeer.ExpectClose(); |
| context.Close(); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| } |
| } |
| |
| [Test, Timeout(20_000)] |
| public void TestSendingMessageSetsNMSDestination() |
| { |
| using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| { |
| var context = EstablishNMSContext(testPeer); |
| testPeer.ExpectBegin(); |
| testPeer.ExpectSenderAttach(); |
| |
| IQueue destination = context.GetQueue("myQueue"); |
| var producer = context.CreateProducer(); |
| |
| string text = "myMessage"; |
| ITextMessage message = context.CreateTextMessage(text); |
| |
| testPeer.ExpectTransfer(m => Assert.AreEqual(text, (m.BodySection as AmqpValue).Value)); |
| testPeer.ExpectEnd(); |
| testPeer.ExpectClose(); |
| |
| Assert.IsNull(message.NMSDestination, "Should not yet have a NMSDestination"); |
| |
| producer.Send(destination, message); |
| |
| Assert.AreEqual(destination, message.NMSDestination, "Should have had NMSDestination set"); |
| |
| context.Close(); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| } |
| } |
| |
| [Test, Timeout(20_000)] |
| public void TestSendingMessageSetsNMSTimestamp() |
| { |
| using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| { |
| var context = EstablishNMSContext(testPeer); |
| testPeer.ExpectBegin(); |
| testPeer.ExpectSenderAttach(); |
| |
| IQueue destination = context.GetQueue("myQueue"); |
| var producer = context.CreateProducer(); |
| |
| // Create matcher to expect the absolute-expiry-time field of the properties section to |
| // be set to a value greater than 'now'+ttl, within a delta. |
| |
| DateTime creationLower = DateTime.UtcNow; |
| DateTime creationUpper = creationLower + TimeSpan.FromMilliseconds(3000); |
| |
| var text = "myMessage"; |
| testPeer.ExpectTransfer(m => |
| { |
| Assert.IsTrue(m.Header.Durable); |
| Assert.That(m.Properties.CreationTime.Ticks, Is.GreaterThanOrEqualTo(creationLower.Ticks).Within(TICKS_PER_MILLISECOND)); |
| Assert.That(m.Properties.CreationTime.Ticks, Is.LessThanOrEqualTo(creationUpper.Ticks).Within(TICKS_PER_MILLISECOND)); |
| Assert.AreEqual(text, (m.BodySection as AmqpValue).Value); |
| }); |
| |
| ITextMessage message = context.CreateTextMessage(text); |
| producer.Send(destination, message); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| |
| testPeer.ExpectEnd(); |
| testPeer.ExpectClose(); |
| context.Close(); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| } |
| } |
| |
| [Test, Timeout(20_000)] |
| public void TestSendingMessageSetsNMSExpirationRelatedAbsoluteExpiryAndTtlFields() |
| { |
| using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| { |
| var context = EstablishNMSContext(testPeer); |
| testPeer.ExpectBegin(); |
| testPeer.ExpectSenderAttach(); |
| |
| IQueue destination = context.GetQueue("myQueue"); |
| var producer = context.CreateProducer(); |
| |
| uint ttl = 100_000; |
| DateTime currentTime = DateTime.UtcNow; |
| DateTime expirationLower = currentTime + TimeSpan.FromMilliseconds(ttl); |
| DateTime expirationUpper = currentTime + TimeSpan.FromMilliseconds(ttl) + TimeSpan.FromMilliseconds(5000); |
| |
| // Create matcher to expect the absolute-expiry-time field of the properties section to |
| // be set to a value greater than 'now'+ttl, within a delta. |
| string text = "myMessage"; |
| testPeer.ExpectTransfer(m => |
| { |
| Assert.IsTrue(m.Header.Durable); |
| Assert.AreEqual(ttl, m.Header.Ttl); |
| Assert.That(m.Properties.AbsoluteExpiryTime.Ticks, Is.GreaterThanOrEqualTo(expirationLower.Ticks).Within(TICKS_PER_MILLISECOND)); |
| Assert.That(m.Properties.AbsoluteExpiryTime.Ticks, Is.LessThanOrEqualTo(expirationUpper.Ticks).Within(TICKS_PER_MILLISECOND)); |
| Assert.AreEqual(text, (m.BodySection as AmqpValue).Value); |
| }); |
| |
| ITextMessage message = context.CreateTextMessage(text); |
| producer.TimeToLive = TimeSpan.FromMilliseconds(ttl); |
| producer.Priority = NMSConstants.defaultPriority; |
| producer.DeliveryMode = NMSConstants.defaultDeliveryMode; |
| producer.Send(destination, message); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| |
| testPeer.ExpectEnd(); |
| testPeer.ExpectClose(); |
| context.Close(); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| } |
| } |
| |
| [Test, Timeout(20_000)] |
| public void TestMessagesAreProducedWithProperDefaultPriorityWhenNoPrioritySpecified() |
| { |
| using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| { |
| var context = EstablishNMSContext(testPeer); |
| testPeer.ExpectBegin(); |
| testPeer.ExpectSenderAttach(); |
| |
| IQueue destination = context.GetQueue("myQueue"); |
| var producer = context.CreateProducer(); |
| |
| byte priority = 4; |
| |
| testPeer.ExpectTransfer(m => Assert.AreEqual(priority, m.Header.Priority)); |
| testPeer.ExpectEnd(); |
| testPeer.ExpectClose(); |
| |
| ITextMessage message = context.CreateTextMessage(); |
| Assert.AreEqual(MsgPriority.BelowNormal, message.NMSPriority); |
| |
| producer.Send(destination, message); |
| |
| Assert.AreEqual((MsgPriority) priority, message.NMSPriority); |
| |
| context.Close(); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| } |
| } |
| |
| [Test, Timeout(20_000)] |
| public void TestNonDefaultPriorityProducesMessagesWithPriorityFieldAndSetsNMSPriority() |
| { |
| using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| { |
| var context = EstablishNMSContext(testPeer); |
| testPeer.ExpectBegin(); |
| testPeer.ExpectSenderAttach(); |
| |
| IQueue destination = context.GetQueue("myQueue"); |
| var producer = context.CreateProducer(); |
| |
| byte priority = 9; |
| |
| testPeer.ExpectTransfer(m => Assert.AreEqual(priority, m.Header.Priority)); |
| testPeer.ExpectEnd(); |
| testPeer.ExpectClose(); |
| |
| ITextMessage message = context.CreateTextMessage(); |
| Assert.AreEqual(MsgPriority.BelowNormal, message.NMSPriority); |
| |
| producer.DeliveryMode = MsgDeliveryMode.Persistent; |
| producer.Priority = (MsgPriority) priority; |
| producer.TimeToLive = NMSConstants.defaultTimeToLive; |
| producer.Send(destination, message); |
| |
| Assert.AreEqual((MsgPriority) priority, message.NMSPriority); |
| |
| context.Close(); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| } |
| } |
| |
| [Test, Timeout(20_000)] |
| public void TestSendingMessageSetsNMSMessageId() |
| { |
| using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| { |
| var context = EstablishNMSContext(testPeer); |
| testPeer.ExpectBegin(); |
| testPeer.ExpectSenderAttach(); |
| |
| IQueue destination = context.GetQueue("myQueue"); |
| var producer = context.CreateProducer(); |
| |
| string text = "myMessage"; |
| string actualMessageId = null; |
| testPeer.ExpectTransfer(m => |
| { |
| Assert.IsTrue(m.Header.Durable); |
| Assert.IsNotEmpty(m.Properties.MessageId); |
| actualMessageId = m.Properties.MessageId; |
| }); |
| testPeer.ExpectEnd(); |
| testPeer.ExpectClose(); |
| |
| ITextMessage message = context.CreateTextMessage(text); |
| Assert.IsNull(message.NMSMessageId, "NMSMessageId should not yet be set"); |
| |
| producer.Send(destination, message); |
| |
| Assert.IsNotNull(message.NMSMessageId); |
| Assert.IsNotEmpty(message.NMSMessageId, "NMSMessageId should be set"); |
| Assert.IsTrue(message.NMSMessageId.StartsWith("ID:"), "MMS 'ID:' prefix not found"); |
| |
| context.Close(); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| // Get the value that was actually transmitted/received, verify it is a string, compare to what we have locally |
| Assert.AreEqual(message.NMSMessageId, actualMessageId, "Expected NMSMessageId value to be present in AMQP message"); |
| } |
| } |
| |
| [Test, Timeout(20_000)] |
| public void TestSendingMessageWithDisableMessageIdHint() |
| { |
| DoSendingMessageWithDisableMessageIdHintTestImpl(false); |
| } |
| |
| [Test, Timeout(20_000)] |
| public void TestSendingMessageWithDisableMessageIdHintAndExistingMessageId() |
| { |
| DoSendingMessageWithDisableMessageIdHintTestImpl(true); |
| } |
| |
| private void DoSendingMessageWithDisableMessageIdHintTestImpl(bool existingId) |
| { |
| using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| { |
| var context = EstablishNMSContext(testPeer); |
| testPeer.ExpectBegin(); |
| testPeer.ExpectSenderAttach(); |
| |
| IQueue destination = context.GetQueue("myQueue"); |
| var producer = context.CreateProducer(); |
| |
| string text = "myMessage"; |
| testPeer.ExpectTransfer(m => |
| { |
| Assert.IsTrue(m.Header.Durable); |
| Assert.IsNull(m.Properties.MessageId); // Check there is no message-id value; |
| Assert.AreEqual(text, (m.BodySection as AmqpValue).Value); |
| }); |
| testPeer.ExpectEnd(); |
| testPeer.ExpectClose(); |
| |
| ITextMessage message = context.CreateTextMessage(text); |
| |
| Assert.IsNull(message.NMSMessageId, "NMSMessageId should not yet be set"); |
| |
| if (existingId) |
| { |
| string existingMessageId = "ID:this-should-be-overwritten-in-send"; |
| message.NMSMessageId = existingMessageId; |
| Assert.AreEqual(existingMessageId, message.NMSMessageId, "NMSMessageId should now be se"); |
| } |
| |
| producer.DisableMessageID = true; |
| |
| producer.Send(destination, message); |
| |
| Assert.IsNull(message.NMSMessageId, "NMSMessageID should be null"); |
| |
| context.Close(); |
| |
| testPeer.WaitForAllMatchersToComplete(2000); |
| } |
| } |
| |
| // TODO No connection listener in nms context |
| // [Test, Timeout(20_000)] |
| // public void TestRemotelyCloseProducer() |
| // { |
| // string breadCrumb = "ErrorMessageBreadCrumb"; |
| // |
| // ManualResetEvent producerClosed = new ManualResetEvent(false); |
| // Mock<INmsConnectionListener> mockConnectionListener = new Mock<INmsConnectionListener>(); |
| // mockConnectionListener |
| // .Setup(listener => listener.OnProducerClosed(It.IsAny<NmsMessageProducer>(), It.IsAny<Exception>())) |
| // .Callback(() => { producerClosed.Set(); }); |
| // |
| // using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| // { |
| // NmsContext context = (NmsContext) EstablishNMSContext(testPeer); |
| // context.AddConnectionListener(mockConnectionListener.Object); |
| // |
| // testPeer.ExpectBegin(); |
| // ISession session = context.CreateSession(AcknowledgementMode.AutoAcknowledge); |
| // |
| // // Create a producer, then remotely end it afterwards. |
| // testPeer.ExpectSenderAttach(); |
| // testPeer.RemotelyDetachLastOpenedLinkOnLastOpenedSession(expectDetachResponse: true, closed: true, errorType: AmqpError.RESOURCE_DELETED, breadCrumb, delayBeforeSend: 10); |
| // |
| // IQueue destination = session.GetQueue("myQueue"); |
| // IMessageProducer producer = session.CreateProducer(destination); |
| // |
| // // Verify the producer gets marked closed |
| // testPeer.WaitForAllMatchersToComplete(1000); |
| // |
| // Assert.True(producerClosed.WaitOne(TimeSpan.FromMilliseconds(1000)), "Producer closed callback didn't trigger"); |
| // Assert.That(() => producer.DisableMessageID, Throws.Exception.InstanceOf<IllegalStateException>(), "Producer never closed"); |
| // |
| // // Try closing it explicitly, should effectively no-op in client. |
| // // The test peer will throw during close if it sends anything. |
| // producer.Close(); |
| // } |
| // } |
| |
| [Test, Timeout(20_000)] |
| public void TestSendWhenLinkCreditIsZeroAndTimeout() |
| { |
| using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| { |
| var context = EstablishNMSContext(testPeer, optionsString: "nms.sendTimeout=500"); |
| testPeer.ExpectBegin(); |
| |
| IQueue queue = context.GetQueue("myQueue"); |
| |
| ITextMessage message = context.CreateTextMessage("text"); |
| |
| // Expect the producer to attach. Don't send any credit so that the client will |
| // block on a send and we can test our timeouts. |
| testPeer.ExpectSenderAttachWithoutGrantingCredit(); |
| testPeer.ExpectEnd(); |
| testPeer.ExpectClose(); |
| |
| var producer = context.CreateProducer(); |
| |
| Assert.Catch<Exception>(() => producer.Send(queue, message), "Send should time out."); |
| |
| context.Close(); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| } |
| } |
| |
| [Test, Timeout(20_000)] |
| public void TestSendTimesOutWhenNoDispositionArrives() |
| { |
| using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| { |
| var context = EstablishNMSContext(testPeer, optionsString: "nms.sendTimeout=500"); |
| testPeer.ExpectBegin(); |
| |
| IQueue queue = context.GetQueue("myQueue"); |
| |
| ITextMessage message = context.CreateTextMessage("text"); |
| |
| // Expect the producer to attach and grant it some credit, it should send |
| // a transfer which we will not send any response for which should cause the |
| // send operation to time out. |
| testPeer.ExpectSenderAttach(); |
| testPeer.ExpectTransferButDoNotRespond(messageMatcher: Assert.NotNull); |
| testPeer.ExpectEnd(); |
| testPeer.ExpectClose(); |
| |
| var producer = context.CreateProducer(); |
| |
| Assert.Catch<Exception>(() => producer.Send(queue, message), "Send should time out."); |
| |
| context.Close(); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| } |
| } |
| |
| [Test, Timeout(20_000)] |
| public void TestSendWorksWhenConnectionNotStarted() |
| { |
| using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| { |
| var context = EstablishNMSContext(testPeer); |
| |
| testPeer.ExpectBegin(); |
| testPeer.ExpectSenderAttach(); |
| |
| IQueue destination = context.GetQueue("myQueue"); |
| var producer = context.CreateProducer(); |
| |
| testPeer.ExpectTransfer(Assert.IsNotNull); |
| |
| producer.Send(destination, context.CreateMessage()); |
| |
| testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true); |
| producer.Close(); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| } |
| } |
| |
| [Test, Timeout(20_000)] |
| public void TestSendWorksAfterConnectionStopped() |
| { |
| using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| { |
| var context = EstablishNMSContext(testPeer); |
| context.Start(); |
| |
| testPeer.ExpectBegin(); |
| testPeer.ExpectSenderAttach(); |
| |
| IQueue destination = context.GetQueue("myQueue"); |
| var producer = context.CreateProducer(); |
| |
| testPeer.ExpectTransfer(Assert.IsNotNull); |
| |
| context.Stop(); |
| |
| producer.Send(destination, context.CreateMessage()); |
| |
| testPeer.ExpectDetach(expectClosed: true, sendResponse: true, replyClosed: true); |
| testPeer.ExpectEnd(); |
| testPeer.ExpectClose(); |
| |
| producer.Close(); |
| context.Close(); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| } |
| } |
| |
| [Test, Timeout(20_000)] |
| public void TestSendingMessagePersistentSetsBatchableFalse() |
| { |
| using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| { |
| var context = EstablishNMSContext(testPeer); |
| context.Start(); |
| |
| testPeer.ExpectBegin(); |
| testPeer.ExpectSenderAttach(); |
| |
| IQueue destination = context.GetQueue("myQueue"); |
| var producer = context.CreateProducer(); |
| testPeer.ExpectTransfer(messageMatcher: Assert.IsNotNull, |
| stateMatcher: Assert.IsNull, |
| settled: false, |
| sendResponseDisposition: true, |
| responseState: new Accepted(), |
| responseSettled: true, |
| batchable: false); |
| |
| IMessage message = context.CreateMessage(); |
| producer.DeliveryMode = MsgDeliveryMode.Persistent; |
| producer.Priority = MsgPriority.Normal; |
| producer.TimeToLive = NMSConstants.defaultTimeToLive; |
| producer.Send(destination, message); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| |
| testPeer.ExpectEnd(); |
| testPeer.ExpectClose(); |
| context.Close(); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| } |
| } |
| |
| [Test, Timeout(20_000)] |
| public void TestSendingMessageNonPersistentSetsBatchableFalse() |
| { |
| using (TestAmqpPeer testPeer = new TestAmqpPeer()) |
| { |
| var context = EstablishNMSContext(testPeer); |
| context.Start(); |
| |
| testPeer.ExpectBegin(); |
| testPeer.ExpectSenderAttach(); |
| |
| IQueue destination = context.GetQueue("myQueue"); |
| var producer = context.CreateProducer(); |
| testPeer.ExpectTransfer(messageMatcher: Assert.IsNotNull, |
| stateMatcher: Assert.IsNull, |
| settled: false, |
| sendResponseDisposition: true, |
| responseState: new Accepted(), |
| responseSettled: true, |
| batchable: false); |
| |
| IMessage message = context.CreateMessage(); |
| producer.DeliveryMode = MsgDeliveryMode.NonPersistent; |
| producer.Priority = MsgPriority.Normal; |
| producer.TimeToLive = NMSConstants.defaultTimeToLive; |
| producer.Send(destination, message); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| |
| testPeer.ExpectEnd(); |
| testPeer.ExpectClose(); |
| context.Close(); |
| |
| testPeer.WaitForAllMatchersToComplete(1000); |
| } |
| } |
| } |
| } |