PROTON-1901: fix handling of multiplexed incoming deliveries on a session, check for various sequencing issues that would cause illegal states and knock on problems
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportReceiver.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportReceiver.java
index 29d97c4..6e588d4 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportReceiver.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportReceiver.java
@@ -21,11 +21,12 @@
package org.apache.qpid.proton.engine.impl;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.transport.Flow;
class TransportReceiver extends TransportLink<ReceiverImpl>
{
-
+ private UnsignedInteger _incomingDeliveryId;
TransportReceiver(ReceiverImpl link)
{
@@ -52,7 +53,14 @@
setDeliveryCount(getRemoteDeliveryCount());
getLink().setDrained(getLink().getDrained() + delta);
}
-
-
}
+
+ UnsignedInteger getIncomingDeliveryId() {
+ return _incomingDeliveryId;
+ }
+
+ void setIncomingDeliveryId(UnsignedInteger _incomingDeliveryId) {
+ this._incomingDeliveryId = _incomingDeliveryId;
+ }
+
}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
index bbacd30..567e8eb 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java
@@ -264,22 +264,23 @@
public void handleTransfer(Transfer transfer, Binary payload)
{
DeliveryImpl delivery;
- incrementNextIncomingId();
- if(transfer.getDeliveryId() == null || transfer.getDeliveryId().equals(_incomingDeliveryId))
- {
- TransportReceiver transportReceiver = (TransportReceiver) getLinkFromRemoteHandle(transfer.getHandle());
- ReceiverImpl receiver = transportReceiver.getReceiver();
- Binary deliveryTag = transfer.getDeliveryTag();
- delivery = _unsettledIncomingDeliveriesById.get(_incomingDeliveryId);
- delivery.getTransportDelivery().incrementSessionSize();
+ incrementNextIncomingId(); // The conceptual/non-wire transfer-id, for the session window.
+ TransportReceiver transportReceiver = (TransportReceiver) getLinkFromRemoteHandle(transfer.getHandle());
+ UnsignedInteger linkIncomingDeliveryId = transportReceiver.getIncomingDeliveryId();
+ UnsignedInteger deliveryId = transfer.getDeliveryId();
+
+ if(linkIncomingDeliveryId != null && (linkIncomingDeliveryId.equals(deliveryId) || deliveryId == null))
+ {
+ delivery = _unsettledIncomingDeliveriesById.get(linkIncomingDeliveryId);
+ delivery.getTransportDelivery().incrementSessionSize();
}
else
{
- // TODO - check deliveryId has been incremented by one
- _incomingDeliveryId = transfer.getDeliveryId();
- // TODO - check link handle valid and a receiver
- TransportReceiver transportReceiver = (TransportReceiver) getLinkFromRemoteHandle(transfer.getHandle());
+ verifyNewDeliveryIdSequence(_incomingDeliveryId, linkIncomingDeliveryId, deliveryId);
+
+ _incomingDeliveryId = deliveryId;
+
ReceiverImpl receiver = transportReceiver.getReceiver();
Binary deliveryTag = transfer.getDeliveryTag();
delivery = receiver.delivery(deliveryTag.getArray(), deliveryTag.getArrayOffset(),
@@ -288,11 +289,13 @@
if(messageFormat != null) {
delivery.setMessageFormat(messageFormat.intValue());
}
- TransportDelivery transportDelivery = new TransportDelivery(_incomingDeliveryId, delivery, transportReceiver);
+ TransportDelivery transportDelivery = new TransportDelivery(deliveryId, delivery, transportReceiver);
delivery.setTransportDelivery(transportDelivery);
- _unsettledIncomingDeliveriesById.put(_incomingDeliveryId, delivery);
+ transportReceiver.setIncomingDeliveryId(deliveryId);
+ _unsettledIncomingDeliveriesById.put(deliveryId, delivery);
getSession().incrementIncomingDeliveries(1);
}
+
if( transfer.getState()!=null )
{
delivery.setRemoteDeliveryState(transfer.getState());
@@ -307,12 +310,18 @@
delivery.updateWork();
+ if(!transfer.getMore() || transfer.getAborted())
+ {
+ transportReceiver.setIncomingDeliveryId(null);
+ }
+
if(!(transfer.getMore() || transfer.getAborted()))
{
delivery.setComplete();
delivery.getLink().getTransportLink().decrementLinkCredit();
delivery.getLink().getTransportLink().incrementDeliveryCount();
}
+
if(Boolean.TRUE.equals(transfer.getSettled()))
{
delivery.setRemoteSettled(true);
@@ -328,6 +337,22 @@
getSession().getConnection().put(Event.Type.DELIVERY, delivery);
}
+ private void verifyNewDeliveryIdSequence(UnsignedInteger previousId, UnsignedInteger linkIncomingId, UnsignedInteger newDeliveryId) {
+ if(newDeliveryId == null) {
+ throw new IllegalStateException("No delivery-id specified on first Transfer of new delivery");
+ }
+
+ // Doing a primitive comparison, uses intValue() since its a uint sequence
+ // and we need the primitive values to wrap appropriately during comparison.
+ if(previousId != null && previousId.intValue() + 1 != newDeliveryId.intValue()) {
+ throw new IllegalStateException("Expected delivery-id " + previousId.add(UnsignedInteger.ONE) + ", got " + newDeliveryId);
+ }
+
+ if(linkIncomingId != null) {
+ throw new IllegalStateException("Illegal multiplex of deliveries on same link with delivery-id " + linkIncomingId + " and " + newDeliveryId);
+ }
+ }
+
public void freeLocalChannel()
{
unsetLocalChannel();
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 9e3afd6..d66964b 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -40,12 +40,14 @@
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.UnsignedShort;
+import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transport.Attach;
import org.apache.qpid.proton.amqp.transport.Begin;
import org.apache.qpid.proton.amqp.transport.Close;
import org.apache.qpid.proton.amqp.transport.Detach;
+import org.apache.qpid.proton.amqp.transport.Disposition;
import org.apache.qpid.proton.amqp.transport.End;
import org.apache.qpid.proton.amqp.transport.Flow;
import org.apache.qpid.proton.amqp.transport.FrameBody;
@@ -1712,6 +1714,28 @@
return delivery;
}
+ private Delivery verifyDeliveryRawPayload(Receiver receiver, String deliveryTag, byte[] payload)
+ {
+ Delivery delivery = receiver.current();
+
+ assertTrue(Arrays.equals(deliveryTag.getBytes(StandardCharsets.UTF_8), delivery.getTag()));
+
+ assertFalse(delivery.isPartial());
+ assertTrue(delivery.isReadable());
+
+ byte[] received = new byte[delivery.pending()];
+ int len = receiver.recv(received, 0, BUFFER_SIZE);
+
+ assertEquals("unexpected length", len, received.length);
+
+ assertArrayEquals("Received delivery payload not as expected", payload, received);
+
+ boolean receiverAdvanced = receiver.advance();
+ assertTrue("receiver has not advanced", receiverAdvanced);
+
+ return delivery;
+ }
+
/**
* Verify that the {@link TransportInternal#addTransportLayer(TransportLayer)} has the desired
* effect by observing the wrapping effect on related transport input and output methods.
@@ -2645,16 +2669,16 @@
}
@Test
- public void testMultiplexMultiFrameDeliveryOnSingleSession() {
- doMultiplexMultiFrameDeliveryOnSingleSessionTestImpl(false);
+ public void testMultiplexMultiFrameDeliveryOnSingleSessionOutgoing() {
+ doMultiplexMultiFrameDeliveryOnSingleSessionOutgoingTestImpl(false);
}
@Test
- public void testMultiplexMultiFrameDeliveriesOnSingleSession() {
- doMultiplexMultiFrameDeliveryOnSingleSessionTestImpl(true);
+ public void testMultiplexMultiFrameDeliveriesOnSingleSessionOutgoing() {
+ doMultiplexMultiFrameDeliveryOnSingleSessionOutgoingTestImpl(true);
}
- private void doMultiplexMultiFrameDeliveryOnSingleSessionTestImpl(boolean bothDeliveriesMultiFrame) {
+ private void doMultiplexMultiFrameDeliveryOnSingleSessionOutgoingTestImpl(boolean bothDeliveriesMultiFrame) {
MockTransportImpl transport = new MockTransportImpl();
transport.setEmitFlowEventOnSend(false);
@@ -2788,4 +2812,575 @@
assertEquals("Unexpected more flag", false, transfer.getMore());
}
}
+
+ @Test
+ public void testMultiplexMultiFrameDeliveriesOnSingleSessionIncoming() {
+ doMultiplexMultiFrameDeliveryOnSingleSessionIncomingTestImpl(true);
+ }
+
+ @Test
+ public void testMultiplexMultiFrameDeliveryOnSingleSessionIncoming() {
+ doMultiplexMultiFrameDeliveryOnSingleSessionIncomingTestImpl(false);
+ }
+
+ private void doMultiplexMultiFrameDeliveryOnSingleSessionIncomingTestImpl(boolean bothDeliveriesMultiFrame) {
+ int contentLength1 = 7000;
+ int maxPayloadChunkSize = 2000;
+ int contentLength2 = 1000;
+ if(bothDeliveriesMultiFrame) {
+ contentLength2 = 4100;
+ }
+
+ MockTransportImpl transport = new MockTransportImpl();
+ transport.setEmitFlowEventOnSend(false);
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ connection.open();
+
+ Session session = connection.session();
+ session.open();
+
+ String linkName1 = "myReceiver1";
+ Receiver receiver1 = session.receiver(linkName1);
+ receiver1.flow(5);
+ receiver1.open();
+
+ String linkName2 = "myReceiver2";
+ Receiver receiver2 = session.receiver(linkName2);
+ receiver2.flow(5);
+ receiver2.open();
+
+ pumpMockTransport(transport);
+
+ final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+ final UnsignedInteger r2handle = UnsignedInteger.ONE;
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+
+ assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+ assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+ FrameBody frame = transport.writes.get(2);
+ assertTrue("Unexpected frame type", frame instanceof Attach);
+ assertEquals("Unexpected handle", ((Attach) frame).getHandle(), r1handle);
+ frame = transport.writes.get(3);
+ assertTrue("Unexpected frame type", frame instanceof Attach);
+ assertEquals("Unexpected handle", ((Attach) frame).getHandle(), r2handle);
+ frame = transport.writes.get(4);
+ assertTrue("Unexpected frame type", frame instanceof Flow);
+ assertEquals("Unexpected handle", ((Flow) frame).getHandle(), r1handle);
+ frame = transport.writes.get(5);
+ assertTrue("Unexpected frame type", frame instanceof Flow);
+ assertEquals("Unexpected handle", ((Flow) frame).getHandle(), r2handle);
+
+ assertNull("Should not yet have a delivery", receiver1.current());
+ assertNull("Should not yet have a delivery", receiver2.current());
+
+ // Send the necessary responses to open/begin/attach
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ begin.setNextOutgoingId(UnsignedInteger.ONE);
+ begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ Attach attach1 = new Attach();
+ attach1.setHandle(r1handle);
+ attach1.setRole(Role.SENDER);
+ attach1.setName(linkName1);
+ attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, attach1, null));
+
+ Attach attach2 = new Attach();
+ attach2.setHandle(r2handle);
+ attach2.setRole(Role.SENDER);
+ attach2.setName(linkName2);
+ attach2.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, attach2, null));
+
+ String deliveryTag1 = "tag1";
+ String messageContent1 = createLargeContent(contentLength1);
+ String deliveryTag2 = "tag2";
+ String messageContent2 = createLargeContent(contentLength2);
+
+ ArrayList<byte[]> message1chunks = createTransferPayloads(messageContent1, maxPayloadChunkSize);
+ assertEquals("unexpected number of payload chunks", 4, message1chunks.size());
+ ArrayList<byte[]> message2chunks = createTransferPayloads(messageContent2, maxPayloadChunkSize);
+ if(bothDeliveriesMultiFrame) {
+ assertEquals("unexpected number of payload chunks", 3, message2chunks.size());
+ } else {
+ assertEquals("unexpected number of payload chunks", 1, message2chunks.size());
+ }
+
+ while (true) {
+ if (!message1chunks.isEmpty()) {
+ byte[] chunk = message1chunks.remove(0);
+ handlePartialTransfer(transport, r1handle, 1, deliveryTag1, chunk, !message1chunks.isEmpty());
+ }
+
+ if (!message2chunks.isEmpty()) {
+ byte[] chunk = message2chunks.remove(0);
+ handlePartialTransfer(transport, r2handle, 2, deliveryTag2, chunk, !message2chunks.isEmpty());
+ }
+
+ if (message1chunks.isEmpty() && message2chunks.isEmpty()) {
+ break;
+ }
+ }
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+
+ assertEquals("Unexpected queued count", 1, receiver1.getQueued());
+ Delivery delivery1 = verifyDelivery(receiver1, deliveryTag1, messageContent1);
+ assertNotNull("Should now have a delivery", delivery1);
+ assertEquals("Unexpected queued count", 0, receiver1.getQueued());
+
+ assertEquals("Unexpected queued count", 1, receiver2.getQueued());
+ Delivery delivery2 = verifyDelivery(receiver2, deliveryTag2, messageContent2);
+ assertNotNull("Should now have a delivery", delivery2);
+ assertEquals("Unexpected queued count", 0, receiver2.getQueued());
+
+ delivery1.disposition(Accepted.getInstance());
+ delivery1.settle();
+ pumpMockTransport(transport);
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 7, transport.writes.size());
+
+ frame = transport.writes.get(6);
+ assertTrue("Unexpected frame type", frame instanceof Disposition);
+ assertEquals("Unexpected delivery id", ((Disposition) frame).getFirst(), UnsignedInteger.ONE);
+ assertEquals("Unexpected delivery id", ((Disposition) frame).getLast(), UnsignedInteger.ONE);
+
+ delivery2.disposition(Accepted.getInstance());
+ delivery2.settle();
+ pumpMockTransport(transport);
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 8, transport.writes.size());
+
+ frame = transport.writes.get(7);
+ assertTrue("Unexpected frame type", frame instanceof Disposition);
+ assertEquals("Unexpected delivery id", ((Disposition) frame).getFirst(), UnsignedInteger.valueOf(2));
+ assertEquals("Unexpected delivery id", ((Disposition) frame).getLast(), UnsignedInteger.valueOf(2));
+ }
+
+ private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, int deliveryId, String deliveryTag, byte[] partialPayload, boolean more)
+ {
+ handlePartialTransfer(transport, handle, UnsignedInteger.valueOf(deliveryId), deliveryTag, partialPayload, more);
+ }
+
+ private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, UnsignedInteger deliveryId, String deliveryTag, byte[] partialPayload, boolean more)
+ {
+ handlePartialTransfer(transport, handle, deliveryId, deliveryTag, partialPayload, more, false);
+ }
+
+ private void handlePartialTransfer(TransportImpl transport, UnsignedInteger handle, UnsignedInteger deliveryId, String deliveryTag, byte[] partialPayload, boolean more, boolean aborted)
+ {
+ byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
+
+ Transfer transfer = new Transfer();
+ transfer.setHandle(handle);
+ transfer.setDeliveryTag(new Binary(tag));
+ transfer.setMessageFormat(UnsignedInteger.valueOf(DeliveryImpl.DEFAULT_MESSAGE_FORMAT));
+ transfer.setMore(more);
+ transfer.setAborted(aborted);
+ if(deliveryId != null) {
+ // Can be omitted in continuation frames for a given delivery.
+ transfer.setDeliveryId(deliveryId);
+ }
+
+ transport.handleFrame(new TransportFrame(0, transfer, new Binary(partialPayload, 0, partialPayload.length)));
+ }
+
+ private ArrayList<byte[]> createTransferPayloads(String content, int payloadChunkSize)
+ {
+ ArrayList<byte[]> payloadChunks = new ArrayList<>();
+
+ Message m = Message.Factory.create();
+ m.setBody(new AmqpValue(content));
+
+ byte[] encoded = new byte[BUFFER_SIZE];
+ int len = m.encode(encoded, 0, BUFFER_SIZE);
+ assertTrue("given array was too small", len < BUFFER_SIZE);
+
+ int copied = 0;
+ while(copied < len) {
+ int chunkSize = Math.min(len - copied, payloadChunkSize);
+ byte[] chunk = new byte[chunkSize];
+
+ System.arraycopy(encoded, copied, chunk, 0, chunkSize);
+
+ payloadChunks.add(chunk);
+ copied += chunkSize;
+ }
+
+ assertFalse("no payload chunks to return", payloadChunks.isEmpty());
+
+ return payloadChunks;
+ }
+
+ @Test
+ public void testDeliveryIdOutOfSequenceCausesISE() {
+ MockTransportImpl transport = new MockTransportImpl();
+ transport.setEmitFlowEventOnSend(false);
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ connection.open();
+
+ Session session = connection.session();
+ session.open();
+
+ String linkName1 = "myReceiver1";
+ Receiver receiver1 = session.receiver(linkName1);
+ receiver1.flow(5);
+ receiver1.open();
+
+ String linkName2 = "myReceiver2";
+ Receiver receiver2 = session.receiver(linkName2);
+ receiver2.flow(5);
+ receiver2.open();
+
+ pumpMockTransport(transport);
+
+ final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+ final UnsignedInteger r2handle = UnsignedInteger.ONE;
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+
+ // Give the necessary responses to open/begin/attach
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ begin.setNextOutgoingId(UnsignedInteger.ONE);
+ begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ Attach attach1 = new Attach();
+ attach1.setHandle(r1handle);
+ attach1.setRole(Role.SENDER);
+ attach1.setName(linkName1);
+ attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, attach1, null));
+
+ Attach attach2 = new Attach();
+ attach2.setHandle(r2handle);
+ attach2.setRole(Role.SENDER);
+ attach2.setName(linkName2);
+ attach2.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, attach2, null));
+
+ String deliveryTag1 = "tag1";
+ String deliveryTag2 = "tag2";
+
+ handlePartialTransfer(transport, r2handle, 2, deliveryTag2, new byte[]{ 2 }, false);
+ try {
+ handlePartialTransfer(transport, r1handle, 1, deliveryTag1, new byte[]{ 1 }, false);
+ fail("Expected an ISE");
+ } catch(IllegalStateException ise) {
+ // Expected
+ assertTrue("Unexpected exception:" + ise, ise.getMessage().contains("Expected delivery-id 3, got 1"));
+ }
+ }
+
+ @Test
+ public void testDeliveryIdMissingOnInitialTransferCausesISE() {
+ MockTransportImpl transport = new MockTransportImpl();
+ transport.setEmitFlowEventOnSend(false);
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ connection.open();
+
+ Session session = connection.session();
+ session.open();
+
+ String linkName1 = "myReceiver1";
+ Receiver receiver1 = session.receiver(linkName1);
+ receiver1.flow(5);
+ receiver1.open();
+
+ pumpMockTransport(transport);
+
+ final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+ // Give the necessary responses to open/begin/attach
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ begin.setNextOutgoingId(UnsignedInteger.ONE);
+ begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ Attach attach1 = new Attach();
+ attach1.setHandle(r1handle);
+ attach1.setRole(Role.SENDER);
+ attach1.setName(linkName1);
+ attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, attach1, null));
+
+ // Receive a delivery without any delivery-id on the [first] transfer frame, expect it to fail.
+ try {
+ handlePartialTransfer(transport, r1handle, null, "tag1", new byte[]{ 1 }, false);
+ fail("Expected an ISE");
+ } catch(IllegalStateException ise) {
+ // Expected
+ assertEquals("Unexpected message", "No delivery-id specified on first Transfer of new delivery", ise.getMessage());
+ }
+ }
+
+ @Test
+ public void testMultiplexDeliveriesOnSameReceiverLinkCausesISE() {
+ MockTransportImpl transport = new MockTransportImpl();
+ transport.setEmitFlowEventOnSend(false);
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ connection.open();
+
+ Session session = connection.session();
+ session.open();
+
+ String linkName1 = "myReceiver1";
+ Receiver receiver1 = session.receiver(linkName1);
+ receiver1.flow(5);
+ receiver1.open();
+
+ pumpMockTransport(transport);
+
+ final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+ // Give the necessary responses to open/begin/attach
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ begin.setNextOutgoingId(UnsignedInteger.ONE);
+ begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ Attach attach1 = new Attach();
+ attach1.setHandle(r1handle);
+ attach1.setRole(Role.SENDER);
+ attach1.setName(linkName1);
+ attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, attach1, null));
+
+ // Receive first transfer for a multi-frame delivery
+ handlePartialTransfer(transport, r1handle, 1, "tag1", new byte[]{ 1 }, true);
+
+ // Receive first transfer for ANOTHER multi-frame delivery, expect it to fail
+ // as multiplexing deliveries on a single link is forbidden by the spec.
+ try {
+ handlePartialTransfer(transport, r1handle, 2, "tag2", new byte[]{ 2 }, true);
+ fail("Expected an ISE");
+ } catch(IllegalStateException ise) {
+ // Expected
+ assertEquals("Unexpected message", "Illegal multiplex of deliveries on same link with delivery-id 1 and 2", ise.getMessage());
+ }
+ }
+
+ @Test
+ public void testDeliveryIdTrackingHandlesAbortedDelivery() {
+ MockTransportImpl transport = new MockTransportImpl();
+ transport.setEmitFlowEventOnSend(false);
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ connection.open();
+
+ Session session = connection.session();
+ session.open();
+
+ String linkName1 = "myReceiver1";
+ Receiver receiver1 = session.receiver(linkName1);
+ receiver1.flow(5);
+ receiver1.open();
+
+ pumpMockTransport(transport);
+
+ final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+ // Give the necessary responses to open/begin/attach
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ begin.setNextOutgoingId(UnsignedInteger.ONE);
+ begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ Attach attach1 = new Attach();
+ attach1.setHandle(r1handle);
+ attach1.setRole(Role.SENDER);
+ attach1.setName(linkName1);
+ attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, attach1, null));
+
+ // Receive first transfer for a multi-frame delivery
+ assertEquals("Unexpected queued count", 0, receiver1.getQueued());
+ handlePartialTransfer(transport, r1handle, UnsignedInteger.ZERO, "tag1", new byte[]{ 1 }, true);
+ assertEquals("Unexpected queued count", 1, receiver1.getQueued());
+ // Receive second transfer for a multi-frame delivery, indicating it is aborted
+ handlePartialTransfer(transport, r1handle, UnsignedInteger.ZERO, "tag1", new byte[]{ 2 }, true, true);
+ assertEquals("Unexpected queued count", 1, receiver1.getQueued());
+
+ // Receive first transfer for ANOTHER delivery, expect it not to fail, since the earlier delivery aborted
+ handlePartialTransfer(transport, r1handle, UnsignedInteger.ONE, "tag2", new byte[]{ 3 }, false);
+ assertEquals("Unexpected queued count", 2, receiver1.getQueued());
+
+ receiver1.advance();
+ verifyDeliveryRawPayload(receiver1, "tag2", new byte[] { 3 });
+ }
+
+ @Test
+ public void testDeliveryWithIdOmittedOnContinuationTransfers() {
+ MockTransportImpl transport = new MockTransportImpl();
+ transport.setEmitFlowEventOnSend(false);
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ connection.open();
+
+ Session session = connection.session();
+ session.open();
+
+ String linkName1 = "myReceiver1";
+ Receiver receiver1 = session.receiver(linkName1);
+ receiver1.flow(5);
+ receiver1.open();
+
+ String linkName2 = "myReceiver2";
+ Receiver receiver2 = session.receiver(linkName2);
+ receiver2.flow(5);
+ receiver2.open();
+
+ pumpMockTransport(transport);
+
+ final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+ final UnsignedInteger r2handle = UnsignedInteger.ONE;
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 6, transport.writes.size());
+
+ // Give the necessary responses to open/begin/attach
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ begin.setNextOutgoingId(UnsignedInteger.ONE);
+ begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ Attach attach1 = new Attach();
+ attach1.setHandle(r1handle);
+ attach1.setRole(Role.SENDER);
+ attach1.setName(linkName1);
+ attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, attach1, null));
+
+ Attach attach2 = new Attach();
+ attach2.setHandle(r2handle);
+ attach2.setRole(Role.SENDER);
+ attach2.setName(linkName2);
+ attach2.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, attach2, null));
+
+ String deliveryTag1 = "tag1";
+ String deliveryTag2 = "tag2";
+
+ // Send multi-frame deliveries for each link, multiplexed together, and omit
+ // the delivery-id on the continuation frames as allowed for by the spec.
+ handlePartialTransfer(transport, r1handle, 1, deliveryTag1, new byte[]{ 1 }, true);
+ handlePartialTransfer(transport, r2handle, 2, deliveryTag2, new byte[]{ 101 }, true);
+ handlePartialTransfer(transport, r2handle, null, deliveryTag2, new byte[]{ 102 }, true);
+ handlePartialTransfer(transport, r1handle, null, deliveryTag1, new byte[]{ 2 }, true);
+ handlePartialTransfer(transport, r1handle, null, deliveryTag1, new byte[]{ 3 }, false);
+ handlePartialTransfer(transport, r2handle, null, deliveryTag2, new byte[]{ 103 }, true);
+ handlePartialTransfer(transport, r2handle, null, deliveryTag2, new byte[]{ 104 }, false);
+
+ // Verify the transfer frames were all matched to compose the expected delivery payload.
+ verifyDeliveryRawPayload(receiver1, deliveryTag1, new byte[] { 1, 2, 3 });
+ verifyDeliveryRawPayload(receiver2, deliveryTag2, new byte[] { 101, 102, 103, 104 });
+ }
+
+ @Test
+ public void testDeliveryIdThresholdsAndWraps() {
+ // Check start from 0
+ doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.ZERO, UnsignedInteger.ONE, UnsignedInteger.valueOf(2));
+ // Check run up to max-int (interesting boundary for underlying impl)
+ doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.valueOf(Integer.MAX_VALUE - 2), UnsignedInteger.valueOf(Integer.MAX_VALUE -1), UnsignedInteger.valueOf(Integer.MAX_VALUE));
+ // Check crossing from signed range value into unsigned range value (interesting boundary for underlying impl)
+ long maxIntAsLong = Integer.MAX_VALUE;
+ doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.valueOf(maxIntAsLong), UnsignedInteger.valueOf(maxIntAsLong + 1L), UnsignedInteger.valueOf(maxIntAsLong + 2L));
+ // Check run up to max-uint
+ doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.valueOf(0xFFFFFFFFL - 2), UnsignedInteger.valueOf(0xFFFFFFFFL - 1), UnsignedInteger.MAX_VALUE);
+ // Check wrapping from max unsigned value back to min(/0).
+ doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger.MAX_VALUE, UnsignedInteger.ZERO, UnsignedInteger.ONE);
+ }
+
+ private void doDeliveryIdThresholdsWrapsTestImpl(UnsignedInteger deliveryId1, UnsignedInteger deliveryId2, UnsignedInteger deliveryId3) {
+ MockTransportImpl transport = new MockTransportImpl();
+ transport.setEmitFlowEventOnSend(false);
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ connection.open();
+
+ Session session = connection.session();
+ session.open();
+
+ String linkName1 = "myReceiver1";
+ Receiver receiver1 = session.receiver(linkName1);
+ receiver1.flow(5);
+ receiver1.open();
+
+ pumpMockTransport(transport);
+
+ final UnsignedInteger r1handle = UnsignedInteger.ZERO;
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+ // Give the necessary responses to open/begin/attach
+ transport.handleFrame(new TransportFrame(0, new Open(), null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ begin.setNextOutgoingId(UnsignedInteger.ONE);
+ begin.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ begin.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ Attach attach1 = new Attach();
+ attach1.setHandle(r1handle);
+ attach1.setRole(Role.SENDER);
+ attach1.setName(linkName1);
+ attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ transport.handleFrame(new TransportFrame(0, attach1, null));
+
+ String deliveryTag1 = "tag1";
+ String deliveryTag2 = "tag2";
+ String deliveryTag3 = "tag3";
+
+ // Send deliveries with the given delivery-id
+ handlePartialTransfer(transport, r1handle, deliveryId1, deliveryTag1, new byte[]{ 1 }, false);
+ handlePartialTransfer(transport, r1handle, deliveryId2, deliveryTag2, new byte[]{ 2 }, false);
+ handlePartialTransfer(transport, r1handle, deliveryId3, deliveryTag3, new byte[]{ 3 }, false);
+
+ // Verify deliveries arrived with expected payload
+ verifyDeliveryRawPayload(receiver1, deliveryTag1, new byte[] { 1 });
+ verifyDeliveryRawPayload(receiver1, deliveryTag2, new byte[] { 2 });
+ verifyDeliveryRawPayload(receiver1, deliveryTag3, new byte[] { 3 });
+ }
}