PROTON-2808 Account for zero copy buffers on ingest activity tracking
Update data ingest in the engine to account for the zero copy mechanics
in proton buffers resulting in missing the update to the incoming
sequence tracker and early read check idle timeout errors.
diff --git a/src/Proton/Engine/Implementation/ProtonEngine.cs b/src/Proton/Engine/Implementation/ProtonEngine.cs
index 88ecd39..953dcf0 100644
--- a/src/Proton/Engine/Implementation/ProtonEngine.cs
+++ b/src/Proton/Engine/Implementation/ProtonEngine.cs
@@ -231,19 +231,21 @@
throw new EngineNotWritableException("Engine is currently not accepting new input");
}
- try
+ if (input.IsReadable)
{
- long startIndex = input.ReadOffset;
- pipeline.FireRead(input);
- if (input.ReadOffset != startIndex)
+ try
+ {
+ pipeline.FireRead(input);
+ }
+ catch (Exception error)
+ {
+ throw EngineFailed(error);
+ }
+ finally
{
inputSequence++;
}
}
- catch (Exception error)
- {
- throw EngineFailed(error);
- }
return this;
}
diff --git a/test/Proton.Tests/Engine/Implementation/ProtonEngineTest.cs b/test/Proton.Tests/Engine/Implementation/ProtonEngineTest.cs
index a24192e..3f2d0b6 100644
--- a/test/Proton.Tests/Engine/Implementation/ProtonEngineTest.cs
+++ b/test/Proton.Tests/Engine/Implementation/ProtonEngineTest.cs
@@ -1452,5 +1452,75 @@
peer.WaitForScriptToComplete();
}
+
+ [Test]
+ public void TestSlowFrameCoalesceDoesNotTriggerReadIdleTimeout()
+ {
+ // Frame data for: Transfer
+ // Transfer{handle=0, deliveryId=1, deliveryTag=\x00\x01, messageFormat=null, settled=true, more=false,
+ // rcvSettleMode=null, state=null, resume=false, aborted=false, batchable=false}
+ // payload of size: 169
+ byte[] completedTransfer1 = new byte[] {
+ 0, 0, 0, 193, 2, 0, 0, 0, 0, 83, 20, 192, 11, 5, 82, 0, 82, 1, 160, 2, 0, 1, 64, 65, 0, 83, 115,
+ 208, 0, 0, 0, 28, 0, 0, 0, 3, 152, 149, 181, 19, 123, 103, 50, 77, 43, 183, 93, 29, 105, 64};
+ byte[] completedTransfer2 = new byte[] {
+ 172, 45, 110, 64, 161, 4, 116, 101, 115, 116, 0, 83, 116, 193, 23, 2, 161, 9, 116, 105, 109, 101,
+ 115, 116, 97, 109, 112, 161, 9, 49, 50, 51, 52, 53, 54, 55, 56, 57, 0, 83, 117, 160, 100, 65, 65};
+ byte[] completedTransfer3 = new byte[] {
+ 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65,
+ 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65,
+ 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65,
+ 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65};
+
+ IEngine engine = IEngineFactory.Proton.CreateNonSaslEngine();
+ engine.ErrorHandler((error) => failure = error.FailureCause);
+ ProtonTestConnector peer = CreateTestPeer(engine);
+
+ peer.ExpectAMQPHeader().RespondWithAMQPHeader();
+ peer.ExpectOpen().WithIdleTimeOut(1000).Respond().WithIdleTimeOut(0);
+ peer.ExpectBegin().Respond();
+ peer.ExpectAttach().Respond();
+ peer.ExpectFlow();
+
+ IConnection connection = engine.Start();
+ connection.IdleTimeout = 1000;
+ connection.Open();
+ ISession session = connection.Session().Open();
+ IReceiver receiver = session.Receiver("test").Open().AddCredit(10);
+
+ bool deliveryArrived = false;
+ IIncomingDelivery receivedDelivery = null;
+ receiver.DeliveryReadHandler((delivery) =>
+ {
+ deliveryArrived = true;
+ receivedDelivery = delivery;
+ });
+
+ // Initial tick sets first deadline
+ Assert.AreEqual(2000, connection.Tick(1000));
+
+ peer.RemoteBytes().WithBytes(completedTransfer1).Now();
+ Assert.AreEqual(2500, connection.Tick(1500));
+
+ peer.RemoteBytes().WithBytes(completedTransfer2).Now();
+ Assert.AreEqual(3000, connection.Tick(2000));
+
+ peer.RemoteBytes().WithBytes(completedTransfer3).Now();
+ Assert.AreEqual(3500, connection.Tick(2500));
+
+ peer.WaitForScriptToComplete();
+ peer.ExpectDetach().Respond();
+ peer.ExpectEnd().Respond();
+
+ Assert.IsTrue(deliveryArrived);
+ Assert.IsNotNull(receivedDelivery);
+
+ receiver.Detach();
+ session.Close();
+
+ peer.WaitForScriptToComplete();
+
+ Assert.IsNull(failure);
+ }
}
}
\ No newline at end of file