QPID-8496: [Broker-J] Improve handling of transfers without payload
* Allow incoming delivery transfers without a payload
* Report amqp:not-implemented for a delivery when total payload size in all delivery transfers is 0
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
index 0f8e893..9bd89cb 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
@@ -189,7 +189,11 @@
List<QpidByteBuffer> transferBuffers = new ArrayList<>(_transfers.size());
for (Transfer t : _transfers)
{
- transferBuffers.add(t.getPayload());
+ QpidByteBuffer payload = t.getPayload();
+ if (payload != null)
+ {
+ transferBuffers.add(payload);
+ }
t.dispose();
}
_transfers.clear();
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index d86f338..9724850 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -181,6 +181,10 @@
MessageFormat format = MessageFormatRegistry.getFormat(messageFormat.intValue());
if(format != null)
{
+ if (delivery.getTotalPayloadSize() == 0)
+ {
+ return new Error(AmqpError.NOT_IMPLEMENTED, "Delivery without payload is not supported");
+ }
try (QpidByteBuffer payload = delivery.getPayload())
{
serverMessage = format.createMessage(payload,
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
index 0df1abd..bbb02e1 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/MessageDecoder.java
@@ -73,7 +73,11 @@
{
throw new IllegalStateException("The section fragments have already been parsed");
}
- _fragments.add(transfer.getPayload());
+ QpidByteBuffer payload = transfer.getPayload();
+ if (payload != null)
+ {
+ _fragments.add(payload);
+ }
}
public void parse() throws AmqpErrorException
@@ -145,10 +149,6 @@
}
while (s instanceof AmqpSequenceSection);
}
- else
- {
- throw new IllegalStateException("Application data sections are not found");
- }
if (s instanceof FooterSection)
{
@@ -170,6 +170,10 @@
{
parse();
+ if (_dataSections.size() == 0)
+ {
+ return null;
+ }
Object bodyObject = null;
EncodingRetainingSection<?> firstBodySection = _dataSections.get(0);
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index 5557eb8..4aaf132 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -107,6 +107,53 @@
}
@Test
+ @SpecificationTest(section = "", description = "")
+ public void multiTransferMessageHavingTransfersWithoutPayload() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
+ {
+ QpidByteBuffer[] payloads = Utils.splitPayload(getTestName(), 1);
+
+ final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
+ final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
+
+ Interaction interaction = transport.newInteraction();
+ Disposition disposition = interaction.negotiateOpen()
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+ .transferPayload(payloads[0])
+ .transferDeliveryId(deliveryId)
+ .transferDeliveryTag(deliveryTag)
+ .transferMore(true)
+ .transfer()
+
+ .transferMore(true)
+ .transferPayload(null)
+ .transfer()
+
+ .transferMore(false)
+ .transferPayload(null)
+ .transfer()
+ .consume(Disposition.class, Flow.class);
+
+ for (final QpidByteBuffer payload : payloads)
+ {
+ payload.dispose();
+ }
+
+ interaction.detachEndCloseUnconditionally();
+
+ assertThat(disposition.getFirst(), is(equalTo(deliveryId)));
+ assertThat(disposition.getLast(), oneOf(null, deliveryId));
+ assertThat(disposition.getSettled(), is(equalTo(true)));
+ }
+ assertThat(Utils.receiveMessage(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(getTestName())));
+ }
+
+ @Test
@SpecificationTest(section = "2.7.5",
description = "[delivery-id] On continuation transfers the delivery-id MAY be omitted..."
+ "[delivery-tag] field MUST be specified for the first transfer of a multi-transfer"
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 480263a..e2eaf1f 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -225,6 +225,39 @@
}
@Test
+ public void transferWithoutPayload() throws Exception
+ {
+ final String firstMessage = getTestName() + "_1";
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, firstMessage);
+ try (FrameTransport transport = new FrameTransport(getBrokerAdmin()).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateOpen()
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachHandle(UnsignedInteger.ONE)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+ .assertLatestResponse(Flow.class, this::assumeSufficientCredits)
+ .transferDeliveryId()
+ .transferHandle(UnsignedInteger.ONE)
+ .transfer()
+ .consumeResponse();
+
+ final Response<?> response = interaction.getLatestResponse();
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getBody(), is(notNullValue()));
+ assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class)));
+
+ final Error error = ((ErrorCarryingFrameBody)response.getBody()).getError();
+ assertThat(error, is(notNullValue()));
+ assertThat(error.getCondition(), equalTo(AmqpError.NOT_IMPLEMENTED));
+ }
+ assertThat(Utils.receiveMessage(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(firstMessage)));
+ }
+
+ @Test
@SpecificationTest(section = "2.6.12 Transferring A Message",
description = "The delivery-tag MUST be unique amongst all deliveries"
+ " that could be considered unsettled by either end of the link.")