QPID-8345: [Broker-J][AMQP 1.0] Dequeue messages sent non-transactionally as pre-settled
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index bc87141..5f9283e 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -240,9 +240,14 @@
if (_linkEndpoint.isAttached())
{
- if (SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
+ boolean sendPreSettled = SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode());
+ if (sendPreSettled)
{
transfer.setSettled(true);
+ if (_acquires && _transactionId == null)
+ {
+ transfer.setState(new Accepted());
+ }
}
else
{
@@ -302,6 +307,11 @@
}
getSession().getAMQPConnection().registerMessageDelivered(message.getSize());
getEndpoint().transfer(transfer, false);
+
+ if (sendPreSettled && _acquires && _transactionId == null)
+ {
+ handleAcquiredEntrySentPareSettledNonTransactional(entry, consumer);
+ }
}
else
{
@@ -319,6 +329,35 @@
}
}
+ private void handleAcquiredEntrySentPareSettledNonTransactional(final MessageInstance entry,
+ final MessageInstanceConsumer consumer)
+ {
+ if (entry.makeAcquisitionUnstealable(consumer))
+ {
+ final ServerTransaction txn = _linkEndpoint.getAsyncAutoCommitTransaction();
+ txn.dequeue(entry.getEnqueueRecord(),
+ new ServerTransaction.Action()
+ {
+ @Override
+ public void postCommit()
+ {
+ entry.delete();
+ }
+
+ @Override
+ public void onRollback()
+ {
+ entry.release(consumer);
+ }
+ });
+ txn.commit();
+ }
+ else
+ {
+ entry.release(consumer);
+ }
+ }
+
@Override
public void flushBatched()
{
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 53a8af6..94a1249 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -737,7 +737,6 @@
.begin().consumeResponse()
.attachRole(Role.RECEIVER)
.attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
- .attachRcvSettleMode(ReceiverSettleMode.FIRST)
.attachSndSettleMode(SenderSettleMode.SETTLED)
.attach().consumeResponse(Attach.class);
Attach attach = interaction.getLatestResponse(Attach.class);
@@ -760,6 +759,13 @@
// verify no unexpected performative received by closing the connection
interaction.doCloseConnection();
}
+
+ if (getBrokerAdmin().isQueueDepthSupported())
+ {
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "test");
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo("test")));
}
@Test