PROTON-1892: ensure new deliveryId is properly allocated to new delivery transfers when an existing multi-frame delivery is still underway on another link.
Change from Marcel Meulemans, plus test from me.
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index afadb5f..3ec6ef4 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -556,8 +556,15 @@
}
}
- UnsignedInteger deliveryId = tpSession.getOutgoingDeliveryId();
- TransportDelivery tpDelivery = new TransportDelivery(deliveryId, delivery, tpLink);
+ TransportDelivery tpDelivery = delivery.getTransportDelivery();
+ UnsignedInteger deliveryId;
+ if (tpDelivery != null) {
+ deliveryId = tpDelivery.getDeliveryId();
+ } else {
+ deliveryId = tpSession.getOutgoingDeliveryId();
+ tpSession.incrementOutgoingDeliveryId();
+ }
+ tpDelivery = new TransportDelivery(deliveryId, delivery, tpLink);
delivery.setTransportDelivery(tpDelivery);
final Transfer transfer = new Transfer();
@@ -616,7 +623,6 @@
delivery.setDone();
tpLink.setDeliveryCount(tpLink.getDeliveryCount().add(UnsignedInteger.ONE));
tpLink.setLinkCredit(tpLink.getLinkCredit().subtract(UnsignedInteger.ONE));
- tpSession.incrementOutgoingDeliveryId();
session.incrementOutgoingDeliveries(-1);
snd.decrementQueued();
}
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 50c04fd..9e3afd6 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -2643,4 +2643,149 @@
return new String(payload, StandardCharsets.UTF_8);
}
+
+ @Test
+ public void testMultiplexMultiFrameDeliveryOnSingleSession() {
+ doMultiplexMultiFrameDeliveryOnSingleSessionTestImpl(false);
+ }
+
+ @Test
+ public void testMultiplexMultiFrameDeliveriesOnSingleSession() {
+ doMultiplexMultiFrameDeliveryOnSingleSessionTestImpl(true);
+ }
+
+ private void doMultiplexMultiFrameDeliveryOnSingleSessionTestImpl(boolean bothDeliveriesMultiFrame) {
+ MockTransportImpl transport = new MockTransportImpl();
+ transport.setEmitFlowEventOnSend(false);
+
+ int contentLength1 = 6000;
+ int frameSizeLimit = 4000;
+ int contentLength2 = 2000;
+ if(bothDeliveriesMultiFrame) {
+ contentLength2 = 6000;
+ }
+
+ Connection connection = Proton.connection();
+ transport.bind(connection);
+
+ Session session = connection.session();
+ session.open();
+
+ String linkName = "mySender1";
+ Sender sender = session.sender(linkName);
+ sender.open();
+
+ String linkName2 = "mySender2";
+ Sender sender2 = session.sender(linkName2);
+ sender2.open();
+
+ String messageContent1 = createLargeContent(contentLength1);
+ sendMessage(sender, "tag1", messageContent1);
+
+ String messageContent2 = createLargeContent(contentLength2);
+ sendMessage(sender2, "tag2", messageContent2);
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 0, transport.writes.size());
+
+ // Now open the connection, expect the Open Begin, and Attach frames but
+ // nothing else as we haven't remotely opened the receiver or given credit yet.
+ connection.open();
+
+ pumpMockTransport(transport);
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+ assertTrue("Unexpected frame type", transport.writes.get(0) instanceof Open);
+ assertTrue("Unexpected frame type", transport.writes.get(1) instanceof Begin);
+ assertTrue("Unexpected frame type", transport.writes.get(2) instanceof Attach);
+ assertTrue("Unexpected frame type", transport.writes.get(3) instanceof Attach);
+
+ // Send the necessary responses to open/begin/attach then give senders credit
+ Open open = new Open();
+ open.setMaxFrameSize(UnsignedInteger.valueOf(frameSizeLimit));
+
+ transport.handleFrame(new TransportFrame(0, open, null));
+
+ Begin begin = new Begin();
+ begin.setRemoteChannel(UnsignedShort.valueOf((short) 0));
+ transport.handleFrame(new TransportFrame(0, begin, null));
+
+ Attach attach1 = new Attach();
+ attach1.setHandle(UnsignedInteger.ZERO);
+ attach1.setRole(Role.RECEIVER);
+ attach1.setName(linkName);
+ attach1.setInitialDeliveryCount(UnsignedInteger.ZERO);
+
+ transport.handleFrame(new TransportFrame(0, attach1, null));
+
+ Attach attach2 = new Attach();
+ attach2.setHandle(UnsignedInteger.ONE);
+ attach2.setRole(Role.RECEIVER);
+ attach2.setName(linkName2);
+ attach2.setInitialDeliveryCount(UnsignedInteger.ZERO);
+
+ transport.handleFrame(new TransportFrame(0, attach2, null));
+
+ Flow flow1 = new Flow();
+ flow1.setHandle(UnsignedInteger.ZERO);
+ flow1.setDeliveryCount(UnsignedInteger.ZERO);
+ flow1.setNextIncomingId(UnsignedInteger.ONE);
+ flow1.setNextOutgoingId(UnsignedInteger.ZERO);
+ flow1.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ flow1.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ flow1.setLinkCredit(UnsignedInteger.valueOf(10));
+
+ transport.handleFrame(new TransportFrame(0, flow1, null));
+
+ Flow flow2 = new Flow();
+ flow2.setHandle(UnsignedInteger.ONE);
+ flow2.setDeliveryCount(UnsignedInteger.ZERO);
+ flow2.setNextIncomingId(UnsignedInteger.ONE);
+ flow2.setNextOutgoingId(UnsignedInteger.ZERO);
+ flow2.setIncomingWindow(UnsignedInteger.valueOf(1024));
+ flow2.setOutgoingWindow(UnsignedInteger.valueOf(1024));
+ flow2.setLinkCredit(UnsignedInteger.valueOf(10));
+
+ transport.handleFrame(new TransportFrame(0, flow2, null));
+
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), 4, transport.writes.size());
+
+ // Now pump the transport again and expect transfers for the messages
+ pumpMockTransport(transport);
+
+ int expectedFrames = bothDeliveriesMultiFrame ? 8 : 7;
+ assertEquals("Unexpected frames written: " + getFrameTypesWritten(transport), expectedFrames, transport.writes.size());
+
+ FrameBody frameBody = transport.writes.get(4);
+ assertTrue("Unexpected frame type", frameBody instanceof Transfer);
+ Transfer transfer = (Transfer) frameBody;
+ assertEquals("Unexpected delivery tag", new Binary("tag1".getBytes(StandardCharsets.UTF_8)), transfer.getDeliveryTag());
+ assertEquals("Unexpected deliveryId", UnsignedInteger.ZERO, transfer.getDeliveryId());
+ assertEquals("Unexpected more flag", true, transfer.getMore());
+
+ frameBody = transport.writes.get(5);
+ assertTrue("Unexpected frame type", frameBody instanceof Transfer);
+ transfer = (Transfer) frameBody;
+ assertEquals("Unexpected delivery tag", new Binary("tag2".getBytes(StandardCharsets.UTF_8)), transfer.getDeliveryTag());
+ assertEquals("Unexpected deliveryId", UnsignedInteger.ONE, transfer.getDeliveryId());
+ assertEquals("Unexpected more flag", bothDeliveriesMultiFrame, transfer.getMore());
+
+ frameBody = transport.writes.get(6);
+ assertTrue("Unexpected frame type", frameBody instanceof Transfer);
+ transfer = (Transfer) frameBody;
+ assertEquals("Unexpected delivery tag", new Binary("tag1".getBytes(StandardCharsets.UTF_8)), transfer.getDeliveryTag());
+ assertEquals("Unexpected deliveryId", UnsignedInteger.ZERO, transfer.getDeliveryId());
+ assertEquals("Unexpected more flag", false, transfer.getMore());
+
+ if(bothDeliveriesMultiFrame) {
+ frameBody = transport.writes.get(7);
+ assertTrue("Unexpected frame type", frameBody instanceof Transfer);
+ transfer = (Transfer) frameBody;
+ assertEquals("Unexpected delivery tag", new Binary("tag2".getBytes(StandardCharsets.UTF_8)), transfer.getDeliveryTag());
+ assertEquals("Unexpected deliveryId", UnsignedInteger.ONE, transfer.getDeliveryId());
+ assertEquals("Unexpected more flag", false, transfer.getMore());
+ }
+ }
}