QPIDJMS-473: avoid passing conflicting sync + completion-required flags to the anonymous fallback producer
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
index 807ce4b..63e5cbb 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java
@@ -59,10 +59,6 @@
public void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws ProviderException {
LOG.trace("Started send chain for anonymous producer: {}", getProducerId());
- // Force sends marked as asynchronous to be sent synchronous so that the temporary
- // producer instance can handle failures and perform necessary completion work on
- // the send.
- envelope.setSendAsync(false);
// Create a new ProducerInfo for the short lived producer that's created to perform the
// send to the given AMQP target.
@@ -74,7 +70,12 @@
// it will trigger the open event which will in turn trigger the send event.
// The created producer will be closed immediately after the delivery has been acknowledged.
AmqpProducerBuilder builder = new AmqpProducerBuilder(session, info);
- builder.buildResource(new AnonymousSendRequest(request, builder, envelope));
+ builder.buildResource(new AnonymousSendRequest(request, builder, envelope, envelope.isCompletionRequired()));
+
+ // Force sends to be sent synchronous so that the temporary producer instance can handle
+ // the failures and perform necessary completion work on the send.
+ envelope.setSendAsync(false);
+ envelope.setCompletionRequired(false);
getParent().getProvider().pumpToProtonTransport(request);
}
@@ -108,10 +109,12 @@
private abstract class AnonymousRequest extends WrappedAsyncResult {
protected final JmsOutboundMessageDispatch envelope;
+ private final boolean completionRequired;
- public AnonymousRequest(AsyncResult sendResult, JmsOutboundMessageDispatch envelope) {
+ public AnonymousRequest(AsyncResult sendResult, JmsOutboundMessageDispatch envelope, boolean completionRequired) {
super(sendResult);
this.envelope = envelope;
+ this.completionRequired = completionRequired;
}
/**
@@ -124,6 +127,10 @@
super.onFailure(result);
}
+ public boolean isCompletionRequired() {
+ return completionRequired;
+ }
+
public abstract AmqpProducer getProducer();
}
@@ -131,8 +138,8 @@
private final AmqpProducerBuilder producerBuilder;
- public AnonymousSendRequest(AsyncResult sendResult, AmqpProducerBuilder producerBuilder, JmsOutboundMessageDispatch envelope) {
- super(sendResult, envelope);
+ public AnonymousSendRequest(AsyncResult sendResult, AmqpProducerBuilder producerBuilder, JmsOutboundMessageDispatch envelope, boolean completionRequired) {
+ super(sendResult, envelope, completionRequired);
this.producerBuilder = producerBuilder;
}
@@ -159,7 +166,7 @@
private final AmqpProducer producer;
public AnonymousSendCompleteRequest(AnonymousSendRequest open) {
- super(open.getWrappedRequest(), open.envelope);
+ super(open.getWrappedRequest(), open.envelope, open.isCompletionRequired());
this.producer = open.getProducer();
}
@@ -190,7 +197,7 @@
private final AmqpProducer producer;
public AnonymousCloseRequest(AnonymousSendCompleteRequest sendComplete) {
- super(sendComplete.getWrappedRequest(), sendComplete.envelope);
+ super(sendComplete.getWrappedRequest(), sendComplete.envelope, sendComplete.isCompletionRequired());
this.producer = sendComplete.getProducer();
}
@@ -199,6 +206,9 @@
public void onSuccess() {
LOG.trace("Close phase of anonymous send complete: {} ", getProducerId());
super.onSuccess();
+ if (isCompletionRequired()) {
+ getParent().getProvider().getProviderListener().onCompletedMessageSend(envelope);
+ }
}
@Override
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 3c36512..0e5ce7d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -2841,6 +2841,158 @@
}
@Test(timeout = 20000)
+ public void testAnonymousProducerAsyncCompletionListenerSendFailureHandledWhenAnonymousRelayNodeIsNotSupported() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+ // DO NOT add capability to indicate server support for ANONYMOUS-RELAY
+
+ Connection connection = testFixture.establishConnecton(testPeer);
+
+ connection.start();
+
+ testPeer.expectBegin();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String topicName = "myTopic";
+ Topic dest = session.createTopic(topicName);
+
+ // Expect no AMQP traffic when we create the anonymous producer, as it will wait
+ // for an actual send to occur on the producer before anything occurs on the wire
+
+ //Create an anonymous producer
+ MessageProducer producer = session.createProducer(null);
+ assertNotNull("Producer object was null", producer);
+
+ // Expect a new message sent by the above producer to cause creation of a new
+ // sender link to the given destination, then closing the link after the message is sent.
+ TargetMatcher targetMatcher = new TargetMatcher();
+ targetMatcher.withAddress(equalTo(topicName));
+ targetMatcher.withDynamic(equalTo(false));
+ targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+ String content = "testContent";
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+ messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+ messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
+ messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(content));
+
+ TestJmsCompletionListener completionListener = new TestJmsCompletionListener();
+ Message message = session.createTextMessage(content);
+
+ testPeer.expectSenderAttach(targetMatcher, false, false);
+ testPeer.expectTransfer(messageMatcher, nullValue(), new Rejected(), true);
+ testPeer.expectDetach(true, true, true);
+
+ // The fallback producer acts as synchronous regardless of the completion listener,
+ // so exceptions are thrown from send. Only onComplete uses the listener.
+ try {
+ producer.send(dest, message, completionListener);
+ fail("Send should fail");
+ } catch (JMSException jmsEx) {
+ LOG.debug("Caught expected error from failed send.");
+ }
+
+ //Repeat the send (but accept this time) and observe another attach->transfer->detach.
+ testPeer.expectSenderAttach(targetMatcher, false, false);
+ testPeer.expectTransfer(messageMatcher);
+ testPeer.expectDetach(true, true, true);
+
+ TestJmsCompletionListener completionListener2 = new TestJmsCompletionListener();
+
+ producer.send(dest, message, completionListener2);
+
+ assertTrue("Did not get completion callback", completionListener2.awaitCompletion(5, TimeUnit.SECONDS));
+ assertNull(completionListener2.exception);
+ Message receivedMessage2 = completionListener2.message;
+ assertNotNull(receivedMessage2);
+ assertTrue(receivedMessage2 instanceof TextMessage);
+ assertEquals(content, ((TextMessage) receivedMessage2).getText());
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testAnonymousProducerAsyncCompletionListenerSendWhenAnonymousRelayNodeIsNotSupported() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+ // DO NOT add capability to indicate server support for ANONYMOUS-RELAY
+
+ Connection connection = testFixture.establishConnecton(testPeer);
+
+ connection.start();
+
+ testPeer.expectBegin();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String topicName = "myTopic";
+ Topic dest = session.createTopic(topicName);
+
+ // Expect no AMQP traffic when we create the anonymous producer, as it will wait
+ // for an actual send to occur on the producer before anything occurs on the wire
+
+ //Create an anonymous producer
+ MessageProducer producer = session.createProducer(null);
+ assertNotNull("Producer object was null", producer);
+
+ // Expect a new message sent by the above producer to cause creation of a new
+ // sender link to the given destination, then closing the link after the message is sent.
+ TargetMatcher targetMatcher = new TargetMatcher();
+ targetMatcher.withAddress(equalTo(topicName));
+ targetMatcher.withDynamic(equalTo(false));
+ targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
+
+ String content = "testContent";
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
+ messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
+ messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
+ messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(content));
+
+ testPeer.expectSenderAttach(targetMatcher, false, false);
+ testPeer.expectTransfer(messageMatcher);
+ testPeer.expectDetach(true, true, true);
+
+ TestJmsCompletionListener completionListener = new TestJmsCompletionListener();
+ Message message = session.createTextMessage(content);
+
+ producer.send(dest, message, completionListener);
+
+ assertTrue("Did not get completion callback", completionListener.awaitCompletion(5, TimeUnit.SECONDS));
+ assertNull(completionListener.exception);
+ Message receivedMessage = completionListener.message;
+ assertNotNull(receivedMessage);
+ assertTrue(receivedMessage instanceof TextMessage);
+ assertEquals(content, ((TextMessage) receivedMessage).getText());
+
+ //Repeat the send and observe another attach->transfer->detach.
+ testPeer.expectSenderAttach(targetMatcher, false, false);
+ testPeer.expectTransfer(messageMatcher);
+ testPeer.expectDetach(true, true, true);
+
+ TestJmsCompletionListener completionListener2 = new TestJmsCompletionListener();
+
+ producer.send(dest, message, completionListener2);
+
+ assertTrue("Did not get completion callback", completionListener2.awaitCompletion(5, TimeUnit.SECONDS));
+ assertNull(completionListener2.exception);
+ Message receivedMessage2 = completionListener2.message;
+ assertNotNull(receivedMessage2);
+ assertTrue(receivedMessage2 instanceof TextMessage);
+ assertEquals(content, ((TextMessage) receivedMessage2).getText());
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
public void testSendingMessageSetsJMSDeliveryTimeWithDelay() throws Exception {
doSendingMessageSetsJMSDeliveryTimeTestImpl(true);
}