PROTON-1583: handle edge cases when driving transport#tick(now) with nanoTime derived values
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 5441dec..33e4f87 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -1554,15 +1554,14 @@
     @Override
     public long tick(long now)
     {
-        long timeout = 0;
+        long deadline = 0;
 
         if (_localIdleTimeout > 0) {
             if (_localIdleDeadline == 0 || _lastBytesInput != _bytesInput) {
-                _localIdleDeadline = now + _localIdleTimeout;
+                _localIdleDeadline = computeDeadline(now, _localIdleTimeout);
                 _lastBytesInput = _bytesInput;
-            } else if (_localIdleDeadline <= now) {
-                _localIdleDeadline = now + _localIdleTimeout;
-
+            } else if (_localIdleDeadline - now <= 0) {
+                _localIdleDeadline = computeDeadline(now, _localIdleTimeout);
                 if (_connectionEndpoint != null &&
                     _connectionEndpoint.getLocalState() != EndpointState.CLOSED) {
                     ErrorCondition condition =
@@ -1588,24 +1587,41 @@
                     close_tail();
                 }
             }
-            timeout = _localIdleDeadline;
+            deadline = _localIdleDeadline;
         }
 
         if (_remoteIdleTimeout != 0 && !_isCloseSent) {
             if (_remoteIdleDeadline == 0 || _lastBytesOutput != _bytesOutput) {
-                _remoteIdleDeadline = now + (_remoteIdleTimeout / 2);
+                _remoteIdleDeadline = computeDeadline(now, _remoteIdleTimeout / 2);
                 _lastBytesOutput = _bytesOutput;
-            } else if (_remoteIdleDeadline <= now) {
-                _remoteIdleDeadline = now + (_remoteIdleTimeout / 2);
+            } else if (_remoteIdleDeadline - now <= 0) {
+                _remoteIdleDeadline = computeDeadline(now, _remoteIdleTimeout / 2);
                 if (pending() == 0) {
                     writeFrame(0, null, null, null);
                     _lastBytesOutput += pending();
                 }
             }
-            timeout = Math.min(timeout == 0 ? _remoteIdleDeadline : timeout, _remoteIdleDeadline);
+
+            if(deadline == 0) {
+                deadline = _remoteIdleDeadline;
+            } else {
+                if(_remoteIdleDeadline - _localIdleDeadline <= 0) {
+                    deadline = _remoteIdleDeadline;
+                } else {
+                    deadline = _localIdleDeadline;
+                }
+            }
         }
 
-        return timeout;
+        return deadline;
+    }
+
+    private long computeDeadline(long now, long timeout) {
+        long deadline = now + timeout;
+
+        // We use 0 to signal not-initialised and/or no-timeout, so in the
+        // unlikely event thats to be the actual deadline, return 1 instead
+        return deadline != 0 ? deadline : 1;
     }
 
     @Override
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index f6ca12c..80ad20c 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -362,15 +362,7 @@
 
         // Protocol header + empty frame
         ByteBuffer data = ByteBuffer.wrap(new byte[] {'A', 'M', 'Q', 'P', 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00});
-        while (data.remaining() > 0)
-        {
-            int origLimit = data.limit();
-            int amount = Math.min(transport.tail().remaining(), data.remaining());
-            data.limit(data.position() + amount);
-            transport.tail().put(data);
-            data.limit(origLimit);
-            transport.process();
-        }
+        processInput(transport, data);
         framesWrittenBeforeTick = transport.writes.size();
         deadline = transport.tick(2000);
         assertEquals("Reading data data resets the deadline",  6000, deadline);
@@ -1667,4 +1659,488 @@
 
         assertEquals("Unexpected value : " + getFrameTypesWritten(transport), 4567, transport.getRemoteMaxFrameSize());
     }
