blob: 6cd4bcbe1bed7eb9e85bc2048cf419e39fd59f8f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms.integration;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.CompletionListener;
import javax.jms.Connection;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.ResourceAllocationException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.JmsSendTimedOutException;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability;
import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.jms.util.QpidJMSTestRunner;
import org.apache.qpid.jms.util.Repeat;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests for the Anonymous Fallback producer implementation.
*
* DO NOT add capability to indicate server support for ANONYMOUS-RELAY for any of these tests
*/
@RunWith(QpidJMSTestRunner.class)
public class AnonymousFallbackProducerIntegrationTest extends QpidJmsTestCase {
private static final Logger LOG = LoggerFactory.getLogger(AnonymousFallbackProducerIntegrationTest.class);
private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
@Test(timeout = 20000)
public void testCloseSenderWithNoActiveFallbackProducers() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
testPeer.expectBegin();
testPeer.expectClose();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(null);
producer.close();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testRemotelyCloseProducerDuringSyncSendNoCache() throws Exception {
doTestRemotelyCloseProducerDuringSyncSend(0);
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testRemotelyCloseProducerDuringSyncSendOneCached() throws Exception {
doTestRemotelyCloseProducerDuringSyncSend(1);
}
private void doTestRemotelyCloseProducerDuringSyncSend(int cacheSize) throws Exception {
final String BREAD_CRUMB = "ErrorMessageBreadCrumb";
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// Use a long timeout to ensure no early evictions in this test.
Connection connection = testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=" + cacheSize + "&amqp.anonymousFallbackCacheTimeout=60000");
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Expect producer creation, give it credit.
testPeer.expectSenderAttach();
String text = "myMessage";
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
// Expect a message to be sent, but don't send a disposition in
// response, simply remotely close the producer instead.
testPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, false);
testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB);
testPeer.expectClose();
Queue queue = session.createQueue("myQueue");
final MessageProducer producer = session.createProducer(null);
Message message = session.createTextMessage(text);
try {
producer.send(queue, message);
fail("Expected exception to be thrown");
} catch (JMSException jmse) {
LOG.trace("JMSException thrown from send: ", jmse);
// Expected but requires some context to be correct.
assertTrue(jmse instanceof ResourceAllocationException);
assertNotNull("Expected exception to have a message", jmse.getMessage());
assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB));
}
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testRemotelyCloseProducerWithSendWaitingForCreditNoCache() throws Exception {
doTestRemotelyCloseProducerWithSendWaitingForCredit(0);
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testRemotelyCloseProducerWithSendWaitingForCreditOneCached() throws Exception {
doTestRemotelyCloseProducerWithSendWaitingForCredit(1);
}
private void doTestRemotelyCloseProducerWithSendWaitingForCredit(int cacheSize) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// Use a long timeout to ensure no early evictions in this test.
Connection connection = testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=" + cacheSize + "&amqp.anonymousFallbackCacheTimeout=60000");
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Expect producer creation, don't give it credit.
testPeer.expectSenderAttachWithoutGrantingCredit();
// Producer has no credit so the send should block waiting for it, then fail when the remote close occurs
testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, "Producer closed", 50);
testPeer.expectClose();
Queue queue = session.createQueue("myQueue");
final MessageProducer producer = session.createProducer(null);
Message message = session.createTextMessage("myMessage");
try {
producer.send(queue, message);
fail("Expected exception to be thrown due to close of producer");
} catch (ResourceAllocationException rae) {
// Expected if remote close beat the send to the provider
} catch (IllegalStateException ise) {
// Can happen if send fires before remote close if processed.
}
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testRemotelyCloseConnectionDuringSyncSendNoCache() throws Exception {
doTestRemotelyCloseConnectionDuringSyncSend(0);
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testRemotelyCloseConnectionDuringSyncSendOneCached() throws Exception {
doTestRemotelyCloseConnectionDuringSyncSend(1);
}
private void doTestRemotelyCloseConnectionDuringSyncSend(int cacheSize) throws Exception {
final String BREAD_CRUMB = "ErrorMessageBreadCrumb";
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// Use a long timeout to ensure no early evictions in this test.
Connection connection = testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=" + cacheSize + "&amqp.anonymousFallbackCacheTimeout=60000");
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Expect producer creation, give it credit.
testPeer.expectSenderAttach();
String text = "myMessage";
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
messageMatcher.setPropertiesMatcher(propsMatcher);
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
// Expect a message to be sent, but don't send a disposition in
// response, simply remotely close the connection instead.
testPeer.expectTransfer(messageMatcher, nullValue(), false, false, null, false);
testPeer.remotelyCloseConnection(true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB);
Queue queue = session.createQueue("myQueue");
final MessageProducer producer = session.createProducer(null);
Message message = session.createTextMessage(text);
try {
producer.send(queue, message);
fail("Expected exception to be thrown");
} catch (JMSException jmse) {
// Expected exception with specific context
assertTrue(jmse instanceof ResourceAllocationException);
assertNotNull("Expected exception to have a message", jmse.getMessage());
assertTrue("Expected breadcrumb to be present in message", jmse.getMessage().contains(BREAD_CRUMB));
}
testPeer.waitForAllHandlersToComplete(3000);
connection.close();
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testSendWhenLinkCreditIsZeroAndTimeoutNoCache() throws Exception {
doTestSendWhenLinkCreditIsZeroAndTimeout(0);
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testSendWhenLinkCreditIsZeroAndTimeoutCacheOne() throws Exception {
doTestSendWhenLinkCreditIsZeroAndTimeout(1);
}
private void doTestSendWhenLinkCreditIsZeroAndTimeout(int cacheSize) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// Use a long timeout to ensure no early evictions in this test.
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=" + cacheSize + "&amqp.anonymousFallbackCacheTimeout=60000");
connection.setSendTimeout(500);
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String queueName = "myQueue";
Queue queue = session.createQueue(queueName);
Message message = session.createTextMessage("text");
// Expect the producer to attach. Don't send any credit so that the client will
// block on a send and we can test our timeouts.
testPeer.expectSenderAttachWithoutGrantingCredit();
if (cacheSize == 0) {
testPeer.expectDetach(true, true, true);
}
testPeer.expectClose();
MessageProducer producer = session.createProducer(null);
try {
producer.send(queue, message);
fail("Send should time out.");
} catch (JmsSendTimedOutException jmsEx) {
LOG.info("Caught expected error: {}", jmsEx.getMessage());
} catch (Throwable error) {
fail("Send should time out, but got: " + error.getMessage());
}
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testSyncSendFailureHandled() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String topicName = "myTopic";
Topic dest = session.createTopic(topicName);
// Expect no AMQP traffic when we create the anonymous producer, as it will wait
// for an actual send to occur on the producer before anything occurs on the wire
// Create an anonymous producer
MessageProducer producer = session.createProducer(null);
assertNotNull("Producer object was null", producer);
// Expect a new message sent by the above producer to cause creation of a new
// sender link to the given destination, then closing the link after the message is sent.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(topicName));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher, nullValue(), new Rejected(), true);
testPeer.expectDetach(true, true, true);
Message message = session.createMessage();
try {
producer.send(dest, message);
fail("Send should fail");
} catch (JMSException jmsEx) {
LOG.debug("Caught expected error from failed send.");
}
// Repeat the send and observe another attach->transfer->detach.
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
testPeer.expectDetach(true, true, true);
producer.send(dest, message);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testAsyncSendFailureHandled() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final CountDownLatch sendFailureReportedToListener = new CountDownLatch(1);
final AtomicReference<Throwable> sendFailureError = new AtomicReference<>();
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
"?jms.forceAsyncSend=true&amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000");
connection.setExceptionListener((error) -> {
sendFailureError.compareAndSet(null, error);
sendFailureReportedToListener.countDown();
});
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String topicName = "myTopic";
Topic dest = session.createTopic(topicName);
// Expect no AMQP traffic when we create the anonymous producer, as it will wait
// for an actual send to occur on the producer before anything occurs on the wire
//Create an anonymous producer
MessageProducer producer = session.createProducer(null);
assertNotNull("Producer object was null", producer);
// Expect a new message sent by the above producer to cause creation of a new
// sender link to the given destination, then closing the link after the message is sent.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(topicName));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
final String BREAD_CRUMB = "SEND FAILURE EXPECTED";
org.apache.qpid.jms.test.testpeer.describedtypes.Error rejectError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
rejectError.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
rejectError.setDescription(BREAD_CRUMB);
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher, nullValue(), new Rejected().setError(rejectError), true);
testPeer.expectDetach(true, true, true);
// Producer should act as synchronous regardless of asynchronous send setting.
Message message = session.createMessage();
try {
producer.send(dest, message);
} catch (JMSException jmsEx) {
LOG.debug("Caught expected error from failed send.");
fail("Send should not fail as it should have fired asynchronously");
}
// Repeat the send and observe another attach->transfer->detach.
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
testPeer.expectDetach(true, true, true);
assertTrue("Send failure not reported to exception handler", sendFailureReportedToListener.await(5, TimeUnit.SECONDS));
assertNotNull(sendFailureError.get());
assertTrue(sendFailureError.get() instanceof ResourceAllocationException);
assertTrue(sendFailureError.get().getMessage().contains(BREAD_CRUMB));
producer.send(dest, message);
// Send here is asynchronous so we need to wait for disposition to arrive and detach to happen
testPeer.waitForAllHandlersToComplete(1000);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testAsyncCompletionListenerSendFailureHandled() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String topicName = "myTopic";
Topic dest = session.createTopic(topicName);
// Expect no AMQP traffic when we create the anonymous producer, as it will wait
// for an actual send to occur on the producer before anything occurs on the wire
//Create an anonymous producer
MessageProducer producer = session.createProducer(null);
assertNotNull("Producer object was null", producer);
// Expect a new message sent by the above producer to cause creation of a new
// sender link to the given destination, then closing the link after the message is sent.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(topicName));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
String content = "testContent";
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(content));
TestJmsCompletionListener completionListener = new TestJmsCompletionListener();
Message message = session.createTextMessage(content);
final String BREAD_CRUMB = "SEND FAILURE EXPECTED";
org.apache.qpid.jms.test.testpeer.describedtypes.Error rejectError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
rejectError.setCondition(AmqpError.RESOURCE_LIMIT_EXCEEDED);
rejectError.setDescription(BREAD_CRUMB);
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher, nullValue(), new Rejected().setError(rejectError), true);
testPeer.expectDetach(true, true, true);
// The fallback producer acts as synchronous regardless of the completion listener,
// so exceptions are thrown from send. Only onComplete uses the listener.
try {
producer.send(dest, message, completionListener);
} catch (JMSException jmsEx) {
LOG.debug("Caught unexpected error from failed send.");
fail("Send should not fail for asychrnous completion sends");
}
// Repeat the send (but accept this time) and observe another attach->transfer->detach.
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
testPeer.expectDetach(true, true, true);
assertTrue("Send failure not reported to exception handler", completionListener.awaitCompletion(5, TimeUnit.SECONDS));
assertNotNull(completionListener.exception);
assertTrue(completionListener.exception instanceof ResourceAllocationException);
assertTrue(completionListener.exception.getMessage().contains(BREAD_CRUMB));
TestJmsCompletionListener completionListener2 = new TestJmsCompletionListener();
producer.send(dest, message, completionListener2);
assertTrue("Did not get completion callback", completionListener2.awaitCompletion(5, TimeUnit.SECONDS));
assertNull(completionListener2.exception);
Message receivedMessage2 = completionListener2.message;
assertNotNull(receivedMessage2);
assertTrue(receivedMessage2 instanceof TextMessage);
assertEquals(content, ((TextMessage) receivedMessage2).getText());
// Asynchronous send requires a wait otherwise we can close before the detach which we are testing for.
testPeer.waitForAllHandlersToComplete(1000);
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testAsyncCompletionListenerSendWhenNoCacheConfigured() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String topicName = "myTopic";
Topic dest = session.createTopic(topicName);
// Expect no AMQP traffic when we create the anonymous producer, as it will wait
// for an actual send to occur on the producer before anything occurs on the wire
//Create an anonymous producer
MessageProducer producer = session.createProducer(null);
assertNotNull("Producer object was null", producer);
// Expect a new message sent by the above producer to cause creation of a new
// sender link to the given destination, then closing the link after the message is sent.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(topicName));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
String content = "testContent";
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(content));
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
testPeer.expectDetach(true, true, true);
TestJmsCompletionListener completionListener = new TestJmsCompletionListener();
Message message = session.createTextMessage(content);
producer.send(dest, message, completionListener);
assertTrue("Did not get completion callback", completionListener.awaitCompletion(5, TimeUnit.SECONDS));
assertNull(completionListener.exception);
Message receivedMessage = completionListener.message;
assertNotNull(receivedMessage);
assertTrue(receivedMessage instanceof TextMessage);
assertEquals(content, ((TextMessage) receivedMessage).getText());
// Repeat the send and observe another attach->transfer->detach.
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
testPeer.expectDetach(true, true, true);
TestJmsCompletionListener completionListener2 = new TestJmsCompletionListener();
producer.send(dest, message, completionListener2);
assertTrue("Did not get completion callback", completionListener2.awaitCompletion(5, TimeUnit.SECONDS));
assertNull(completionListener2.exception);
Message receivedMessage2 = completionListener2.message;
assertNotNull(receivedMessage2);
assertTrue(receivedMessage2 instanceof TextMessage);
assertEquals(content, ((TextMessage) receivedMessage2).getText());
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testRemotelyEndFallbackProducerCompletesAsyncSends() throws Exception {
final String BREAD_CRUMB = "ErrorMessage";
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final CountDownLatch producerClosed = new CountDownLatch(1);
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000");
connection.addConnectionListener(new JmsDefaultConnectionListener() {
@Override
public void onProducerClosed(MessageProducer producer, Throwable exception) {
producerClosed.countDown();
}
});
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create a producer, then remotely end the session afterwards.
testPeer.expectSenderAttach();
Queue queue = session.createQueue("myQueue");
final MessageProducer producer = session.createProducer(queue);
final int MSG_COUNT = 3;
for (int i = 0; i < MSG_COUNT; ++i) {
testPeer.expectTransferButDoNotRespond(new TransferPayloadCompositeMatcher());
}
testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_LIMIT_EXCEEDED, BREAD_CRUMB, 50);
TestJmsCompletionListener listener = new TestJmsCompletionListener(MSG_COUNT);
try {
for (int i = 0; i < MSG_COUNT; ++i) {
Message message = session.createTextMessage("content");
producer.send(message, listener);
}
} catch (JMSException e) {
LOG.warn("Caught unexpected error: {}", e.getMessage());
fail("No expected exception for this send.");
}
testPeer.waitForAllHandlersToComplete(2000);
// Verify the sends gets marked as having failed
assertTrue(listener.awaitCompletion(5, TimeUnit.SECONDS));
assertEquals(MSG_COUNT, listener.errorCount);
// Verify the producer gets marked closed
assertTrue("Producer closed callback didn't trigger", producerClosed.await(5, TimeUnit.SECONDS));
try {
producer.getDeliveryMode();
fail("Expected ISE to be thrown due to being closed");
} catch (IllegalStateException jmsise) {
String errorMessage = jmsise.getCause().getMessage();
assertTrue(errorMessage.contains(AmqpError.RESOURCE_LIMIT_EXCEEDED.toString()));
assertTrue(errorMessage.contains(BREAD_CRUMB));
}
// Try closing it explicitly, should effectively no-op in client.
// The test peer will throw during close if it sends anything.
producer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testRemotelyCloseSessionAndAttemptAsyncCompletionSendThrowsAndLeavesMessageReadable() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=0&amqp.anonymousFallbackCacheTimeout=60000");
final CountDownLatch sessionClosed = new CountDownLatch(1);
connection.addConnectionListener(new JmsDefaultConnectionListener() {
@Override
public void onSessionClosed(Session session, Throwable cause) {
sessionClosed.countDown();
}
});
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
MessageProducer producer = session.createProducer(null);
// Expect a new message sent by the above producer to cause creation of a new
// sender link to the given destination, then closing the link after the message is sent.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo("myQueue"));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
String content = "testContent";
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(new MessageHeaderSectionMatcher(true));
messageMatcher.setMessageAnnotationsMatcher(new MessageAnnotationsSectionMatcher(true));
messageMatcher.setPropertiesMatcher(new MessagePropertiesSectionMatcher(true));
messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(content));
// Perform a send and observe an attach->transfer->detach.
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
testPeer.expectDetach(true, true, true);
testPeer.remotelyEndLastOpenedSession(true);
Message message1 = session.createTextMessage(content);
Message message2 = session.createTextMessage(content);
assertNull("Should not yet have a JMSDestination", message1.getJMSDestination());
assertNull("Should not yet have a JMSDestination", message2.getJMSDestination());
producer.send(queue, message1);
testPeer.waitForAllHandlersToComplete(2000);
assertTrue("Session should have been closed", sessionClosed.await(2, TimeUnit.SECONDS));
TestJmsCompletionListener listener = new TestJmsCompletionListener();
try {
producer.send(queue, message2, listener);
fail("Expected exception to be thrown for this send.");
} catch (JMSException e) {
LOG.trace("Caught expected exception: {}", e.getMessage());
}
assertFalse("Should not get async callback", listener.awaitCompletion(5, TimeUnit.MILLISECONDS));
// Message should be readable but not carry a destination as it wasn't actually sent anywhere
assertNull("Should not have a readable JMSDestination", message2.getJMSDestination());
assertEquals("Message body not as expected", content, ((TextMessage) message2).getText());
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(2000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testFallbackProducerRecoversFromRefusalOfSenderOpenOnNextSend() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=1&amqp.anonymousFallbackCacheTimeout=0");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String topicName = "myTopic";
// Expect no AMQP traffic when we create the anonymous producer, as it will wait
// for an actual send to occur on the producer before anything occurs on the wire
// Create an anonymous producer
MessageProducer producer = session.createProducer(null);
assertNotNull("Producer object was null", producer);
// Expect a new message sent by the above producer to cause creation of a new
// sender link to the given destination.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(topicName));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
Topic dest = session.createTopic(topicName);
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
Message message = session.createMessage();
// Refuse the attach which should result in fallback producer detaching the refused
// link attach and the send should then fail.
testPeer.expectSenderAttach(targetMatcher, true, false);
testPeer.expectDetach(true, false, false);
try {
producer.send(dest, message);
fail("Send should have failed because sender link was refused.");
} catch (JMSException ex) {
LOG.trace("Caught expected exception: ", ex);
}
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
testPeer.expectDetach(true, true, true);
producer.send(dest, message);
producer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 60000)
public void testRepeatedSendToSameAddressWhenCacheSizeOfOneKeepsFallbackProducerInCache() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final int MESSAGE_COUNT = 25;
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=1&amqp.anonymousFallbackCacheTimeout=200");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String topicName = "myTopic";
// Expect no AMQP traffic when we create the anonymous producer, as it will wait
// for an actual send to occur on the producer before anything occurs on the wire
// Create an anonymous producer
MessageProducer producer = session.createProducer(null);
assertNotNull("Producer object was null", producer);
// Expect a new message sent by the above producer to cause creation of a new
// sender link to the given destination.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(topicName));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
testPeer.expectSenderAttach(targetMatcher, false, true);
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
Topic dest = session.createTopic(topicName);
Message message = session.createMessage();
// Setup our expectations
for (int i = 1; i <= MESSAGE_COUNT; ++i) {
testPeer.expectTransfer(messageMatcher);
}
testPeer.expectDetach(true, true, true);
// First round of sends should open and cache sender links
for (int i = 1; i <= MESSAGE_COUNT; ++i) {
producer.send(dest, message);
}
LOG.debug("Finished with send cycle, producer should now timeout");
// The eviction timer should reduce the cache to zero after we go idle
testPeer.waitForAllHandlersToComplete(3000);
producer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testSendToMultipleDestinationsOpensNewSendersWhenCaching() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final int CACHE_SIZE = 5;
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=" + CACHE_SIZE + "&amqp.anonymousFallbackCacheTimeout=0");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String topicName = "myTopic";
// Expect no AMQP traffic when we create the anonymous producer, as it will wait
// for an actual send to occur on the producer before anything occurs on the wire
// Create an anonymous producer
MessageProducer producer = session.createProducer(null);
assertNotNull("Producer object was null", producer);
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
// First round of sends should open and cache sender links
for (int i = 1; i <= CACHE_SIZE; ++i) {
Topic dest = session.createTopic(topicName + i);
// Expect a new message sent by the above producer to cause creation of a new
// sender link to the given destination.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(dest.getTopicName()));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
Message message = session.createMessage();
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
producer.send(dest, message);
}
// This round of sends should reuse the cached links
for (int i = 1; i <= CACHE_SIZE; ++i) {
Topic dest = session.createTopic(topicName + i);
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(dest.getTopicName()));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
Message message = session.createMessage();
testPeer.expectTransfer(messageMatcher);
producer.send(dest, message);
}
// Cached senders should all close when the producer is closed.
for (int i = 1; i <= CACHE_SIZE; ++i) {
testPeer.expectDetach(true, true, true);
}
producer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 30000)
public void testCachedFallbackProducersAreTimedOut() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final int CACHE_SIZE = 5;
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=" + CACHE_SIZE + "&amqp.anonymousFallbackCacheTimeout=300");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String topicName = "myTopic";
// Expect no AMQP traffic when we create the anonymous producer, as it will wait
// for an actual send to occur on the producer before anything occurs on the wire
// Create an anonymous producer
MessageProducer producer = session.createProducer(null);
assertNotNull("Producer object was null", producer);
// First round of sends should open and cache sender links
for (int i = 1; i <= CACHE_SIZE; ++i) {
Topic dest = session.createTopic(topicName + i);
// Expect a new message sent by the above producer to cause creation of a new
// sender link to the given destination.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(dest.getTopicName()));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
Message message = session.createMessage();
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
producer.send(dest, message);
}
// Cached senders should all close when the cache timeout is reached and they are expired
for (int i = 1; i <= CACHE_SIZE; ++i) {
testPeer.expectDetach(true, true, true);
}
// On a slow CI machine we could fail here due to the timeouts not having run.
testPeer.waitForAllHandlersToComplete(6000);
producer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testCachedFallbackProducerEvictedBySendToUncachedAddress() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final int CACHE_SIZE = 2;
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=" + CACHE_SIZE + "&amqp.anonymousFallbackCacheTimeout=0");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String topicName = "myTopic";
// Expect no AMQP traffic when we create the anonymous producer, as it will wait
// for an actual send to occur on the producer before anything occurs on the wire
// Create an anonymous producer
MessageProducer producer = session.createProducer(null);
assertNotNull("Producer object was null", producer);
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
// First round of sends should open and cache sender links
for (int i = 1; i <= CACHE_SIZE; ++i) {
Topic dest = session.createTopic(topicName + i);
// Expect a new message sent by the above producer to cause creation of a new
// sender link to the given destination.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(dest.getTopicName()));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
Message message = session.createMessage();
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
producer.send(dest, message);
}
// Second round using different addresses for the sends should evict old links and open new ones
for (int i = 1; i <= CACHE_SIZE; ++i) {
Topic dest = session.createTopic(topicName + UUID.randomUUID().toString());
// Expect a new message sent by the above producer to cause creation of a new
// sender link to the given destination.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(dest.getTopicName()));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
Message message = session.createMessage();
testPeer.expectDetach(true, true, true);
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
producer.send(dest, message);
}
// The current cached senders should all close when the producer is closed.
for (int i = 1; i <= CACHE_SIZE; ++i) {
testPeer.expectDetach(true, true, true);
}
producer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testCachedFallbackProducerEvictedBySendToUncachedAddressHandlesDelayedResponse() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=1&amqp.anonymousFallbackCacheTimeout=0");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String topicName = "myTopic";
// Expect no AMQP traffic when we create the anonymous producer, as it will wait
// for an actual send to occur on the producer before anything occurs on the wire
// Create an anonymous producer
MessageProducer producer = session.createProducer(null);
assertNotNull("Producer object was null", producer);
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
Topic dest1 = session.createTopic(topicName + 1);
Topic dest2 = session.createTopic(topicName + 2);
// Expect a new message sent by the above producer to cause creation of a new
// sender link to the given destination.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(dest1.getTopicName()));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
Message message = session.createMessage();
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
producer.send(dest1, message);
// Expect new send to a different destination to detach the previous cached link
// and once the response arrives the send should complete normally.
targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(dest2.getTopicName()));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
message = session.createMessage();
testPeer.expectDetach(true, false, false);
// Workaround to allow a deferred detach at a later time.
testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(false, true, AmqpError.RESOURCE_DELETED, "error", 20);
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
testPeer.expectDetach(true, true, true);
producer.send(dest2, message);
producer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testRemotelyCloseCachedFallbackProducerFreesSlotInCache() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final int CACHE_SIZE = 3;
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=" + CACHE_SIZE + "&amqp.anonymousFallbackCacheTimeout=0");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String topicName = "myTopic";
// Expect no AMQP traffic when we create the anonymous producer, as it will wait
// for an actual send to occur on the producer before anything occurs on the wire
// Create an anonymous producer
MessageProducer producer = session.createProducer(null);
assertNotNull("Producer object was null", producer);
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
// First round of sends should open and cache sender links
for (int i = 1; i < CACHE_SIZE; ++i) {
Topic dest = session.createTopic(topicName + i);
// Expect a new message sent to the above created destination to result in a new
// sender link attached to the given destination.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(dest.getTopicName()));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
Message message = session.createMessage();
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
producer.send(dest, message);
}
Topic dest = session.createTopic(topicName + CACHE_SIZE);
// Expect a new message sent to the above created destination to result in a new
// sender link attached to the given destination.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(dest.getTopicName()));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
Message message = session.createMessage();
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true);
producer.send(dest, message);
// Must ensure that the next send only fires after the remote detach has occurred otherwise
// it will definitely evict another producer from the cache.
testPeer.waitForAllHandlersToComplete(1000);
dest = session.createTopic(topicName + UUID.randomUUID().toString());
// Expect a new message sent to the above created destination to result in a new
// sender link attached to the given destination. Existing cached producers should
// remain in the cache as a slot should now be open.
targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(dest.getTopicName()));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
message = session.createMessage();
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
producer.send(dest, message);
// The current cached senders should all close when the producer is closed.
for (int i = 1; i <= CACHE_SIZE; ++i) {
testPeer.expectDetach(true, true, true);
}
producer.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
@Repeat(repetitions = 1)
@Test(timeout = 20000)
public void testFailureOfOneCacheProducerCloseOnPropagatedToMainProducerClose() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
final int CACHE_SIZE = 3;
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer,
"?amqp.anonymousFallbackCacheSize=" + CACHE_SIZE + "&amqp.anonymousFallbackCacheTimeout=0");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String topicName = "myTopic";
// Expect no AMQP traffic when we create the anonymous producer, as it will wait
// for an actual send to occur on the producer before anything occurs on the wire
// Create an anonymous producer
MessageProducer producer = session.createProducer(null);
assertNotNull("Producer object was null", producer);
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
messageMatcher.setHeadersMatcher(headersMatcher);
messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
// First round of sends should open and cache sender links
for (int i = 1; i <= CACHE_SIZE; ++i) {
Topic dest = session.createTopic(topicName + i);
// Expect a new message sent by the above producer to cause creation of a new
// sender link to the given destination.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(equalTo(dest.getTopicName()));
targetMatcher.withDynamic(equalTo(false));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
Message message = session.createMessage();
testPeer.expectSenderAttach(targetMatcher, false, false);
testPeer.expectTransfer(messageMatcher);
producer.send(dest, message);
}
// The current cached senders should all close when the producer is closed.
for (int i = 1; i < CACHE_SIZE; ++i) {
testPeer.expectDetach(true, true, true);
}
// Last one carries error but since we asked for close it should be ignored as we got what we wanted.
testPeer.expectDetach(true, true, true, AmqpError.RESOURCE_LOCKED, "Some error on detach");
try {
producer.close();
} catch (JMSException ex) {
LOG.trace("Caught unexpected error: ", ex);
fail("Should not have thrown an error as close was requested so errors are ignored.");
}
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(1000);
}
}
private class TestJmsCompletionListener implements CompletionListener {
private final CountDownLatch completed;
@SuppressWarnings("unused")
public volatile int successCount;
public volatile int errorCount;
public volatile Message message;
public volatile Exception exception;
public TestJmsCompletionListener() {
this(1);
}
public TestJmsCompletionListener(int expected) {
this.completed = new CountDownLatch(expected);
}
public boolean awaitCompletion(long timeout, TimeUnit units) throws InterruptedException {
return completed.await(timeout, units);
}
@Override
public void onCompletion(Message message) {
LOG.info("JmsCompletionListener onCompletion called with message: {}", message);
this.message = message;
this.successCount++;
completed.countDown();
}
@Override
public void onException(Message message, Exception exception) {
LOG.info("JmsCompletionListener onException called with message: {} error {}", message, exception);
this.message = message;
this.exception = exception;
this.errorCount++;
completed.countDown();
}
}
}