PROTON-1902: fix/allow handling of aborted deliveries

Ensure 'aborted' flag overrules the 'more' and 'settled' flags as appropriate, account for delivery properly, and allow determining it was aborted.
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
index 58c62b6..b997309 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
@@ -99,8 +99,30 @@
 
     public void clear();
 
+    /**
+     * Check for whether the delivery is still partial.
+     *
+     * For a receiving Delivery, this means the delivery does not hold
+     * a complete message payload as all the content hasn't been
+     * received yet. Note that an {@link #isAborted() aborted} delivery
+     * will also be considered partial and the full payload won't
+     * be received.
+     *
+     * For a sending Delivery, this means the sender link has not been
+     * {@link Sender#advance() advanced} to complete the delivery yet.
+     *
+     * @return true if the delivery is partial
+     * @see #isAborted()
+     */
     public boolean isPartial();
 
+    /**
+     * Check for whether the delivery was aborted.
+     *
+     * @return true if the delivery was aborted.
+     */
+    boolean isAborted();
+
     public int pending();
 
     public boolean isBuffered();
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
index 14950a7..251c68a 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
@@ -70,6 +70,7 @@
     private boolean _complete;
     private boolean _updated;
     private boolean _done;
+    private boolean _aborted;
 
     private CompositeReadableBuffer _dataBuffer;
     private ReadableBuffer _dataView;
@@ -479,6 +480,17 @@
         _complete = true;
     }
 
+    void setAborted()
+    {
+        _aborted = true;
+    }
+
+    @Override
+    public boolean isAborted()
+    {
+        return _aborted;
+    }
+
     @Override
     public boolean isPartial()
     {
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
index 567e8eb..e473675 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
@@ -302,7 +302,8 @@
         }
         _unsettledIncomingSize++;
 
-        if (payload != null)
+        boolean aborted = transfer.getAborted();
+        if (payload != null && !aborted)
         {
             delivery.append(payload);
             getSession().incrementIncomingBytes(payload.getLength());
@@ -310,19 +311,20 @@
 
         delivery.updateWork();
 
-        if(!transfer.getMore() || transfer.getAborted())
+        if(!transfer.getMore() || aborted)
         {
             transportReceiver.setIncomingDeliveryId(null);
-        }
+            if(aborted) {
+                delivery.setAborted();
+            } else {
+                delivery.setComplete();
+            }
 
-        if(!(transfer.getMore() || transfer.getAborted()))
-        {
-            delivery.setComplete();
             delivery.getLink().getTransportLink().decrementLinkCredit();
             delivery.getLink().getTransportLink().incrementDeliveryCount();
         }
 
-        if(Boolean.TRUE.equals(transfer.getSettled()))
+        if(Boolean.TRUE.equals(transfer.getSettled()) || aborted)
         {
             delivery.setRemoteSettled(true);
         }
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index d66964b..fbc9b8f 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -2975,6 +2975,11 @@
 
     private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, UnsignedInteger deliveryId, String deliveryTag, byte[] partialPayload, boolean more, boolean aborted)
     {
+        handlePartialTransfer(transport, handle, deliveryId, deliveryTag, partialPayload, more, aborted, null);
+    }
+
+    private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, UnsignedInteger deliveryId, String deliveryTag, byte[] partialPayload, boolean more, boolean aborted, Boolean settled)
+    {
         byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
 
         Transfer transfer = new Transfer();
@@ -2987,6 +2992,9 @@
             // Can be omitted in continuation frames for a given delivery.
             transfer.setDeliveryId(deliveryId);
         }
+        if(settled != null) {
+            transfer.setSettled(settled);
+        }
 
         transport.handleFrame(new TransportFrame(0, transfer, new Binary(partialPayload, 0, partialPayload.length)));
     }
@@ -3383,4 +3391,143 @@
         verifyDeliveryRawPayload(receiver1, deliveryTag2, new byte[] { 2 });
         verifyDeliveryRawPayload(receiver1, deliveryTag3, new byte[] { 3 });
     }