+
+    @Test
+    public void testTickWithZeroIdleTimeoutsGivesZeroDeadline()
+    {
+        doTickWithNoIdleTimeoutGivesZeroDeadlineTestImpl(true);
+    }
+
+    @Test
+    public void testTickWithNullIdleTimeoutsGivesZeroDeadline()
+    {
+        doTickWithNoIdleTimeoutGivesZeroDeadlineTestImpl(false);
+    }
+
+    private void doTickWithNoIdleTimeoutGivesZeroDeadlineTestImpl(boolean useZero) {
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+        while(transport.pending() > 0) {
+            transport.pop(transport.head().remaining());
+        }
+
+        assertEquals("should have written data", 1, transport.writes.size());
+        FrameBody sentOpenFrame = transport.writes.get(0);
+        assertNotNull("should have written a non-empty frame", sentOpenFrame);
+        assertTrue("should have written an open frame", sentOpenFrame instanceof Open);
+        assertNull("should not have had an idletimeout value", ((Open)sentOpenFrame).getIdleTimeOut());
+
+        // Handle the peer transmitting their open with null/zero timeout.
+        Open open = new Open();
+        if(useZero) {
+            open.setIdleTimeOut(UnsignedInteger.ZERO);
+        } else {
+            open.setIdleTimeOut(null);
+        }
+        TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
+        transport.handleFrame(openFrame);
+        pumpMockTransport(transport);
+
+        long deadline = transport.tick(0);
+        assertEquals("Unexpected deadline returned", 0, deadline);
+
+        deadline = transport.tick(10);
+        assertEquals("Unexpected deadline returned", 0, deadline);
+    }
+
+    @Test
+    public void testTickWithBothTimeouts()
+    {
+        // all-positive
+        doTickWithBothTimeoutsTestImpl(true, 5000, 2000, 10000, 12000, 14000, 15000);
+        doTickWithBothTimeoutsTestImpl(false, 5000, 2000, 10000, 12000, 14000, 15000);
+
+        // all-negative
+        doTickWithBothTimeoutsTestImpl(true, 10000, 4000, -100000, -96000, -92000, -90000);
+        doTickWithBothTimeoutsTestImpl(false, 10000, 4000, -100000, -96000, -92000, -90000);
+
+        // negative to positive missing 0
+        doTickWithBothTimeoutsTestImpl(true, 500, 200, -450, -250, -50, 50);
+        doTickWithBothTimeoutsTestImpl(false, 500, 200, -450, -250, -50, 50);
+
+        // negative to positive striking 0 with local deadline
+        doTickWithBothTimeoutsTestImpl(true, 500, 200, -500, -300, -100, 1);
+        doTickWithBothTimeoutsTestImpl(false, 500, 200, -500, -300, -100, 1);
+
+        // negative to positive striking 0 with remote deadline
+        doTickWithBothTimeoutsTestImpl(true, 500, 200, -200, 1, 201, 300);
+        doTickWithBothTimeoutsTestImpl(false, 500, 200, -200, 1, 201, 300);
+    }
+
+    private void doTickWithBothTimeoutsTestImpl(boolean allowLocalTimeout, int localTimeout, int remoteTimeoutHalf, long tick1,
+                                                long expectedDeadline1, long expectedDeadline2, long expectedDeadline3)
+    {
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        // Set our local idleTimeout
+        transport.setIdleTimeout(localTimeout);
+
+        connection.open();
+        pumpMockTransport(transport);
+
+        assertEquals("should have written data", 1, transport.writes.size());
+        assertNotNull("should have written a non-empty frame", transport.writes.get(0));
+        assertTrue("should have written an open frame", transport.writes.get(0) instanceof Open);
+
+        // Handle the peer transmitting [half] their timeout. We half it on receipt to avoid spurious timeouts
+        // if they not have transmitted half their actual timeout, as the AMQP spec only says they SHOULD do that.
+        Open open = new Open();
+        open.setIdleTimeOut(new UnsignedInteger(remoteTimeoutHalf * 2));
+        TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
+        transport.handleFrame(openFrame);
+        pumpMockTransport(transport);
+
+        long deadline = transport.tick(tick1);
+        assertEquals("Unexpected deadline returned", expectedDeadline1, deadline);
+
+        // Wait for less time than the deadline with no data - get the same value
+        long interimTick = tick1 + 10;
+        assertTrue (interimTick < expectedDeadline1);
+        assertEquals("When the deadline hasn't been reached tick() should return the previous deadline",  expectedDeadline1, transport.tick(interimTick));
+        assertEquals("When the deadline hasn't been reached tick() shouldn't write data", 1, transport.writes.size());
+
+        deadline = transport.tick(expectedDeadline1);
+        assertEquals("When the deadline has been reached expected a new remote deadline to be returned", expectedDeadline2, deadline);
+        assertEquals("tick() should have written data", 2, transport.writes.size());
+        assertEquals("tick() should have written an empty frame", null, transport.writes.get(1));
+
+        pumpMockTransport(transport);
+
+        deadline = transport.tick(expectedDeadline2);
+        assertEquals("When the deadline has been reached expected a new local deadline to be returned", expectedDeadline3, deadline);
+        assertEquals("tick() should have written data", 3, transport.writes.size());
+        assertEquals("tick() should have written an empty frame", null, transport.writes.get(2));
+
+        pumpMockTransport(transport);
+
+        if(allowLocalTimeout) {
+            assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+            transport.tick(expectedDeadline3); // Wait for the deadline, but don't receive traffic, allow local timeout to expire
+            assertEquals("Calling tick() after the deadline should result in the connection being closed", EndpointState.CLOSED, connection.getLocalState());
+            assertEquals("tick() should have written data", 4, transport.writes.size());
+            assertNotNull("should have written a non-empty frame", transport.writes.get(3));
+            assertTrue("should have written a close frame", transport.writes.get(3) instanceof Close);
+        } else {
+            // Receive Empty frame to satisfy local deadline
+            processInput(transport,  ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
+
+            deadline = transport.tick(expectedDeadline3);
+            assertEquals("Receiving data should have reset the deadline (to the next remote one)",  expectedDeadline2 + (remoteTimeoutHalf), deadline);
+            assertEquals("tick() shouldn't have written data", 3, transport.writes.size());
+            assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+        }
+    }
+
+    @Test
+    public void testTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemote()
+    {
+        doTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemoteTestImpl(false);
+    }
+
+    @Test
+    public void testTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemoteWithLocalTimeout()
+    {
+        doTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemoteTestImpl(true);
+    }
+
+    private void doTickWithNanoTimeDerivedValueWhichWrapsLocalThenRemoteTestImpl(boolean allowLocalTimeout) {
+        int localTimeout = 5000;
+        int remoteTimeoutHalf = 2000;
+        assertTrue(remoteTimeoutHalf < localTimeout);
+
+        long offset = 2500;
+        assertTrue(offset < localTimeout);
+        assertTrue(offset > remoteTimeoutHalf);
+
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        // Set our local idleTimeout
+        transport.setIdleTimeout(localTimeout);
+
+        connection.open();
+        pumpMockTransport(transport);
+
+        assertEquals("should have written data", 1, transport.writes.size());
+        assertNotNull("should have written a non-empty frame", transport.writes.get(0));
+        assertTrue("should have written an open frame", transport.writes.get(0) instanceof Open);
+
+        // Handle the peer transmitting [half] their timeout. We half it on receipt to avoid spurious timeouts
+        // if they not have transmitted half their actual timeout, as the AMQP spec only says they SHOULD do that.
+        Open open = new Open();
+        open.setIdleTimeOut(new UnsignedInteger(remoteTimeoutHalf * 2));
+        TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
+        transport.handleFrame(openFrame);
+        pumpMockTransport(transport);
+
+        long deadline = transport.tick(Long.MAX_VALUE - offset);
+        assertEquals("Unexpected deadline returned", Long.MAX_VALUE - offset + remoteTimeoutHalf, deadline);
+
+        deadline = transport.tick(Long.MAX_VALUE - (offset - 100));    // Wait for less time than the deadline with no data - get the same value
+        assertEquals("When the deadline hasn't been reached tick() should return the previous deadline",  Long.MAX_VALUE -offset + remoteTimeoutHalf, deadline);
+        assertEquals("When the deadline hasn't been reached tick() shouldn't write data", 1, transport.writes.size());
+
+        deadline = transport.tick(Long.MAX_VALUE -offset + remoteTimeoutHalf); // Wait for the deadline - next deadline should be previous + remoteTimeoutHalf;
+        assertEquals("When the deadline has been reached expected a new remote deadline to be returned", Long.MIN_VALUE + (2* remoteTimeoutHalf) - offset -1, deadline);
+        assertEquals("tick() should have written data", 2, transport.writes.size());
+        assertEquals("tick() should have written an empty frame", null, transport.writes.get(1));
+
+        pumpMockTransport(transport);
+
+        deadline = transport.tick(Long.MIN_VALUE + (2* remoteTimeoutHalf) - offset -1); // Wait for the deadline - next deadline should be orig + localTimeout;
+        assertEquals("When the deadline has been reached expected a new local deadline to be returned", Long.MIN_VALUE + (localTimeout - offset) -1, deadline);
+        assertEquals("tick() should have written data", 3, transport.writes.size());
+        assertEquals("tick() should have written an empty frame", null, transport.writes.get(2));
+
+        pumpMockTransport(transport);
+
+        if(allowLocalTimeout) {
+            assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+            transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1); // Wait for the deadline, but don't receive traffic, allow local timeout to expire
+            assertEquals("Calling tick() after the deadline should result in the connection being closed", EndpointState.CLOSED, connection.getLocalState());
+            assertEquals("tick() should have written data", 4, transport.writes.size());
+            assertNotNull("should have written a non-empty frame", transport.writes.get(3));
+            assertTrue("should have written a close frame", transport.writes.get(3) instanceof Close);
+        } else {
+            // Receive Empty frame to satisfy local deadline
+            processInput(transport,  ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
+
+            deadline = transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1); // Wait for the deadline - next deadline should be orig + 3*remoteTimeoutHalf;
+            assertEquals("Receiving data should have reset the deadline (to the remote one)",  Long.MIN_VALUE + (3* remoteTimeoutHalf) - offset -1, deadline);
+            assertEquals("tick() shouldn't have written data", 3, transport.writes.size());
+            assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+        }
+    }
+
+    @Test
+    public void testTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocal()
+    {
+        doTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocalTestImpl(false);
+    }
+
+    @Test
+    public void testTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocalWithLocalTimeout()
+    {
+        doTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocalTestImpl(true);
+    }
+
+    private void doTickWithNanoTimeDerivedValueWhichWrapsRemoteThenLocalTestImpl(boolean allowLocalTimeout) {
+        int localTimeout = 2000;
+        int remoteTimeoutHalf = 5000;
+        assertTrue(localTimeout < remoteTimeoutHalf);
+
+        long offset = 2500;
+        assertTrue(offset > localTimeout);
+        assertTrue(offset < remoteTimeoutHalf);
+
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        // Set our local idleTimeout
+        transport.setIdleTimeout(localTimeout);
+
+        connection.open();
+        pumpMockTransport(transport);
+
+        assertEquals("should have written data", 1, transport.writes.size());
+        assertNotNull("should have written a non-empty frame", transport.writes.get(0));
+        assertTrue("should have written an open frame", transport.writes.get(0) instanceof Open);
+
+        // Receive Protocol header
+        processInput(transport, ByteBuffer.wrap(new byte[] {'A', 'M', 'Q', 'P', 0x00, 0x01, 0x00, 0x00}));
+
+        // Handle the peer transmitting [half] their timeout. We half it on receipt to avoid spurious timeouts
+        // if they not have transmitted half their actual timeout, as the AMQP spec only says they SHOULD do that.
+        Open open = new Open();
+        open.setIdleTimeOut(new UnsignedInteger(remoteTimeoutHalf * 2));
+        TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
+        transport.handleFrame(openFrame);
+        pumpMockTransport(transport);
+
+        long deadline = transport.tick(Long.MAX_VALUE - offset);
+        assertEquals("Unexpected deadline returned",  Long.MAX_VALUE - offset + localTimeout, deadline);
+
+        deadline = transport.tick(Long.MAX_VALUE - (offset - 100));    // Wait for less time than the deadline with no data - get the same value
+        assertEquals("When the deadline hasn't been reached tick() should return the previous deadline",  Long.MAX_VALUE - offset + localTimeout, deadline);
+        assertEquals("tick() shouldn't have written data", 1, transport.writes.size());
+
+        // Receive Empty frame to satisfy local deadline
+        processInput(transport,  ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
+
+        deadline = transport.tick(Long.MAX_VALUE - offset + localTimeout); // Wait for the deadline - next deadline should be orig + 2* localTimeout;
+        assertEquals("When the deadline has been reached expected a new local deadline to be returned", Long.MIN_VALUE + (localTimeout - offset) -1 + localTimeout, deadline);
+        assertEquals("tick() should not have written data", 1, transport.writes.size());
+
+        pumpMockTransport(transport);
+
+        if(allowLocalTimeout) {
+            assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+            transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1 + localTimeout); // Wait for the deadline, but don't receive traffic, allow local timeout to expire
+            assertEquals("Calling tick() after the deadline should result in the connection being closed", EndpointState.CLOSED, connection.getLocalState());
+            assertEquals("tick() should have written data", 2, transport.writes.size());
+            assertNotNull("should have written a non-empty frame", transport.writes.get(1));
+            assertTrue("should have written a close frame", transport.writes.get(1) instanceof Close);
+        } else {
+            // Receive Empty frame to satisfy local deadline
+            processInput(transport,  ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
+
+            deadline = transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1 + localTimeout); // Wait for the deadline - next deadline should be orig + remoteTimeoutHalf;
+            assertEquals("Receiving data should have reset the deadline (to the remote one)",  Long.MIN_VALUE + remoteTimeoutHalf - offset -1, deadline);
+            assertEquals("tick() shouldn't have written data", 1, transport.writes.size());
+
+            deadline = transport.tick(Long.MIN_VALUE + remoteTimeoutHalf - offset -1); // Wait for the deadline - next deadline should be orig + 3* localTimeout;
+            assertEquals("When the deadline has been reached expected a new local deadline to be returned", Long.MIN_VALUE + (3* localTimeout) - offset -1, deadline);
+            assertEquals("tick() should have written data", 2, transport.writes.size());
+            assertEquals("tick() should have written an empty frame", null, transport.writes.get(1));
+            assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+        }
+    }
+
+    @Test
+    public void testTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirst()
+    {
+        doTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirstTestImpl(false);
+    }
+
+    @Test
+    public void testTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirstWithLocalTimeout()
+    {
+        doTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirstTestImpl(true);
+    }
+
+    private void doTickWithNanoTimeDerivedValueWhichWrapsBothRemoteFirstTestImpl(boolean allowLocalTimeout) {
+        int localTimeout = 2000;
+        int remoteTimeoutHalf = 2500;
+        assertTrue(localTimeout < remoteTimeoutHalf);
+
+        long offset = 500;
+        assertTrue(offset < localTimeout);
+
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        // Set our local idleTimeout
+        transport.setIdleTimeout(localTimeout);
+
+        connection.open();
+        pumpMockTransport(transport);
+
+        assertEquals("should have written data", 1, transport.writes.size());
+        assertNotNull("should have written a non-empty frame", transport.writes.get(0));
+        assertTrue("should have written an open frame", transport.writes.get(0) instanceof Open);
+
+        // Receive Protocol header
+        processInput(transport, ByteBuffer.wrap(new byte[] {'A', 'M', 'Q', 'P', 0x00, 0x01, 0x00, 0x00}));
+
+        // Handle the peer transmitting [half] their timeout. We half it on receipt to avoid spurious timeouts
+        // if they not have transmitted half their actual timeout, as the AMQP spec only says they SHOULD do that.
+        Open open = new Open();
+        open.setIdleTimeOut(new UnsignedInteger(remoteTimeoutHalf * 2));
+        TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
+        transport.handleFrame(openFrame);
+        pumpMockTransport(transport);
+
+        long deadline = transport.tick(Long.MAX_VALUE - offset);
+        assertEquals("Unexpected deadline returned",  Long.MIN_VALUE + (localTimeout - offset) -1, deadline);
+
+        deadline = transport.tick(Long.MAX_VALUE - (offset - 100));    // Wait for less time than the deadline with no data - get the same value
+        assertEquals("When the deadline hasn't been reached tick() should return the previous deadline",  Long.MIN_VALUE + (localTimeout - offset) -1, deadline);
+        assertEquals("tick() shouldn't have written data", 1, transport.writes.size());
+
+        // Receive Empty frame to satisfy local deadline
+        processInput(transport,  ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
+
+        deadline = transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1); // Wait for the deadline - next deadline should be orig + remoteTimeoutHalf;
+        assertEquals("When the deadline has been reached expected a new remote deadline to be returned", Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1, deadline);
+        assertEquals("When the deadline hasn't been reached tick() shouldn't write data", 1, transport.writes.size());
+
+        deadline = transport.tick(Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1); // Wait for the deadline - next deadline should be orig + 2* localTimeout;
+        assertEquals("When the deadline has been reached expected a new local deadline to be returned", Long.MIN_VALUE + (localTimeout - offset) -1 + localTimeout, deadline);
+        assertEquals("tick() should have written data", 2, transport.writes.size());
+        assertEquals("tick() should have written an empty frame", null, transport.writes.get(1));
+
+        pumpMockTransport(transport);
+
+        if(allowLocalTimeout) {
+            assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+            transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1 + localTimeout); // Wait for the deadline, but don't receive traffic, allow local timeout to expire
+            assertEquals("Calling tick() after the deadline should result in the connection being closed", EndpointState.CLOSED, connection.getLocalState());
+            assertEquals("tick() should have written data", 3, transport.writes.size());
+            assertNotNull("should have written a non-empty frame", transport.writes.get(2));
+            assertTrue("should have written a close frame", transport.writes.get(2) instanceof Close);
+        } else {
+            // Receive Empty frame to satisfy local deadline
+            processInput(transport,  ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
+
+            deadline = transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1 + localTimeout); // Wait for the deadline - next deadline should be orig + 2*remoteTimeoutHalf;
+            assertEquals("Receiving data should have reset the deadline (to the remote one)",  Long.MIN_VALUE + (2* remoteTimeoutHalf) - offset -1, deadline);
+            assertEquals("tick() shouldn't have written data", 2, transport.writes.size());
+            assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+        }
+    }
+
+    @Test
+    public void testTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirst()
+    {
+        doTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirstTestImpl(false);
+    }
+
+    @Test
+    public void testTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirstWithLocalTimeout()
+    {
+        doTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirstTestImpl(true);
+    }
+
+    private void doTickWithNanoTimeDerivedValueWhichWrapsBothLocalFirstTestImpl(boolean allowLocalTimeout) {
+        int localTimeout = 5000;
+        int remoteTimeoutHalf = 2000;
+        assertTrue(remoteTimeoutHalf < localTimeout);
+
+        long offset = 500;
+        assertTrue(offset < remoteTimeoutHalf);
+
+        MockTransportImpl transport = new MockTransportImpl();
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        // Set our local idleTimeout
+        transport.setIdleTimeout(localTimeout);
+
+        connection.open();
+        while(transport.pending() > 0) {
+            transport.pop(transport.head().remaining());
+        }
+
+        assertEquals("should have written data", 1, transport.writes.size());
+        assertNotNull("should have written a non-empty frame", transport.writes.get(0));
+        assertTrue("should have written an open frame", transport.writes.get(0) instanceof Open);
+
+        // Handle the peer transmitting [half] their timeout. We half it on receipt to avoid spurious timeouts
+        // if they not have transmitted half their actual timeout, as the AMQP spec only says they SHOULD do that.
+        Open open = new Open();
+        open.setIdleTimeOut(new UnsignedInteger(remoteTimeoutHalf * 2));
+        TransportFrame openFrame = new TransportFrame(CHANNEL_ID, open, null);
+        transport.handleFrame(openFrame);
+        pumpMockTransport(transport);
+
+
+        long deadline = transport.tick(Long.MAX_VALUE - offset);
+        assertEquals("Unexpected deadline returned",  Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1, deadline);
+
+        deadline = transport.tick(Long.MAX_VALUE - (offset - 100));    // Wait for less time than the deadline with no data - get the same value
+        assertEquals("When the deadline hasn't been reached tick() should return the previous deadline",  Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1, deadline);
+        assertEquals("When the deadline hasn't been reached tick() shouldn't write data", 1, transport.writes.size());
+
+        deadline = transport.tick(Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1); // Wait for the deadline - next deadline should be previous + remoteTimeoutHalf;
+        assertEquals("When the deadline has been reached expected a new remote deadline to be returned", Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1 + remoteTimeoutHalf, deadline);
+        assertEquals("tick() should have written data", 2, transport.writes.size());
+        assertEquals("tick() should have written an empty frame", null, transport.writes.get(1));
+
+        pumpMockTransport(transport);
+
+        deadline = transport.tick(Long.MIN_VALUE + (remoteTimeoutHalf - offset) -1 + remoteTimeoutHalf); // Wait for the deadline - next deadline should be orig + localTimeout;
+        assertEquals("When the deadline has been reached expected a new local deadline to be returned", Long.MIN_VALUE + (localTimeout - offset) -1, deadline);
+        assertEquals("tick() should have written data", 3, transport.writes.size());
+        assertEquals("tick() should have written an empty frame", null, transport.writes.get(2));
+
+        pumpMockTransport(transport);
+
+        if(allowLocalTimeout) {
+            assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+            transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1); // Wait for the deadline, but don't receive traffic, allow local timeout to expire
+            assertEquals("Calling tick() after the deadline should result in the connection being closed", EndpointState.CLOSED, connection.getLocalState());
+            assertEquals("tick() should have written data", 4, transport.writes.size());
+            assertNotNull("should have written a non-empty frame", transport.writes.get(3));
+            assertTrue("should have written a close frame", transport.writes.get(3) instanceof Close);
+        } else {
+            // Receive Empty frame to satisfy local deadline
+            processInput(transport,  ByteBuffer.wrap(new byte[] {0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}));
+
+            deadline = transport.tick(Long.MIN_VALUE + (localTimeout - offset) -1); // Wait for the deadline - next deadline should be orig + 3*remoteTimeoutHalf;
+            assertEquals("Receiving data should have reset the deadline (to the remote one)",  Long.MIN_VALUE + (3* remoteTimeoutHalf) - offset -1, deadline);
+            assertEquals("tick() shouldn't have written data", 3, transport.writes.size());
+            assertEquals("Connection should be active", EndpointState.ACTIVE, connection.getLocalState());
+        }
+    }
+
+    private void processInput(MockTransportImpl transport, ByteBuffer data) {
+        while (data.remaining() > 0)
+        {
+            int origLimit = data.limit();
+            int amount = Math.min(transport.tail().remaining(), data.remaining());
+            data.limit(data.position() + amount);
+            transport.tail().put(data);
+            data.limit(origLimit);
+            transport.process();
+        }
+    }
+
 }