PROTON-1583: handle edge cases when driving transport#tick(now) with nanoTime derived values
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 5441dec..33e4f87 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -1554,15 +1554,14 @@
@Override
public long tick(long now)
{
- long timeout = 0;
+ long deadline = 0;
if (_localIdleTimeout > 0) {
if (_localIdleDeadline == 0 || _lastBytesInput != _bytesInput) {
- _localIdleDeadline = now + _localIdleTimeout;
+ _localIdleDeadline = computeDeadline(now, _localIdleTimeout);
_lastBytesInput = _bytesInput;
- } else if (_localIdleDeadline <= now) {
- _localIdleDeadline = now + _localIdleTimeout;
-
+ } else if (_localIdleDeadline - now <= 0) {
+ _localIdleDeadline = computeDeadline(now, _localIdleTimeout);
if (_connectionEndpoint != null &&
_connectionEndpoint.getLocalState() != EndpointState.CLOSED) {
ErrorCondition condition =
@@ -1588,24 +1587,41 @@
close_tail();
}
}
- timeout = _localIdleDeadline;
+ deadline = _localIdleDeadline;
}
if (_remoteIdleTimeout != 0 && !_isCloseSent) {
if (_remoteIdleDeadline == 0 || _lastBytesOutput != _bytesOutput) {
- _remoteIdleDeadline = now + (_remoteIdleTimeout / 2);
+ _remoteIdleDeadline = computeDeadline(now, _remoteIdleTimeout / 2);
_lastBytesOutput = _bytesOutput;
- } else if (_remoteIdleDeadline <= now) {
- _remoteIdleDeadline = now + (_remoteIdleTimeout / 2);
+ } else if (_remoteIdleDeadline - now <= 0) {
+ _remoteIdleDeadline = computeDeadline(now, _remoteIdleTimeout / 2);
if (pending() == 0) {
writeFrame(0, null, null, null);
_lastBytesOutput += pending();
}
}
- timeout = Math.min(timeout == 0 ? _remoteIdleDeadline : timeout, _remoteIdleDeadline);
+
+ if(deadline == 0) {
+ deadline = _remoteIdleDeadline;
+ } else {
+ if(_remoteIdleDeadline - _localIdleDeadline <= 0) {
+ deadline = _remoteIdleDeadline;
+ } else {
+ deadline = _localIdleDeadline;
+ }
+ }
}
- return timeout;
+ return deadline;
+ }
+
+ private long computeDeadline(long now, long timeout) {
+ long deadline = now + timeout;
+
+ // We use 0 to signal not-initialised and/or no-timeout, so in the
+ // unlikely event thats to be the actual deadline, return 1 instead
+ return deadline != 0 ? deadline : 1;
}
@Override
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index f6ca12c..80ad20c 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -362,15 +362,7 @@
// Protocol header + empty frame
ByteBuffer data = ByteBuffer.wrap(new byte[] {'A', 'M', 'Q', 'P', 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00});
- while (data.remaining() > 0)
- {
- int origLimit = data.limit();
- int amount = Math.min(transport.tail().remaining(), data.remaining());
- data.limit(data.position() + amount);
- transport.tail().put(data);
- data.limit(origLimit);
- transport.process();
- }
+ processInput(transport, data);
framesWrittenBeforeTick = transport.writes.size();
deadline = transport.tick(2000);
assertEquals("Reading data data resets the deadline", 6000, deadline);
@@ -1667,4 +1659,488 @@
assertEquals("Unexpected value : " + getFrameTypesWritten(transport), 4567, transport.getRemoteMaxFrameSize());
}
+
+ @Test
+ public void testTickWithZeroIdleTimeoutsGivesZeroDeadline()
+ {
+ doTickWithNoIdleTimeoutGivesZeroDeadlineTestImpl(true);
+ }
+
+ @Test
+ public void testTickWithNullIdleTimeoutsGivesZeroDeadline()
+ {
+ doTickWithNoIdleTimeoutGivesZeroDeadlineTestImpl(false);
+ }
+
+ private void doTickWithNoIdleTimeoutGivesZeroDeadlineTestImpl(boolean useZero) {
+ MockTransportImpl transport = new MockTransportImpl();
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ connection.open();
+ while(transport.pending() > 0) {
+ transport.pop(transport.head().remaining());
+ }
+
+ assertEquals("should have written data", 1, transport.writes.size());
+ FrameBody sentOpenFrame = transport.writes.get(0);
+ assertNotNull("should have written a non-empty frame", sentOpenFrame);
+ assertTrue("should have written an open frame", sentOpenFrame instanceof Open);
+ assertNull("should not have had an idletimeout value", ((Open)sentOpenFrame).getIdleTimeOut());
+
+ // Handle the peer transmitting their open with null/zero timeout.
+ Open open = new Open();
+ if(useZero) {
+ open.setIdleTimeOut(UnsignedInteger.ZERO);
+ } else {
+ open.setIdleTimeOut(null);
+ }
+ TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
+ transport.handleFrame(openFrame);
+ pumpMockTransport(transport);
+
+ long deadline = transport.tick(0);
+ assertEquals("Unexpected deadline returned", 0, deadline);
+
+ deadline = transport.tick(10);
+ assertEquals("Unexpected deadline returned", 0, deadline);
+ }
+
+ @Test
+ public void testTickWithBothTimeouts()
+ {
+ // all-positive
+ doTickWithBothTimeoutsTestImpl(true, 5000, 2000, 10000, 12000, 14000, 15000);
+ doTickWithBothTimeoutsTestImpl(false, 5000, 2000, 10000, 12000, 14000, 15000);
+
+ // all-negative
+ doTickWithBothTimeoutsTestImpl(true, 10000, 4000, -100000, -96000, -92000, -90000);
+ doTickWithBothTimeoutsTestImpl(false, 10000, 4000, -100000, -96000, -92000, -90000);
+
+ // negative to positive missing 0
+ doTickWithBothTimeoutsTestImpl(true, 500, 200, -450, -250, -50, 50);
+ doTickWithBothTimeoutsTestImpl(false, 500, 200, -450, -250, -50, 50);
+
+ // negative to positive striking 0 with local deadline
+ doTickWithBothTimeoutsTestImpl(true, 500, 200, -500, -300, -100, 1);
+ doTickWithBothTimeoutsTestImpl(false, 500, 200, -500, -300, -100, 1);
+
+ // negative to positive striking 0 with remote deadline
+ doTickWithBothTimeoutsTestImpl(true, 500, 200, -200, 1, 201, 300);
+ doTickWithBothTimeoutsTestImpl(false, 500, 200, -200, 1, 201, 300);
+ }
+
+ private void doTickWithBothTimeoutsTestImpl(boolean allowLocalTimeout, int localTimeout, int remoteTimeoutHalf, long tick1,
+ long expectedDeadline1, long expectedDeadline2, long expectedDeadline3)
+ {
+ MockTransportImpl transport = new MockTransportImpl();
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ // Set our local idleTimeout
+ transport.setIdleTimeout(localTimeout);
+
+ connection.open();
+ pumpMockTransport(transport);
+
+ assertEquals("should have written data", 1, transport.writes.size());
+ assertNotNull("should have written a non-empty frame", transport.writes.get(0));
+ assertTrue("should have written an open frame", transport.writes.get(0) instanceof Open);
+
+ // Handle the peer transmitting [half] their timeout. We half it on receipt to avoid spurious timeouts
+ // if they not have transmitted half their actual timeout, as the AMQP spec only says they SHOULD do that.
+ Open open = new Open();
+ open.setIdleTimeOut(new UnsignedInteger(remoteTimeoutHalf * 2));
+ TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
+ transport.handleFrame(openFrame);
+ pumpMockTransport(transport);
+
+ long deadline = transport.tick(tick1);
+ assertEquals("Unexpected deadline returned", expectedDeadline1, deadline);
+
+ // Wait for less time than the deadline with no data - get the same value
+ long interimTick = tick1 + 10;
+ assertTrue (interimTick < expectedDeadline1);
+ assertEquals("When the deadline hasn't been reached tick() should return the previous deadline", expectedDeadline1, transport.tick(interimTick));
+ assertEquals("When the deadline hasn't been reached tick() shouldn't write data", 1, transport.writes.size());
+
+ deadline = transport.tick(expectedDeadline1);
+ assertEquals("When the deadline has been reached expected a new remote deadline to be returned", expectedDeadline2, deadline);
+ assertEquals("tick() should have written data", 2, transport.writes.size());
+ assertEquals("tick() should have written an empty frame", null, transport.writes.get(1));
+
+ pumpMockTransport(transport);
+
+ deadline = transport.tick(expectedDeadline2);
+ assertEquals("When the deadline has been reached expected a new local deadline to be returned", expectedDeadline3, deadline);
+ assertEquals("tick() should have written data", 3, transport.writes.size());
+ assertEquals("tick() should have written an empty frame", null, transport.writes.get(2));
+
+ pumpMockTransport(transport);
+
+ if(allowLocalTimeout) {
+ assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+ transport.tick(expectedDeadline3); // Wait for the deadline, but don't receive traffic, allow local timeout to expire
+ assertEquals("Calling tick() after the deadline should result in the connection being closed", EndpointState.CLOSED, connection.getLocalState());
+ assertEquals("tick() should have written data", 4, transport.writes.size());
+ assertNotNull("should have written a non-empty frame", transport.writes.get(3));
+ assertTrue("should have written a close frame", transport.writes.get(3) instanceof Close);
+ } else {
+ // Receive Empty frame to satisfy local deadline
+ processInput(transport, ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
+
+ deadline = transport.tick(expectedDeadline3);
+ assertEquals("Receiving data should have reset the deadline (to the next remote one)", expectedDeadline2 + (remoteTimeoutHalf), deadline);
+ assertEquals("tick() shouldn't have written data", 3, transport.writes.size());
+ assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+ }
+ }
+
+ @Test
+ public void testTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemote()
+ {
+ doTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemoteTestImpl(false);
+ }
+
+ @Test
+ public void testTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemoteWithLocalTimeout()
+ {
+ doTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemoteTestImpl(true);
+ }
+
+ private void doTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemoteTestImpl(boolean allowLocalTimeout) {
+ int localTimeout = 5000;
+ int remoteTimeoutHalf = 2000;
+ assertTrue(remoteTimeoutHalf < localTimeout);
+
+ long offset = 2500;
+ assertTrue(offset < localTimeout);
+ assertTrue(offset > remoteTimeoutHalf);
+
+ MockTransportImpl transport = new MockTransportImpl();
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ // Set our local idleTimeout
+ transport.setIdleTimeout(localTimeout);
+
+ connection.open();
+ pumpMockTransport(transport);
+
+ assertEquals("should have written data", 1, transport.writes.size());
+ assertNotNull("should have written a non-empty frame", transport.writes.get(0));
+ assertTrue("should have written an open frame", transport.writes.get(0) instanceof Open);
+
+ // Handle the peer transmitting [half] their timeout. We half it on receipt to avoid spurious timeouts
+ // if they not have transmitted half their actual timeout, as the AMQP spec only says they SHOULD do that.
+ Open open = new Open();
+ open.setIdleTimeOut(new UnsignedInteger(remoteTimeoutHalf * 2));
+ TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
+ transport.handleFrame(openFrame);
+ pumpMockTransport(transport);
+
+ long deadline = transport.tick(Long.MAX_VALUE - offset);
+ assertEquals("Unexpected deadline returned", Long.MAX_VALUE - offset + remoteTimeoutHalf, deadline);
+
+ deadline = transport.tick(Long.MAX_VALUE - (offset - 100)); // Wait for less time than the deadline with no data - get the same value
+ assertEquals("When the deadline hasn't been reached tick() should return the previous deadline", Long.MAX_VALUE -offset + remoteTimeoutHalf, deadline);
+ assertEquals("When the deadline hasn't been reached tick() shouldn't write data", 1, transport.writes.size());
+
+ deadline = transport.tick(Long.MAX_VALUE -offset + remoteTimeoutHalf); // Wait for the deadline - next deadline should be previous + remoteTimeoutHalf;
+ assertEquals("When the deadline has been reached expected a new remote deadline to be returned", Long.MIN_VALUE + (2* remoteTimeoutHalf) - offset -1, deadline);
+ assertEquals("tick() should have written data", 2, transport.writes.size());
+ assertEquals("tick() should have written an empty frame", null, transport.writes.get(1));
+
+ pumpMockTransport(transport);
+
+ deadline = transport.tick(Long.MIN_VALUE + (2* remoteTimeoutHalf) - offset -1); // Wait for the deadline - next deadline should be orig + localTimeout;
+ assertEquals("When the deadline has been reached expected a new local deadline to be returned", Long.MIN_VALUE + (localTimeout - offset) -1, deadline);
+ assertEquals("tick() should have written data", 3, transport.writes.size());
+ assertEquals("tick() should have written an empty frame", null, transport.writes.get(2));
+
+ pumpMockTransport(transport);
+
+ if(allowLocalTimeout) {
+ assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+ transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1); // Wait for the deadline, but don't receive traffic, allow local timeout to expire
+ assertEquals("Calling tick() after the deadline should result in the connection being closed", EndpointState.CLOSED, connection.getLocalState());
+ assertEquals("tick() should have written data", 4, transport.writes.size());
+ assertNotNull("should have written a non-empty frame", transport.writes.get(3));
+ assertTrue("should have written a close frame", transport.writes.get(3) instanceof Close);
+ } else {
+ // Receive Empty frame to satisfy local deadline
+ processInput(transport, ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
+
+ deadline = transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1); // Wait for the deadline - next deadline should be orig + 3*remoteTimeoutHalf;
+ assertEquals("Receiving data should have reset the deadline (to the remote one)", Long.MIN_VALUE + (3* remoteTimeoutHalf) - offset -1, deadline);
+ assertEquals("tick() shouldn't have written data", 3, transport.writes.size());
+ assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+ }
+ }
+
+ @Test
+ public void testTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocal()
+ {
+ doTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocalTestImpl(false);
+ }
+
+ @Test
+ public void testTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocalWithLocalTimeout()
+ {
+ doTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocalTestImpl(true);
+ }
+
+ private void doTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocalTestImpl(boolean allowLocalTimeout) {
+ int localTimeout = 2000;
+ int remoteTimeoutHalf = 5000;
+ assertTrue(localTimeout < remoteTimeoutHalf);
+
+ long offset = 2500;
+ assertTrue(offset > localTimeout);
+ assertTrue(offset < remoteTimeoutHalf);
+
+ MockTransportImpl transport = new MockTransportImpl();
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ // Set our local idleTimeout
+ transport.setIdleTimeout(localTimeout);
+
+ connection.open();
+ pumpMockTransport(transport);
+
+ assertEquals("should have written data", 1, transport.writes.size());
+ assertNotNull("should have written a non-empty frame", transport.writes.get(0));
+ assertTrue("should have written an open frame", transport.writes.get(0) instanceof Open);
+
+ // Receive Protocol header
+ processInput(transport, ByteBuffer.wrap(new byte[] {'A', 'M', 'Q', 'P', 0x00, 0x01, 0x00, 0x00}));
+
+ // Handle the peer transmitting [half] their timeout. We half it on receipt to avoid spurious timeouts
+ // if they not have transmitted half their actual timeout, as the AMQP spec only says they SHOULD do that.
+ Open open = new Open();
+ open.setIdleTimeOut(new UnsignedInteger(remoteTimeoutHalf * 2));
+ TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
+ transport.handleFrame(openFrame);
+ pumpMockTransport(transport);
+
+ long deadline = transport.tick(Long.MAX_VALUE - offset);
+ assertEquals("Unexpected deadline returned", Long.MAX_VALUE - offset + localTimeout, deadline);
+
+ deadline = transport.tick(Long.MAX_VALUE - (offset - 100)); // Wait for less time than the deadline with no data - get the same value
+ assertEquals("When the deadline hasn't been reached tick() should return the previous deadline", Long.MAX_VALUE - offset + localTimeout, deadline);
+ assertEquals("tick() shouldn't have written data", 1, transport.writes.size());
+
+ // Receive Empty frame to satisfy local deadline
+ processInput(transport, ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
+
+ deadline = transport.tick(Long.MAX_VALUE - offset + localTimeout); // Wait for the deadline - next deadline should be orig + 2* localTimeout;
+ assertEquals("When the deadline has been reached expected a new local deadline to be returned", Long.MIN_VALUE + (localTimeout - offset) -1 + localTimeout, deadline);
+ assertEquals("tick() should not have written data", 1, transport.writes.size());
+
+ pumpMockTransport(transport);
+
+ if(allowLocalTimeout) {
+ assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+ transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1 + localTimeout); // Wait for the deadline, but don't receive traffic, allow local timeout to expire
+ assertEquals("Calling tick() after the deadline should result in the connection being closed", EndpointState.CLOSED, connection.getLocalState());
+ assertEquals("tick() should have written data", 2, transport.writes.size());
+ assertNotNull("should have written a non-empty frame", transport.writes.get(1));
+ assertTrue("should have written a close frame", transport.writes.get(1) instanceof Close);
+ } else {
+ // Receive Empty frame to satisfy local deadline
+ processInput(transport, ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
+
+ deadline = transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1 + localTimeout); // Wait for the deadline - next deadline should be orig + remoteTimeoutHalf;
+ assertEquals("Receiving data should have reset the deadline (to the remote one)", Long.MIN_VALUE + remoteTimeoutHalf - offset -1, deadline);
+ assertEquals("tick() shouldn't have written data", 1, transport.writes.size());
+
+ deadline = transport.tick(Long.MIN_VALUE + remoteTimeoutHalf - offset -1); // Wait for the deadline - next deadline should be orig + 3* localTimeout;
+ assertEquals("When the deadline has been reached expected a new local deadline to be returned", Long.MIN_VALUE + (3* localTimeout) - offset -1, deadline);
+ assertEquals("tick() should have written data", 2, transport.writes.size());
+ assertEquals("tick() should have written an empty frame", null, transport.writes.get(1));
+ assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+ }
+ }
+
+ @Test
+ public void testTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirst()
+ {
+ doTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirstTestImpl(false);
+ }
+
+ @Test
+ public void testTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirstWithLocalTimeout()
+ {
+ doTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirstTestImpl(true);
+ }
+
+ private void doTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirstTestImpl(boolean allowLocalTimeout) {
+ int localTimeout = 2000;
+ int remoteTimeoutHalf = 2500;
+ assertTrue(localTimeout < remoteTimeoutHalf);
+
+ long offset = 500;
+ assertTrue(offset < localTimeout);
+
+ MockTransportImpl transport = new MockTransportImpl();
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ // Set our local idleTimeout
+ transport.setIdleTimeout(localTimeout);
+
+ connection.open();
+ pumpMockTransport(transport);
+
+ assertEquals("should have written data", 1, transport.writes.size());
+ assertNotNull("should have written a non-empty frame", transport.writes.get(0));
+ assertTrue("should have written an open frame", transport.writes.get(0) instanceof Open);
+
+ // Receive Protocol header
+ processInput(transport, ByteBuffer.wrap(new byte[] {'A', 'M', 'Q', 'P', 0x00, 0x01, 0x00, 0x00}));
+
+ // Handle the peer transmitting [half] their timeout. We half it on receipt to avoid spurious timeouts
+ // if they not have transmitted half their actual timeout, as the AMQP spec only says they SHOULD do that.
+ Open open = new Open();
+ open.setIdleTimeOut(new UnsignedInteger(remoteTimeoutHalf * 2));
+ TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
+ transport.handleFrame(openFrame);
+ pumpMockTransport(transport);
+
+ long deadline = transport.tick(Long.MAX_VALUE - offset);
+ assertEquals("Unexpected deadline returned", Long.MIN_VALUE + (localTimeout - offset) -1, deadline);
+
+ deadline = transport.tick(Long.MAX_VALUE - (offset - 100)); // Wait for less time than the deadline with no data - get the same value
+ assertEquals("When the deadline hasn't been reached tick() should return the previous deadline", Long.MIN_VALUE + (localTimeout - offset) -1, deadline);
+ assertEquals("tick() shouldn't have written data", 1, transport.writes.size());
+
+ // Receive Empty frame to satisfy local deadline
+ processInput(transport, ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
+
+ deadline = transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1); // Wait for the deadline - next deadline should be orig + remoteTimeoutHalf;
+ assertEquals("When the deadline has been reached expected a new remote deadline to be returned", Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1, deadline);
+ assertEquals("When the deadline hasn't been reached tick() shouldn't write data", 1, transport.writes.size());
+
+ deadline = transport.tick(Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1); // Wait for the deadline - next deadline should be orig + 2* localTimeout;
+ assertEquals("When the deadline has been reached expected a new local deadline to be returned", Long.MIN_VALUE + (localTimeout - offset) -1 + localTimeout, deadline);
+ assertEquals("tick() should have written data", 2, transport.writes.size());
+ assertEquals("tick() should have written an empty frame", null, transport.writes.get(1));
+
+ pumpMockTransport(transport);
+
+ if(allowLocalTimeout) {
+ assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+ transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1 + localTimeout); // Wait for the deadline, but don't receive traffic, allow local timeout to expire
+ assertEquals("Calling tick() after the deadline should result in the connection being closed", EndpointState.CLOSED, connection.getLocalState());
+ assertEquals("tick() should have written data", 3, transport.writes.size());
+ assertNotNull("should have written a non-empty frame", transport.writes.get(2));
+ assertTrue("should have written a close frame", transport.writes.get(2) instanceof Close);
+ } else {
+ // Receive Empty frame to satisfy local deadline
+ processInput(transport, ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
+
+ deadline = transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1 + localTimeout); // Wait for the deadline - next deadline should be orig + 2*remoteTimeoutHalf;
+ assertEquals("Receiving data should have reset the deadline (to the remote one)", Long.MIN_VALUE + (2* remoteTimeoutHalf) - offset -1, deadline);
+ assertEquals("tick() shouldn't have written data", 2, transport.writes.size());
+ assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+ }
+ }
+
+ @Test
+ public void testTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirst()
+ {
+ doTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirstTestImpl(false);
+ }
+
+ @Test
+ public void testTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirstWithLocalTimeout()
+ {
+ doTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirstTestImpl(true);
+ }
+
+ private void doTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirstTestImpl(boolean allowLocalTimeout) {
+ int localTimeout = 5000;
+ int remoteTimeoutHalf = 2000;
+ assertTrue(remoteTimeoutHalf < localTimeout);
+
+ long offset = 500;
+ assertTrue(offset < remoteTimeoutHalf);
+
+ MockTransportImpl transport = new MockTransportImpl();
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ // Set our local idleTimeout
+ transport.setIdleTimeout(localTimeout);
+
+ connection.open();
+ while(transport.pending() > 0) {
+ transport.pop(transport.head().remaining());
+ }
+
+ assertEquals("should have written data", 1, transport.writes.size());
+ assertNotNull("should have written a non-empty frame", transport.writes.get(0));
+ assertTrue("should have written an open frame", transport.writes.get(0) instanceof Open);
+
+ // Handle the peer transmitting [half] their timeout. We half it on receipt to avoid spurious timeouts
+ // if they not have transmitted half their actual timeout, as the AMQP spec only says they SHOULD do that.
+ Open open = new Open();
+ open.setIdleTimeOut(new UnsignedInteger(remoteTimeoutHalf * 2));
+ TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
+ transport.handleFrame(openFrame);
+ pumpMockTransport(transport);
+
+
+ long deadline = transport.tick(Long.MAX_VALUE - offset);
+ assertEquals("Unexpected deadline returned", Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1, deadline);
+
+ deadline = transport.tick(Long.MAX_VALUE - (offset - 100)); // Wait for less time than the deadline with no data - get the same value
+ assertEquals("When the deadline hasn't been reached tick() should return the previous deadline", Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1, deadline);
+ assertEquals("When the deadline hasn't been reached tick() shouldn't write data", 1, transport.writes.size());
+
+ deadline = transport.tick(Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1); // Wait for the deadline - next deadline should be previous + remoteTimeoutHalf;
+ assertEquals("When the deadline has been reached expected a new remote deadline to be returned", Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1 + remoteTimeoutHalf, deadline);
+ assertEquals("tick() should have written data", 2, transport.writes.size());
+ assertEquals("tick() should have written an empty frame", null, transport.writes.get(1));
+
+ pumpMockTransport(transport);
+
+ deadline = transport.tick(Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1 + remoteTimeoutHalf); // Wait for the deadline - next deadline should be orig + localTimeout;
+ assertEquals("When the deadline has been reached expected a new local deadline to be returned", Long.MIN_VALUE + (localTimeout - offset) -1, deadline);
+ assertEquals("tick() should have written data", 3, transport.writes.size());
+ assertEquals("tick() should have written an empty frame", null, transport.writes.get(2));
+
+ pumpMockTransport(transport);
+
+ if(allowLocalTimeout) {
+ assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+ transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1); // Wait for the deadline, but don't receive traffic, allow local timeout to expire
+ assertEquals("Calling tick() after the deadline should result in the connection being closed", EndpointState.CLOSED, connection.getLocalState());
+ assertEquals("tick() should have written data", 4, transport.writes.size());
+ assertNotNull("should have written a non-empty frame", transport.writes.get(3));
+ assertTrue("should have written a close frame", transport.writes.get(3) instanceof Close);
+ } else {
+ // Receive Empty frame to satisfy local deadline
+ processInput(transport, ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
+
+ deadline = transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1); // Wait for the deadline - next deadline should be orig + 3*remoteTimeoutHalf;
+ assertEquals("Receiving data should have reset the deadline (to the remote one)", Long.MIN_VALUE + (3* remoteTimeoutHalf) - offset -1, deadline);
+ assertEquals("tick() shouldn't have written data", 3, transport.writes.size());
+ assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+ }
+ }
+
+ private void processInput(MockTransportImpl transport, ByteBuffer data) {
+ while (data.remaining() > 0)
+ {
+ int origLimit = data.limit();
+ int amount = Math.min(transport.tail().remaining(), data.remaining());
+ data.limit(data.position() + amount);
+ transport.tail().put(data);
+ data.limit(origLimit);
+ transport.process();
+ }
+ }
+
}