PROTON-1902: fix/allow handling of aborted deliveries
Ensure 'aborted' flag overrules the 'more' and 'settled' flags as appropriate, account for delivery properly, and allow determining it was aborted.
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
index 58c62b6..b997309 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Delivery.java
@@ -99,8 +99,30 @@
public void clear();
+ /**
+ * Check for whether the delivery is still partial.
+ *
+ * For a receiving Delivery, this means the delivery does not hold
+ * a complete message payload as all the content hasn't been
+ * received yet. Note that an {@link #isAborted() aborted} delivery
+ * will also be considered partial and the full payload won't
+ * be received.
+ *
+ * For a sending Delivery, this means the sender link has not been
+ * {@link Sender#advance() advanced} to complete the delivery yet.
+ *
+ * @return true if the delivery is partial
+ * @see #isAborted()
+ */
public boolean isPartial();
+ /**
+ * Check for whether the delivery was aborted.
+ *
+ * @return true if the delivery was aborted.
+ */
+ boolean isAborted();
+
public int pending();
public boolean isBuffered();
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
index 14950a7..251c68a 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
@@ -70,6 +70,7 @@
private boolean _complete;
private boolean _updated;
private boolean _done;
+ private boolean _aborted;
private CompositeReadableBuffer _dataBuffer;
private ReadableBuffer _dataView;
@@ -479,6 +480,17 @@
_complete = true;
}
+ void setAborted()
+ {
+ _aborted = true;
+ }
+
+ @Override
+ public boolean isAborted()
+ {
+ return _aborted;
+ }
+
@Override
public boolean isPartial()
{
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
index 567e8eb..e473675 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
@@ -302,7 +302,8 @@
}
_unsettledIncomingSize++;
- if (payload != null)
+ boolean aborted = transfer.getAborted();
+ if (payload != null && !aborted)
{
delivery.append(payload);
getSession().incrementIncomingBytes(payload.getLength());
@@ -310,19 +311,20 @@
delivery.updateWork();
- if(!transfer.getMore() || transfer.getAborted())
+ if(!transfer.getMore() || aborted)
{
transportReceiver.setIncomingDeliveryId(null);
- }
+ if(aborted) {
+ delivery.setAborted();
+ } else {
+ delivery.setComplete();
+ }
- if(!(transfer.getMore() || transfer.getAborted()))
- {
- delivery.setComplete();
delivery.getLink().getTransportLink().decrementLinkCredit();
delivery.getLink().getTransportLink().incrementDeliveryCount();
}
- if(Boolean.TRUE.equals(transfer.getSettled()))
+ if(Boolean.TRUE.equals(transfer.getSettled()) || aborted)
{
delivery.setRemoteSettled(true);
}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index d66964b..fbc9b8f 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -2975,6 +2975,11 @@
private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, UnsignedInteger deliveryId, String deliveryTag, byte[] partialPayload, boolean more, boolean aborted)
{
+ handlePartialTransfer(transport, handle, deliveryId, deliveryTag, partialPayload, more, aborted, null);
+ }
+
+ private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, UnsignedInteger deliveryId, String deliveryTag, byte[] partialPayload, boolean more, boolean aborted, Boolean settled)
+ {
byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
Transfer transfer = new Transfer();
@@ -2987,6 +2992,9 @@
// Can be omitted in continuation frames for a given delivery.
transfer.setDeliveryId(deliveryId);
}
+ if(settled != null) {
+ transfer.setSettled(settled);
+ }
transport.handleFrame(new TransportFrame(0, transfer, new Binary(partialPayload, 0, partialPayload.length)));
}
@@ -3383,4 +3391,143 @@
verifyDeliveryRawPayload(receiver1, deliveryTag2, new byte[] { 2 });
verifyDeliveryRawPayload(receiver1, deliveryTag3, new byte[] { 3 });
}
+
+ @Test
+ public void testAbortedDelivery() {
+ // Check aborted=true, more=false, settled=true.
+ doAbortedDeliveryTestImpl(false, true);
+ // Check aborted=true, more=false, settled=unset(false)
+ // Aborted overrides settled not being set.
+ doAbortedDeliveryTestImpl(false, null);
+ // Check aborted=true, more=false, settled=false
+ // Aborted overrides settled being explicitly false.
+ doAbortedDeliveryTestImpl(false, false);
+
+ // Check aborted=true, more=true, settled=true
+ // Aborted overrides the more=true.
+ doAbortedDeliveryTestImpl(true, true);
+ // Check aborted=true, more=true, settled=unset(false)
+ // Aborted overrides the more=true, and settled being unset.
+ doAbortedDeliveryTestImpl(true, null);
+ // Check aborted=true, more=true, settled=false
+ // Aborted overrides the more=true, and settled explicitly false.
+ doAbortedDeliveryTestImpl(true, false);
+ }
+
+ private void doAbortedDeliveryTestImpl(boolean setMoreOnAbortedTransfer, Boolean setSettledOnAbortedTransfer) {
+ MockTransportImpl transport = new MockTransportImpl();
+ transport.setEmitFlowEventOnSend(false);
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ connection.open();
+
+ Session session = connection.session();
+ session.open();
+
+ String linkName1 = "myReceiver1";
+ Receiver receiver1 = session.receiver(linkName1);
+ receiver1.flow(3);
+ receiver1.open();
+
+ pumpMockTransport(transport);
+
+ final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+ // Give the necessary responses to open/begin/attach
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ begin.setNextOutgoingId(UnsignedInteger.ONE);
+ begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ Attach attach1 = new Attach();
+ attach1.setHandle(r1handle);
+ attach1.setRole(Role.SENDER);
+ attach1.setName(linkName1);
+ attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, attach1, null));
+
+ String deliveryTag1 = "tag1";
+ String deliveryTag2 = "tag2";
+ String deliveryTag3 = "tag3";
+
+ // Receive first delivery
+ handlePartialTransfer(transport, r1handle, UnsignedInteger.ZERO, deliveryTag1, new byte[]{ 1 }, true);
+ assertEquals("Unexpected incoming bytes count", 1, session.getIncomingBytes());
+ handlePartialTransfer(transport, r1handle, UnsignedInteger.ZERO, deliveryTag1, new byte[]{ 2 }, false);
+
+ assertEquals("Unexpected queued count", 1, receiver1.getQueued());
+ assertEquals("Unexpected incoming bytes count", 2, session.getIncomingBytes());
+ assertEquals("Unexpected credit", 3, receiver1.getCredit());
+
+ // Receive first transfer for a multi-frame delivery
+ handlePartialTransfer(transport, r1handle, UnsignedInteger.ONE, deliveryTag2, new byte[]{ 3 }, true);
+ assertEquals("Unexpected queued count", 2, receiver1.getQueued());
+ assertEquals("Unexpected credit", 3, receiver1.getCredit());
+ assertEquals("Unexpected incoming bytes count", 3, session.getIncomingBytes());
+ // Receive second transfer for a multi-frame delivery, indicating it is aborted
+ handlePartialTransfer(transport, r1handle, UnsignedInteger.ONE, deliveryTag2, new byte[]{ 4 }, setMoreOnAbortedTransfer, true, setSettledOnAbortedTransfer);
+ assertEquals("Unexpected queued count", 2, receiver1.getQueued());
+ assertEquals("Unexpected credit", 3, receiver1.getCredit());
+ // The aborted frame payload, if any, is dropped. Earlier payload could have already been read, was
+ // previously accounted for, and is incomplete, leaving alone for regular cleanup accounting handling.
+ assertEquals("Unexpected incoming bytes count", 3, session.getIncomingBytes());
+
+ // Receive transfers for ANOTHER delivery, expect it not to fail, since the earlier delivery aborted
+ handlePartialTransfer(transport, r1handle, UnsignedInteger.valueOf(2), deliveryTag3, new byte[]{ 5 }, true);
+ handlePartialTransfer(transport, r1handle, UnsignedInteger.valueOf(2), deliveryTag3, new byte[]{ 6 }, false);
+ assertEquals("Unexpected queued count", 3, receiver1.getQueued());
+ assertEquals("Unexpected credit", 3, receiver1.getCredit());
+ assertEquals("Unexpected incoming bytes count", 5, session.getIncomingBytes());
+
+ // Check the first delivery
+ verifyDeliveryRawPayload(receiver1, deliveryTag1, new byte[] { 1, 2 });
+ assertEquals("Unexpected queued count", 2, receiver1.getQueued());
+ assertEquals("Unexpected credit", 2, receiver1.getCredit());
+ assertEquals("Unexpected incoming bytes count", 3, session.getIncomingBytes());
+
+ // Check the aborted delivery
+ Delivery delivery = receiver1.current();
+ assertTrue(Arrays.equals(deliveryTag2.getBytes(StandardCharsets.UTF_8), delivery.getTag()));
+
+ assertTrue(delivery.isAborted());
+ assertTrue(delivery.remotelySettled()); // Since aborted implicitly means it is settled.
+ assertTrue(delivery.isPartial());
+ assertTrue(delivery.isReadable());
+
+ byte[] received = new byte[delivery.pending()];
+ int len = receiver1.recv(received, 0, BUFFER_SIZE);
+ assertEquals("unexpected length", len, received.length);
+
+ assertArrayEquals("Received delivery payload not as expected", new byte[] { 3 }, received);
+
+ assertTrue("receiver did not advance", receiver1.advance());
+
+ assertEquals("Unexpected queued count", 1, receiver1.getQueued());
+ assertEquals("Unexpected credit", 1, receiver1.getCredit());
+ assertEquals("Unexpected incoming bytes count", 2, session.getIncomingBytes());
+
+ // Check the third delivery
+ verifyDeliveryRawPayload(receiver1, deliveryTag3, new byte[] { 5, 6 });
+ assertEquals("Unexpected queued count", 0, receiver1.getQueued());
+ assertEquals("Unexpected credit", 0, receiver1.getCredit());
+ assertEquals("Unexpected incoming bytes count", 0, session.getIncomingBytes());
+
+ // Flow new credit and check delivery-count + credit on wire are as expected.
+ receiver1.flow(123);
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 5, transport.writes.size());
+ assertTrue("Unexpected frame type", transport.writes.get(4) instanceof Flow);
+ Flow sentFlow = (Flow) transport.writes.get(4);
+
+ assertEquals("Unexpected delivery count", UnsignedInteger.valueOf(3), sentFlow.getDeliveryCount());
+ assertEquals("Unexpected credit", UnsignedInteger.valueOf(123), sentFlow.getLinkCredit());
+ }
}