PROTON-1901: fix handling of multiplexed incoming deliveries on a session, check for various sequencing issues that would cause illegal states and knock on problems
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportReceiver.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportReceiver.java
index 29d97c4..6e588d4 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportReceiver.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportReceiver.java
@@ -21,11 +21,12 @@
 
 package org.apache.qpid.proton.engine.impl;
 
+import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.transport.Flow;
 
 class TransportReceiver extends TransportLink<ReceiverImpl>
 {
-
+    private UnsignedInteger _incomingDeliveryId;
 
     TransportReceiver(ReceiverImpl link)
     {
@@ -52,7 +53,14 @@
             setDeliveryCount(getRemoteDeliveryCount());
             getLink().setDrained(getLink().getDrained() + delta);
         }
-
-
     }
+
+    UnsignedInteger getIncomingDeliveryId() {
+        return _incomingDeliveryId;
+    }
+
+    void setIncomingDeliveryId(UnsignedInteger _incomingDeliveryId) {
+        this._incomingDeliveryId = _incomingDeliveryId;
+    }
+
 }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
index bbacd30..567e8eb 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
@@ -264,22 +264,23 @@
     public void handleTransfer(Transfer transfer, Binary payload)
     {
         DeliveryImpl delivery;
-        incrementNextIncomingId();
-        if(transfer.getDeliveryId() == null || transfer.getDeliveryId().equals(_incomingDeliveryId))
-        {
-            TransportReceiver transportReceiver = (TransportReceiver) getLinkFromRemoteHandle(transfer.getHandle());
-            ReceiverImpl receiver = transportReceiver.getReceiver();
-            Binary deliveryTag = transfer.getDeliveryTag();
-            delivery = _unsettledIncomingDeliveriesById.get(_incomingDeliveryId);
-            delivery.getTransportDelivery().incrementSessionSize();
+        incrementNextIncomingId(); // The conceptual/non-wire transfer-id, for the session window.
 
+        TransportReceiver transportReceiver = (TransportReceiver) getLinkFromRemoteHandle(transfer.getHandle());
+        UnsignedInteger linkIncomingDeliveryId = transportReceiver.getIncomingDeliveryId();
+        UnsignedInteger deliveryId = transfer.getDeliveryId();
+
+        if(linkIncomingDeliveryId != null && (linkIncomingDeliveryId.equals(deliveryId) || deliveryId == null))
+        {
+            delivery = _unsettledIncomingDeliveriesById.get(linkIncomingDeliveryId);
+            delivery.getTransportDelivery().incrementSessionSize();
         }
         else
         {
-            // TODO - check deliveryId has been incremented by one
-            _incomingDeliveryId = transfer.getDeliveryId();
-            // TODO - check link handle valid and a receiver
-            TransportReceiver transportReceiver = (TransportReceiver) getLinkFromRemoteHandle(transfer.getHandle());
+            verifyNewDeliveryIdSequence(_incomingDeliveryId, linkIncomingDeliveryId, deliveryId);
+
+            _incomingDeliveryId = deliveryId;
+
             ReceiverImpl receiver = transportReceiver.getReceiver();
             Binary deliveryTag = transfer.getDeliveryTag();
             delivery = receiver.delivery(deliveryTag.getArray(), deliveryTag.getArrayOffset(),
@@ -288,11 +289,13 @@
             if(messageFormat != null) {
                 delivery.setMessageFormat(messageFormat.intValue());
             }
-            TransportDelivery transportDelivery = new TransportDelivery(_incomingDeliveryId, delivery, transportReceiver);
+            TransportDelivery transportDelivery = new TransportDelivery(deliveryId, delivery, transportReceiver);
             delivery.setTransportDelivery(transportDelivery);
-            _unsettledIncomingDeliveriesById.put(_incomingDeliveryId, delivery);
+            transportReceiver.setIncomingDeliveryId(deliveryId);
+            _unsettledIncomingDeliveriesById.put(deliveryId, delivery);
             getSession().incrementIncomingDeliveries(1);
         }
+
         if( transfer.getState()!=null )
         {
             delivery.setRemoteDeliveryState(transfer.getState());
@@ -307,12 +310,18 @@
 
         delivery.updateWork();
 
+        if(!transfer.getMore() || transfer.getAborted())
+        {
+            transportReceiver.setIncomingDeliveryId(null);
+        }
+
         if(!(transfer.getMore() || transfer.getAborted()))
         {
             delivery.setComplete();
             delivery.getLink().getTransportLink().decrementLinkCredit();
             delivery.getLink().getTransportLink().incrementDeliveryCount();
         }
+
         if(Boolean.TRUE.equals(transfer.getSettled()))
         {
             delivery.setRemoteSettled(true);
@@ -328,6 +337,22 @@
         getSession().getConnection().put(Event.Type.DELIVERY, delivery);
     }
 
+    private void verifyNewDeliveryIdSequence(UnsignedInteger previousId, UnsignedInteger linkIncomingId, UnsignedInteger newDeliveryId) {
+        if(newDeliveryId == null) {
+            throw new IllegalStateException("No delivery-id specified on first Transfer of new delivery");
+        }
+
+        // Doing a primitive comparison, uses intValue() since its a uint sequence
+        // and we need the primitive values to wrap appropriately during comparison.
+        if(previousId != null && previousId.intValue() + 1 != newDeliveryId.intValue()) {
+            throw new IllegalStateException("Expected delivery-id " + previousId.add(UnsignedInteger.ONE) + ", got " + newDeliveryId);
+        }
+
+        if(linkIncomingId != null) {
+            throw new IllegalStateException("Illegal multiplex of deliveries on same link with delivery-id " + linkIncomingId + " and " + newDeliveryId);
+        }
+    }
+
     public void freeLocalChannel()
     {
         unsetLocalChannel();
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 9e3afd6..d66964b 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -40,12 +40,14 @@
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.UnsignedShort;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Released;
 import org.apache.qpid.proton.amqp.transport.Attach;
 import org.apache.qpid.proton.amqp.transport.Begin;
 import org.apache.qpid.proton.amqp.transport.Close;
 import org.apache.qpid.proton.amqp.transport.Detach;
+import org.apache.qpid.proton.amqp.transport.Disposition;
 import org.apache.qpid.proton.amqp.transport.End;
 import org.apache.qpid.proton.amqp.transport.Flow;
 import org.apache.qpid.proton.amqp.transport.FrameBody;
@@ -1712,6 +1714,28 @@
         return delivery;
     }
 
+    private Delivery verifyDeliveryRawPayload(Receiver receiver, String deliveryTag, byte[] payload)
+    {
+        Delivery delivery = receiver.current();
+
+        assertTrue(Arrays.equals(deliveryTag.getBytes(StandardCharsets.UTF_8), delivery.getTag()));
+
+        assertFalse(delivery.isPartial());
+        assertTrue(delivery.isReadable());
+
+        byte[] received = new byte[delivery.pending()];
+        int len = receiver.recv(received, 0, BUFFER_SIZE);
+
+        assertEquals("unexpected length", len, received.length);
+
+        assertArrayEquals("Received delivery payload not as expected", payload, received);
+
+        boolean receiverAdvanced = receiver.advance();
+        assertTrue("receiver has not advanced", receiverAdvanced);
+
+        return delivery;
+    }
+
     /**
      * Verify that the {@link TransportInternal#addTransportLayer(TransportLayer)} has the desired
      * effect by observing the wrapping effect on related transport input and output methods.
@@ -2645,16 +2669,16 @@
     }
 
     @Test
-    public void testMultiplexMultiFrameDeliveryOnSingleSession() {
-        doMultiplexMultiFrameDeliveryOnSingleSessionTestImpl(false);
+    public void testMultiplexMultiFrameDeliveryOnSingleSessionOutgoing() {
+        doMultiplexMultiFrameDeliveryOnSingleSessionOutgoingTestImpl(false);
     }
 
     @Test
-    public void testMultiplexMultiFrameDeliveriesOnSingleSession() {
-        doMultiplexMultiFrameDeliveryOnSingleSessionTestImpl(true);
+    public void testMultiplexMultiFrameDeliveriesOnSingleSessionOutgoing() {
+        doMultiplexMultiFrameDeliveryOnSingleSessionOutgoingTestImpl(true);
     }
 
-    private void doMultiplexMultiFrameDeliveryOnSingleSessionTestImpl(boolean bothDeliveriesMultiFrame) {
+    private void doMultiplexMultiFrameDeliveryOnSingleSessionOutgoingTestImpl(boolean bothDeliveriesMultiFrame) {
         MockTransportImpl transport = new MockTransportImpl();
         transport.setEmitFlowEventOnSend(false);
 
@@ -2788,4 +2812,575 @@
             assertEquals("Unexpected more flag", false, transfer.getMore());
         }
     }
+
+    @Test
+    public void testMultiplexMultiFrameDeliveriesOnSingleSessionIncoming() {
+        doMultiplexMultiFrameDeliveryOnSingleSessionIncomingTestImpl(true);
+    }
+
+    @Test
+    public void testMultiplexMultiFrameDeliveryOnSingleSessionIncoming() {
+        doMultiplexMultiFrameDeliveryOnSingleSessionIncomingTestImpl(false);
+    }
+
+    private void doMultiplexMultiFrameDeliveryOnSingleSessionIncomingTestImpl(boolean bothDeliveriesMultiFrame) {
+        int contentLength1 = 7000;
+        int maxPayloadChunkSize = 2000;
+        int contentLength2 = 1000;
+        if(bothDeliveriesMultiFrame) {
+            contentLength2 = 4100;
+        }
+
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName1 = "myReceiver1";
+        Receiver receiver1 = session.receiver(linkName1);
+        receiver1.flow(5);
+        receiver1.open();
+
+        String linkName2 = "myReceiver2";
+        Receiver receiver2 = session.receiver(linkName2);
+        receiver2.flow(5);
+        receiver2.open();
+
+        pumpMockTransport(transport);
+
+        final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+        final UnsignedInteger r2handle = UnsignedInteger.ONE;
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+
+        assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+        assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+        FrameBody frame = transport.writes.get(2);
+        assertTrue("Unexpected frame type", frame instanceof Attach);
+        assertEquals("Unexpected handle", ((Attach) frame).getHandle(), r1handle);
+        frame = transport.writes.get(3);
+        assertTrue("Unexpected frame type", frame instanceof Attach);
+        assertEquals("Unexpected handle", ((Attach) frame).getHandle(), r2handle);
+        frame = transport.writes.get(4);
+        assertTrue("Unexpected frame type", frame instanceof Flow);
+        assertEquals("Unexpected handle", ((Flow) frame).getHandle(), r1handle);
+        frame = transport.writes.get(5);
+        assertTrue("Unexpected frame type", frame instanceof Flow);
+        assertEquals("Unexpected handle", ((Flow) frame).getHandle(), r2handle);
+
+        assertNull("Should not yet have a delivery", receiver1.current());
+        assertNull("Should not yet have a delivery", receiver2.current());
+
+        // Send the necessary responses to open/begin/attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach1 = new Attach();
+        attach1.setHandle(r1handle);
+        attach1.setRole(Role.SENDER);
+        attach1.setName(linkName1);
+        attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach1, null));
+
+        Attach attach2 = new Attach();
+        attach2.setHandle(r2handle);
+        attach2.setRole(Role.SENDER);
+        attach2.setName(linkName2);
+        attach2.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach2, null));
+
+        String deliveryTag1 = "tag1";
+        String messageContent1 = createLargeContent(contentLength1);
+        String deliveryTag2 = "tag2";
+        String messageContent2 = createLargeContent(contentLength2);
+
+        ArrayList<byte[]> message1chunks = createTransferPayloads(messageContent1, maxPayloadChunkSize);
+        assertEquals("unexpected number of payload chunks", 4, message1chunks.size());
+        ArrayList<byte[]> message2chunks = createTransferPayloads(messageContent2, maxPayloadChunkSize);
+        if(bothDeliveriesMultiFrame) {
+            assertEquals("unexpected number of payload chunks", 3, message2chunks.size());
+        } else {
+            assertEquals("unexpected number of payload chunks", 1, message2chunks.size());
+        }
+
+        while (true) {
+           if (!message1chunks.isEmpty()) {
+              byte[] chunk = message1chunks.remove(0);
+              handlePartialTransfer(transport, r1handle, 1, deliveryTag1, chunk, !message1chunks.isEmpty());
+           }
+
+           if (!message2chunks.isEmpty()) {
+               byte[] chunk = message2chunks.remove(0);
+               handlePartialTransfer(transport, r2handle, 2, deliveryTag2, chunk, !message2chunks.isEmpty());
+            }
+
+           if (message1chunks.isEmpty() && message2chunks.isEmpty()) {
+              break;
+           }
+        }
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+
+        assertEquals("Unexpected queued count", 1, receiver1.getQueued());
+        Delivery delivery1 = verifyDelivery(receiver1, deliveryTag1, messageContent1);
+        assertNotNull("Should now have a delivery", delivery1);
+        assertEquals("Unexpected queued count", 0, receiver1.getQueued());
+
+        assertEquals("Unexpected queued count", 1, receiver2.getQueued());
+        Delivery delivery2 = verifyDelivery(receiver2, deliveryTag2, messageContent2);
+        assertNotNull("Should now have a delivery", delivery2);
+        assertEquals("Unexpected queued count", 0, receiver2.getQueued());
+
+        delivery1.disposition(Accepted.getInstance());
+        delivery1.settle();
+        pumpMockTransport(transport);
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 7, transport.writes.size());
+
+        frame = transport.writes.get(6);
+        assertTrue("Unexpected frame type", frame instanceof Disposition);
+        assertEquals("Unexpected delivery id", ((Disposition) frame).getFirst(), UnsignedInteger.ONE);
+        assertEquals("Unexpected delivery id", ((Disposition) frame).getLast(), UnsignedInteger.ONE);
+
+        delivery2.disposition(Accepted.getInstance());
+        delivery2.settle();
+        pumpMockTransport(transport);
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 8, transport.writes.size());
+
+        frame = transport.writes.get(7);
+        assertTrue("Unexpected frame type", frame instanceof Disposition);
+        assertEquals("Unexpected delivery id", ((Disposition) frame).getFirst(), UnsignedInteger.valueOf(2));
+        assertEquals("Unexpected delivery id", ((Disposition) frame).getLast(), UnsignedInteger.valueOf(2));
+    }
+
+    private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, int deliveryId, String deliveryTag, byte[] partialPayload, boolean more)
+    {
+        handlePartialTransfer(transport, handle, UnsignedInteger.valueOf(deliveryId), deliveryTag, partialPayload, more);
+    }
+
+    private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, UnsignedInteger deliveryId, String deliveryTag, byte[] partialPayload, boolean more)
+    {
+        handlePartialTransfer(transport, handle, deliveryId, deliveryTag, partialPayload, more, false);
+    }
+
+    private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, UnsignedInteger deliveryId, String deliveryTag, byte[] partialPayload, boolean more, boolean aborted)
+    {
+        byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
+
+        Transfer transfer = new Transfer();
+        transfer.setHandle(handle);
+        transfer.setDeliveryTag(new Binary(tag));
+        transfer.setMessageFormat(UnsignedInteger.valueOf(DeliveryImpl.DEFAULT_MESSAGE_FORMAT));
+        transfer.setMore(more);
+        transfer.setAborted(aborted);
+        if(deliveryId != null) {
+            // Can be omitted in continuation frames for a given delivery.
+            transfer.setDeliveryId(deliveryId);
+        }
+
+        transport.handleFrame(new TransportFrame(0, transfer, new Binary(partialPayload, 0, partialPayload.length)));
+    }
+
+    private ArrayList<byte[]> createTransferPayloads(String content, int payloadChunkSize)
+    {
+        ArrayList<byte[]> payloadChunks = new ArrayList<>();
+
+        Message m = Message.Factory.create();
+        m.setBody(new AmqpValue(content));
+
+        byte[] encoded = new byte[BUFFER_SIZE];
+        int len = m.encode(encoded, 0, BUFFER_SIZE);
+        assertTrue("given array was too small", len < BUFFER_SIZE);
+
+        int copied = 0;
+        while(copied < len) {
+            int chunkSize = Math.min(len - copied, payloadChunkSize);
+            byte[] chunk = new byte[chunkSize];
+
+            System.arraycopy(encoded, copied, chunk, 0, chunkSize);
+
+            payloadChunks.add(chunk);
+            copied += chunkSize;
+        }
+
+        assertFalse("no payload chunks to return", payloadChunks.isEmpty());
+
+        return payloadChunks;
+    }
+
+    @Test
+    public void testDeliveryIdOutOfSequenceCausesISE() {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName1 = "myReceiver1";
+        Receiver receiver1 = session.receiver(linkName1);
+        receiver1.flow(5);
+        receiver1.open();
+
+        String linkName2 = "myReceiver2";
+        Receiver receiver2 = session.receiver(linkName2);
+        receiver2.flow(5);
+        receiver2.open();
+
+        pumpMockTransport(transport);
+
+        final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+        final UnsignedInteger r2handle = UnsignedInteger.ONE;
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+
+        // Give the necessary responses to open/begin/attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach1 = new Attach();
+        attach1.setHandle(r1handle);
+        attach1.setRole(Role.SENDER);
+        attach1.setName(linkName1);
+        attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach1, null));
+
+        Attach attach2 = new Attach();
+        attach2.setHandle(r2handle);
+        attach2.setRole(Role.SENDER);
+        attach2.setName(linkName2);
+        attach2.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach2, null));
+
+        String deliveryTag1 = "tag1";
+        String deliveryTag2 = "tag2";
+
+        handlePartialTransfer(transport, r2handle, 2, deliveryTag2, new byte[]{ 2 }, false);
+        try {
+            handlePartialTransfer(transport, r1handle, 1, deliveryTag1, new byte[]{ 1 }, false);
+            fail("Expected an ISE");
+        } catch(IllegalStateException ise) {
+            // Expected
+            assertTrue("Unexpected exception:" + ise, ise.getMessage().contains("Expected delivery-id 3, got 1"));
+        }
+    }
+
+    @Test
+    public void testDeliveryIdMissingOnInitialTransferCausesISE() {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName1 = "myReceiver1";
+        Receiver receiver1 = session.receiver(linkName1);
+        receiver1.flow(5);
+        receiver1.open();
+
+        pumpMockTransport(transport);
+
+        final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        // Give the necessary responses to open/begin/attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach1 = new Attach();
+        attach1.setHandle(r1handle);
+        attach1.setRole(Role.SENDER);
+        attach1.setName(linkName1);
+        attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach1, null));
+
+        // Receive a delivery without any delivery-id on the [first] transfer frame, expect it to fail.
+        try {
+            handlePartialTransfer(transport, r1handle, null, "tag1", new byte[]{ 1 }, false);
+            fail("Expected an ISE");
+        } catch(IllegalStateException ise) {
+            // Expected
+            assertEquals("Unexpected message", "No delivery-id specified on first Transfer of new delivery", ise.getMessage());
+        }
+    }
+
+    @Test
+    public void testMultiplexDeliveriesOnSameReceiverLinkCausesISE() {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName1 = "myReceiver1";
+        Receiver receiver1 = session.receiver(linkName1);
+        receiver1.flow(5);
+        receiver1.open();
+
+        pumpMockTransport(transport);
+
+        final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        // Give the necessary responses to open/begin/attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach1 = new Attach();
+        attach1.setHandle(r1handle);
+        attach1.setRole(Role.SENDER);
+        attach1.setName(linkName1);
+        attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach1, null));
+
+        // Receive first transfer for a multi-frame delivery
+        handlePartialTransfer(transport, r1handle, 1, "tag1", new byte[]{ 1 }, true);
+
+        // Receive first transfer for ANOTHER multi-frame delivery, expect it to fail
+        // as multiplexing deliveries on a single link is forbidden by the spec.
+        try {
+            handlePartialTransfer(transport, r1handle, 2, "tag2", new byte[]{ 2 }, true);
+            fail("Expected an ISE");
+        } catch(IllegalStateException ise) {
+            // Expected
+            assertEquals("Unexpected message", "Illegal multiplex of deliveries on same link with delivery-id 1 and 2", ise.getMessage());
+        }
+    }
+
+    @Test
+    public void testDeliveryIdTrackingHandlesAbortedDelivery() {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName1 = "myReceiver1";
+        Receiver receiver1 = session.receiver(linkName1);
+        receiver1.flow(5);
+        receiver1.open();
+
+        pumpMockTransport(transport);
+
+        final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        // Give the necessary responses to open/begin/attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach1 = new Attach();
+        attach1.setHandle(r1handle);
+        attach1.setRole(Role.SENDER);
+        attach1.setName(linkName1);
+        attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach1, null));
+
+        // Receive first transfer for a multi-frame delivery
+        assertEquals("Unexpected queued count", 0, receiver1.getQueued());
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.ZERO, "tag1", new byte[]{ 1 }, true);
+        assertEquals("Unexpected queued count", 1, receiver1.getQueued());
+        // Receive second transfer for a multi-frame delivery, indicating it is aborted
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.ZERO, "tag1", new byte[]{ 2 }, true, true);
+        assertEquals("Unexpected queued count", 1, receiver1.getQueued());
+
+        // Receive first transfer for ANOTHER delivery, expect it not to fail, since the earlier delivery aborted
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.ONE, "tag2", new byte[]{ 3 }, false);
+        assertEquals("Unexpected queued count", 2, receiver1.getQueued());
+
+        receiver1.advance();
+        verifyDeliveryRawPayload(receiver1, "tag2", new byte[] { 3 });
+    }
+
+    @Test
+    public void testDeliveryWithIdOmittedOnContinuationTransfers() {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName1 = "myReceiver1";
+        Receiver receiver1 = session.receiver(linkName1);
+        receiver1.flow(5);
+        receiver1.open();
+
+        String linkName2 = "myReceiver2";
+        Receiver receiver2 = session.receiver(linkName2);
+        receiver2.flow(5);
+        receiver2.open();
+
+        pumpMockTransport(transport);
+
+        final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+        final UnsignedInteger r2handle = UnsignedInteger.ONE;
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+
+        // Give the necessary responses to open/begin/attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach1 = new Attach();
+        attach1.setHandle(r1handle);
+        attach1.setRole(Role.SENDER);
+        attach1.setName(linkName1);
+        attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach1, null));
+
+        Attach attach2 = new Attach();
+        attach2.setHandle(r2handle);
+        attach2.setRole(Role.SENDER);
+        attach2.setName(linkName2);
+        attach2.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach2, null));
+
+        String deliveryTag1 = "tag1";
+        String deliveryTag2 = "tag2";
+
+        // Send multi-frame deliveries for each link, multiplexed together, and omit
+        // the delivery-id on the continuation frames as allowed for by the spec.
+        handlePartialTransfer(transport, r1handle, 1, deliveryTag1, new byte[]{ 1 }, true);
+        handlePartialTransfer(transport, r2handle, 2, deliveryTag2, new byte[]{ 101 }, true);
+        handlePartialTransfer(transport, r2handle, null, deliveryTag2, new byte[]{ 102 }, true);
+        handlePartialTransfer(transport, r1handle, null, deliveryTag1, new byte[]{ 2 }, true);
+        handlePartialTransfer(transport, r1handle, null, deliveryTag1, new byte[]{ 3 }, false);
+        handlePartialTransfer(transport, r2handle, null, deliveryTag2, new byte[]{ 103 }, true);
+        handlePartialTransfer(transport, r2handle, null, deliveryTag2, new byte[]{ 104 }, false);
+
+        // Verify the transfer frames were all matched to compose the expected delivery payload.
+        verifyDeliveryRawPayload(receiver1, deliveryTag1, new byte[] { 1, 2, 3 });
+        verifyDeliveryRawPayload(receiver2, deliveryTag2, new byte[] { 101, 102, 103, 104 });
+    }
+
+    @Test
+    public void testDeliveryIdThresholdsAndWraps() {
+        // Check start from 0
+        doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.ZERO, UnsignedInteger.ONE, UnsignedInteger.valueOf(2));
+        // Check run up to max-int (interesting boundary for underlying impl)
+        doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.valueOf(Integer.MAX_VALUE - 2), UnsignedInteger.valueOf(Integer.MAX_VALUE -1), UnsignedInteger.valueOf(Integer.MAX_VALUE));
+        // Check crossing from signed range value into unsigned range value (interesting boundary for underlying impl)
+        long maxIntAsLong = Integer.MAX_VALUE;
+        doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.valueOf(maxIntAsLong), UnsignedInteger.valueOf(maxIntAsLong + 1L), UnsignedInteger.valueOf(maxIntAsLong + 2L));
+        // Check run up to max-uint
+        doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.valueOf(0xFFFFFFFFL - 2), UnsignedInteger.valueOf(0xFFFFFFFFL - 1), UnsignedInteger.MAX_VALUE);
+        // Check wrapping from max unsigned value back to min(/0).
+        doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.MAX_VALUE, UnsignedInteger.ZERO, UnsignedInteger.ONE);
+    }
+
+    private void doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger deliveryId1, UnsignedInteger deliveryId2, UnsignedInteger deliveryId3) {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName1 = "myReceiver1";
+        Receiver receiver1 = session.receiver(linkName1);
+        receiver1.flow(5);
+        receiver1.open();
+
+        pumpMockTransport(transport);
+
+        final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        // Give the necessary responses to open/begin/attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach1 = new Attach();
+        attach1.setHandle(r1handle);
+        attach1.setRole(Role.SENDER);
+        attach1.setName(linkName1);
+        attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach1, null));
+
+        String deliveryTag1 = "tag1";
+        String deliveryTag2 = "tag2";
+        String deliveryTag3 = "tag3";
+
+        // Send deliveries with the given delivery-id
+        handlePartialTransfer(transport, r1handle, deliveryId1, deliveryTag1, new byte[]{ 1 }, false);
+        handlePartialTransfer(transport, r1handle, deliveryId2, deliveryTag2, new byte[]{ 2 }, false);
+        handlePartialTransfer(transport, r1handle, deliveryId3, deliveryTag3, new byte[]{ 3 }, false);
+
+        // Verify deliveries arrived with expected payload
+        verifyDeliveryRawPayload(receiver1, deliveryTag1, new byte[] { 1 });
+        verifyDeliveryRawPayload(receiver1, deliveryTag2, new byte[] { 2 });
+        verifyDeliveryRawPayload(receiver1, deliveryTag3, new byte[] { 3 });
+    }
 }