AMQNET-637 Deliverytime
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs
index 2a7f918..4799ada 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs
@@ -38,7 +38,7 @@
private IDestination consumerDestination;
private IAmqpConnection connection;
private DateTime? syntheticExpiration;
- // private DateTime syntheticDeliveryTime;
+ private DateTime syntheticDeliveryTime;
public global::Amqp.Message Message { get; private set; }
public int RedeliveryCount
@@ -269,14 +269,15 @@
case ulong _:
case int _:
case uint _:
- return new DateTime(621355968000000000L + (long) deliveryTime * 10000L, DateTimeKind.Utc);
+ return new DateTime(621355968000000000L + Convert.ToInt64(deliveryTime) * 10000L, DateTimeKind.Utc);
default:
- return DateTime.UtcNow; // syntheticDeliveryTime;
+ return syntheticDeliveryTime;
}
}
set
{
- // syntheticDeliveryTime = value;
+ // Assumption that if it is being set through property, then it is with purpose of send out this value
+ syntheticDeliveryTime = value;
SetMessageAnnotation(SymbolUtil.NMS_DELIVERY_TIME, new DateTimeOffset(value).ToUnixTimeMilliseconds());
}
}
@@ -423,10 +424,10 @@
syntheticExpiration = DateTime.UtcNow + ttl;
}
- // if (GetMessageAnnotation(SymbolUtil.NMS_DELIVERY_TIME) == null)
- // {
- // syntheticDeliveryTime = DateTime.UtcNow;
- // }
+ if (GetMessageAnnotation(SymbolUtil.NMS_DELIVERY_TIME) == null)
+ {
+ syntheticDeliveryTime = DateTime.UtcNow;
+ }
}
@@ -488,7 +489,7 @@
target.connection = connection;
target.consumerDestination = consumerDestination;
target.syntheticExpiration = syntheticExpiration;
- // target.syntheticDeliveryTime = syntheticDeliveryTime;
+ target.syntheticDeliveryTime = syntheticDeliveryTime;
target.amqpTimeToLiveOverride = amqpTimeToLiveOverride;
target.destination = destination;
target.replyTo = replyTo;
diff --git a/test/Apache-NMS-AMQP-Test/Integration/MessageDeliveryTimeTest.cs b/test/Apache-NMS-AMQP-Test/Integration/MessageDeliveryTimeTest.cs
new file mode 100644
index 0000000..dbe936a
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Integration/MessageDeliveryTimeTest.cs
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Amqp.Framing;
+using Amqp.Types;
+using Apache.NMS;
+using Apache.NMS.AMQP.Util;
+using NMS.AMQP.Test.TestAmqp;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Integration
+{
+ [TestFixture]
+ public class MessageDeliveryTimeTest : IntegrationTestFixture
+ {
+ [Test, Timeout(20000)]
+ public void TestReceiveMessageWithoutDeliveryTimeSet()
+ {
+ DoReceiveMessageDeliveryTime(null, null);
+ }
+
+ [Test, Timeout(20000)]
+ public void TestDeliveryTimeIsDateTime()
+ {
+ DateTime deliveryTime = DateTimeOffset.FromUnixTimeMilliseconds(CurrentTimeInMillis() + 12345).DateTime.ToUniversalTime();
+ DoReceiveMessageDeliveryTime(deliveryTime, deliveryTime);
+ }
+
+ [Test, Timeout(20000)]
+ public void TestDeliveryTimeIsULong()
+ {
+ ulong deliveryTime = (ulong) (CurrentTimeInMillis() + 12345);
+ DoReceiveMessageDeliveryTime(deliveryTime, DateTimeOffset.FromUnixTimeMilliseconds((long) deliveryTime).DateTime);
+ }
+
+ [Test, Timeout(20000)]
+ public void TestDeliveryTimeIsLong()
+ {
+ long deliveryTime = (CurrentTimeInMillis() + 12345);
+ DoReceiveMessageDeliveryTime(deliveryTime, DateTimeOffset.FromUnixTimeMilliseconds(deliveryTime).DateTime);
+ }
+
+ [Test, Timeout(20000)]
+ public void TestDeliveryTimeIsInt()
+ {
+ int deliveryTime = (int) (CurrentTimeInMillis() + 12345);
+ DoReceiveMessageDeliveryTime(deliveryTime, DateTimeOffset.FromUnixTimeMilliseconds(deliveryTime).DateTime);
+ }
+
+ [Test, Timeout(20000)]
+ public void TestDeliveryTimeIsUInt()
+ {
+ uint deliveryTime = (uint) (CurrentTimeInMillis() + 12345);
+ DoReceiveMessageDeliveryTime(deliveryTime, DateTimeOffset.FromUnixTimeMilliseconds(deliveryTime).DateTime);
+ }
+
+ private long CurrentTimeInMillis()
+ {
+ return new DateTimeOffset(DateTime.UtcNow).ToUnixTimeMilliseconds();
+ }
+
+ private void DoReceiveMessageDeliveryTime(object setDeliveryTimeAnnotation, DateTime? expectedDeliveryTime)
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ var connection = EstablishConnection(testPeer, "amqp.traceFrames=true");
+ connection.Start();
+ testPeer.ExpectBegin();
+ var session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ var queue = session.GetQueue("myQueue");
+
+ var message = CreateMessageWithNullContent();
+ if (setDeliveryTimeAnnotation != null)
+ {
+ message.MessageAnnotations = message.MessageAnnotations ?? new MessageAnnotations();
+ message.MessageAnnotations[SymbolUtil.NMS_DELIVERY_TIME] = setDeliveryTimeAnnotation;
+ }
+
+ testPeer.ExpectReceiverAttach();
+ testPeer.ExpectLinkFlowRespondWithTransfer(message);
+ testPeer.ExpectDisposition(true, (deliveryState) => { });
+
+ DateTime startingTimeFrom = DateTime.UtcNow;
+ var messageConsumer = session.CreateConsumer(queue);
+ var receivedMessage = messageConsumer.Receive(TimeSpan.FromMilliseconds(3000));
+ DateTime receivingTime = DateTime.UtcNow;
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+
+ testPeer.ExpectClose();
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(3000);
+
+ Assert.IsNotNull(receivedMessage);
+ if (expectedDeliveryTime != null)
+ {
+ Assert.AreEqual(receivedMessage.NMSDeliveryTime, expectedDeliveryTime.Value);
+ }
+ else
+ {
+ Assert.LessOrEqual(receivedMessage.NMSDeliveryTime, receivingTime);
+ Assert.GreaterOrEqual(receivedMessage.NMSDeliveryTime, startingTimeFrom);
+ }
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestDeliveryDelayNotSupportedThrowsException()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ IConnection connection = base.EstablishConnection(testPeer);
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+ IMessageProducer producer = session.CreateProducer(queue);
+ Assert.Throws<NotSupportedException>(() => producer.DeliveryDelay = TimeSpan.FromMinutes(17));
+ }
+ }
+
+ [Test, Timeout(20_000)]
+ public void TestDeliveryDelayHasItsReflectionInAmqpAnnotations()
+ {
+ using (TestAmqpPeer testPeer = new TestAmqpPeer())
+ {
+ // Determine current time
+ TimeSpan deliveryDelay = TimeSpan.FromMinutes(17);
+ long currentUnixEpochTime = new DateTimeOffset(DateTime.UtcNow + deliveryDelay).ToUnixTimeMilliseconds();
+ long currentUnixEpochTime2 = new DateTimeOffset(DateTime.UtcNow + deliveryDelay + deliveryDelay).ToUnixTimeMilliseconds();
+
+ IConnection connection = base.EstablishConnection(testPeer,
+ serverCapabilities: new Symbol[] {SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY, SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER});
+ testPeer.ExpectBegin();
+ testPeer.ExpectSenderAttach();
+
+
+ ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
+ IQueue queue = session.GetQueue("myQueue");
+ IMessageProducer producer = session.CreateProducer(queue);
+ producer.DeliveryDelay = deliveryDelay;
+
+ // Create and transfer a new message
+ testPeer.ExpectTransfer(message =>
+ {
+ Assert.GreaterOrEqual((long) message.MessageAnnotations[SymbolUtil.NMS_DELIVERY_TIME], currentUnixEpochTime);
+ Assert.Less((long) message.MessageAnnotations[SymbolUtil.NMS_DELIVERY_TIME], currentUnixEpochTime2);
+
+ Assert.IsTrue(message.Header.Durable);
+ });
+ testPeer.ExpectClose();
+
+ ITextMessage textMessage = session.CreateTextMessage();
+
+ producer.Send(textMessage);
+ Assert.AreEqual(MsgDeliveryMode.Persistent, textMessage.NMSDeliveryMode);
+
+ connection.Close();
+
+ testPeer.WaitForAllMatchersToComplete(1000);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Integration/ProducerDeliveryDelayTest.cs b/test/Apache-NMS-AMQP-Test/Integration/ProducerDeliveryDelayTest.cs
deleted file mode 100644
index da66590..0000000
--- a/test/Apache-NMS-AMQP-Test/Integration/ProducerDeliveryDelayTest.cs
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using Amqp.Types;
-using Apache.NMS;
-using Apache.NMS.AMQP.Util;
-using NMS.AMQP.Test.TestAmqp;
-using NUnit.Framework;
-
-namespace NMS.AMQP.Test.Integration
-{
- [TestFixture]
- public class ProducerDeliveryDelayTest : IntegrationTestFixture
- {
- [Test, Timeout(20_000)]
- public void TestDeliveryDelayNotSupportedThrowsException()
- {
- using (TestAmqpPeer testPeer = new TestAmqpPeer())
- {
- IConnection connection = base.EstablishConnection(testPeer);
- testPeer.ExpectBegin();
- testPeer.ExpectSenderAttach();
-
- ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
- IQueue queue = session.GetQueue("myQueue");
- IMessageProducer producer = session.CreateProducer(queue);
- Assert.Throws<NotSupportedException>(() => producer.DeliveryDelay = TimeSpan.FromMinutes(17));
- }
- }
-
- [Test, Timeout(20_000)]
- public void TestDeliveryDelayHasItsReflectionInAmqpAnnotations()
- {
- using (TestAmqpPeer testPeer = new TestAmqpPeer())
- {
- // Determine current time
- TimeSpan deliveryDelay = TimeSpan.FromMinutes(17);
- long currentUnixEpochTime = new DateTimeOffset(DateTime.UtcNow + deliveryDelay).ToUnixTimeMilliseconds();
- long currentUnixEpochTime2 = new DateTimeOffset(DateTime.UtcNow + deliveryDelay + deliveryDelay).ToUnixTimeMilliseconds();
-
- IConnection connection = base.EstablishConnection(testPeer,
- serverCapabilities: new Symbol[] {SymbolUtil.OPEN_CAPABILITY_DELAYED_DELIVERY, SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER});
- testPeer.ExpectBegin();
- testPeer.ExpectSenderAttach();
-
-
- ISession session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
- IQueue queue = session.GetQueue("myQueue");
- IMessageProducer producer = session.CreateProducer(queue);
- producer.DeliveryDelay = deliveryDelay;
-
- // Create and transfer a new message
- testPeer.ExpectTransfer(message =>
- {
- Assert.GreaterOrEqual((long) message.MessageAnnotations[SymbolUtil.NMS_DELIVERY_TIME], currentUnixEpochTime);
- Assert.Less((long) message.MessageAnnotations[SymbolUtil.NMS_DELIVERY_TIME], currentUnixEpochTime2);
-
- Assert.IsTrue(message.Header.Durable);
- });
- testPeer.ExpectClose();
-
- ITextMessage textMessage = session.CreateTextMessage();
-
- producer.Send(textMessage);
- Assert.AreEqual(MsgDeliveryMode.Persistent, textMessage.NMSDeliveryMode);
-
- connection.Close();
-
- testPeer.WaitForAllMatchersToComplete(1000);
- }
- }
- }
-}
\ No newline at end of file