+
+    @Test
+    public void testAbortedDelivery() {
+        // Check aborted=true, more=false, settled=true.
+        doAbortedDeliveryTestImpl(false, true);
+        // Check aborted=true, more=false, settled=unset(false)
+        // Aborted overrides settled not being set.
+        doAbortedDeliveryTestImpl(false, null);
+        // Check aborted=true, more=false, settled=false
+        // Aborted overrides settled being explicitly false.
+        doAbortedDeliveryTestImpl(false, false);
+
+        // Check aborted=true, more=true, settled=true
+        // Aborted overrides the more=true.
+        doAbortedDeliveryTestImpl(true, true);
+        // Check aborted=true, more=true, settled=unset(false)
+        // Aborted overrides the more=true, and settled being unset.
+        doAbortedDeliveryTestImpl(true, null);
+        // Check aborted=true, more=true, settled=false
+        // Aborted overrides the more=true, and settled explicitly false.
+        doAbortedDeliveryTestImpl(true, false);
+    }
+
+    private void doAbortedDeliveryTestImpl(boolean setMoreOnAbortedTransfer, Boolean setSettledOnAbortedTransfer) {
+        MockTransportImpl transport = new MockTransportImpl();
+        transport.setEmitFlowEventOnSend(false);
+        Connection connection = Proton.connection();
+        transport.bind(connection);
+
+        connection.open();
+
+        Session session = connection.session();
+        session.open();
+
+        String linkName1 = "myReceiver1";
+        Receiver receiver1 = session.receiver(linkName1);
+        receiver1.flow(3);
+        receiver1.open();
+
+        pumpMockTransport(transport);
+
+        final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+        // Give the necessary responses to open/begin/attach
+        transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+        Begin begin = new Begin();
+        begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+        begin.setNextOutgoingId(UnsignedInteger.ONE);
+        begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+        begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+        transport.handleFrame(new TransportFrame(0, begin, null));
+
+        Attach attach1 = new Attach();
+        attach1.setHandle(r1handle);
+        attach1.setRole(Role.SENDER);
+        attach1.setName(linkName1);
+        attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        transport.handleFrame(new TransportFrame(0, attach1, null));
+
+        String deliveryTag1 = "tag1";
+        String deliveryTag2 = "tag2";
+        String deliveryTag3 = "tag3";
+
+        // Receive first delivery
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.ZERO, deliveryTag1, new byte[]{ 1 }, true);
+        assertEquals("Unexpected incoming bytes count", 1, session.getIncomingBytes());
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.ZERO, deliveryTag1, new byte[]{ 2 }, false);
+
+        assertEquals("Unexpected queued count", 1, receiver1.getQueued());
+        assertEquals("Unexpected incoming bytes count", 2, session.getIncomingBytes());
+        assertEquals("Unexpected credit", 3, receiver1.getCredit());
+
+        // Receive first transfer for a multi-frame delivery
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.ONE, deliveryTag2, new byte[]{ 3 }, true);
+        assertEquals("Unexpected queued count", 2, receiver1.getQueued());
+        assertEquals("Unexpected credit", 3, receiver1.getCredit());
+        assertEquals("Unexpected incoming bytes count", 3, session.getIncomingBytes());
+        // Receive second transfer for a multi-frame delivery, indicating it is aborted
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.ONE, deliveryTag2, new byte[]{ 4 }, setMoreOnAbortedTransfer, true, setSettledOnAbortedTransfer);
+        assertEquals("Unexpected queued count", 2, receiver1.getQueued());
+        assertEquals("Unexpected credit", 3, receiver1.getCredit());
+        // The aborted frame payload, if any, is dropped. Earlier payload could have already been read, was
+        // previously accounted for, and is incomplete, leaving alone for regular cleanup accounting handling.
+        assertEquals("Unexpected incoming bytes count", 3, session.getIncomingBytes());
+
+        // Receive transfers for ANOTHER delivery, expect it not to fail, since the earlier delivery aborted
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.valueOf(2), deliveryTag3, new byte[]{ 5 }, true);
+        handlePartialTransfer(transport, r1handle, UnsignedInteger.valueOf(2), deliveryTag3, new byte[]{ 6 }, false);
+        assertEquals("Unexpected queued count", 3, receiver1.getQueued());
+        assertEquals("Unexpected credit", 3, receiver1.getCredit());
+        assertEquals("Unexpected incoming bytes count", 5, session.getIncomingBytes());
+
+        // Check the first delivery
+        verifyDeliveryRawPayload(receiver1, deliveryTag1, new byte[] { 1, 2 });
+        assertEquals("Unexpected queued count", 2, receiver1.getQueued());
+        assertEquals("Unexpected credit", 2, receiver1.getCredit());
+        assertEquals("Unexpected incoming bytes count", 3, session.getIncomingBytes());
+
+        // Check the aborted delivery
+        Delivery delivery = receiver1.current();
+        assertTrue(Arrays.equals(deliveryTag2.getBytes(StandardCharsets.UTF_8), delivery.getTag()));
+
+        assertTrue(delivery.isAborted());
+        assertTrue(delivery.remotelySettled()); // Since aborted implicitly means it is settled.
+        assertTrue(delivery.isPartial());
+        assertTrue(delivery.isReadable());
+
+        byte[] received = new byte[delivery.pending()];
+        int len = receiver1.recv(received, 0, BUFFER_SIZE);
+        assertEquals("unexpected length", len, received.length);
+
+        assertArrayEquals("Received delivery payload not as expected", new byte[] { 3 }, received);
+
+        assertTrue("receiver did not advance", receiver1.advance());
+
+        assertEquals("Unexpected queued count", 1, receiver1.getQueued());
+        assertEquals("Unexpected credit", 1, receiver1.getCredit());
+        assertEquals("Unexpected incoming bytes count", 2, session.getIncomingBytes());
+
+        // Check the third delivery
+        verifyDeliveryRawPayload(receiver1, deliveryTag3, new byte[] { 5, 6 });
+        assertEquals("Unexpected queued count", 0, receiver1.getQueued());
+        assertEquals("Unexpected credit", 0, receiver1.getCredit());
+        assertEquals("Unexpected incoming bytes count", 0, session.getIncomingBytes());
+
+        // Flow new credit and check delivery-count + credit on wire are as expected.
+        receiver1.flow(123);
+        pumpMockTransport(transport);
+
+        assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
+        assertTrue("Unexpected frame type", transport.writes.get(4) instanceof Flow);
+        Flow sentFlow = (Flow) transport.writes.get(4);
+
+        assertEquals("Unexpected delivery count", UnsignedInteger.valueOf(3), sentFlow.getDeliveryCount());
+        assertEquals("Unexpected credit", UnsignedInteger.valueOf(123), sentFlow.getLinkCredit());
+    }
 }