blob: db9dd9da924134ffed2543db0de832caf9e2901c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.jms.integration;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED_SUBS;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SUB_NAME_DELIMITER;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.provider.amqp.AmqpSupport;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
import org.apache.qpid.proton.amqp.Symbol;
import org.hamcrest.Matcher;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SubscriptionsIntegrationTest extends QpidJmsTestCase {
// Helper through which the tests in this class establish client connections to a TestAmqpPeer.
private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
// SLF4J logger for this test class.
private static final Logger LOG = LoggerFactory.getLogger(SubscriptionsIntegrationTest.class);
// -------------------------------------- //
/**
 * Pins the expected values of the subscription related constants exposed by AmqpSupport.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test
public void testConstants() throws Exception {
    final Symbol expectedSharedSubs = Symbol.valueOf("SHARED-SUBS");
    final String expectedDelimiter = "|";
    final Symbol expectedShared = Symbol.valueOf("shared");
    final Symbol expectedGlobal = Symbol.valueOf("global");

    assertEquals(expectedSharedSubs, AmqpSupport.SHARED_SUBS);
    assertEquals(expectedDelimiter, AmqpSupport.SUB_NAME_DELIMITER);
    assertEquals(expectedShared, AmqpSupport.SHARED);
    assertEquals(expectedGlobal, AmqpSupport.GLOBAL);
}
/**
 * Closing a shared durable subscriber must detach its link with closed = false.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testCloseSharedDurableTopicSubscriberDetachesWithCloseFalse() throws Exception {
    final boolean durable = true;
    doSharedTopicSubscriberDetachTestImpl(durable);
}
/**
 * Closing a shared volatile subscriber must detach its link with closed = true.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testCloseSharedVolatileTopicSubscriberDetachesWithCloseTrue() throws Exception {
    final boolean durable = false;
    doSharedTopicSubscriberDetachTestImpl(durable);
}
/**
 * Creates a single shared subscriber, closes it, and checks the detach the peer receives.
 *
 * @param durable true to use a shared durable consumer, false to use a shared volatile consumer
 * @throws Exception if an unexpected exception occurs
 */
private void doSharedTopicSubscriberDetachTestImpl(boolean durable) throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        // Add server connection capability to indicate support for shared-subs
        final Symbol[] capabilities = new Symbol[] { SHARED_SUBS };
        final Connection conn = testFixture.establishConnecton(peer, capabilities);
        conn.start();

        peer.expectBegin();
        final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final String topic = "myTopic";
        final Topic destination = jmsSession.createTopic(topic);
        final String subName = "mySubscription";

        final MessageConsumer consumer;
        if (durable) {
            // Durable: the link name is the bare subscription name
            peer.expectSharedDurableSubscriberAttach(topic, subName, equalTo(subName), true);
            peer.expectLinkFlow();
            consumer = jmsSession.createSharedDurableConsumer(destination, subName);
        } else {
            // Volatile: the link name carries the "volatile1" suffix
            final String volatileLinkName = subName + SUB_NAME_DELIMITER + "volatile1";
            peer.expectSharedVolatileSubscriberAttach(topic, subName, equalTo(volatileLinkName), true);
            peer.expectLinkFlow();
            consumer = jmsSession.createSharedConsumer(destination, subName);
        }

        // The detach flags follow durability; per the calling tests a durable
        // subscriber detaches with closed = false and a volatile one with closed = true.
        final boolean closedFlag = !durable;
        peer.expectDetach(closedFlag, true, closedFlag);
        consumer.close();

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// -------------------------------------- //
/**
 * Checks link naming for shared durable subscribers: the first link uses the bare
 * subscription name, and a subsequent link for the same subscription gets a counter
 * suffix so that the link names stay unique.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedDurableSubscriberLinkNames() throws Exception {
    final boolean durable = true;
    final boolean useClientID = true;
    doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable, useClientID);
}
/**
 * Checks link naming for shared volatile subscribers: every link name is the subscription
 * name followed by a counter suffix, so that the link names stay unique.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedVolatileSubscriberLinkNamesHaveUniqueCounterSuffix() throws Exception {
    final boolean durable = false;
    final boolean useClientID = true;
    doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable, useClientID);
}
/**
 * Checks link naming for shared durable subscribers on a connection without a ClientID:
 * the first link is the subscription name with a 'global' suffix, and subsequent links
 * additionally append a counter to that suffix so that the link names stay unique.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedDurableSubscriberLinkNamesNoClientID() throws Exception {
    final boolean durable = true;
    final boolean useClientID = false;
    doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable, useClientID);
}
/**
 * Checks link naming for shared volatile subscribers on a connection without a ClientID:
 * the subscription name is suffixed with a 'global' qualifier and a counter, so that the
 * link names stay unique.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedVolatileSubscriberLinkNamesHaveUniqueCounterSuffixNoClientID() throws Exception {
    final boolean durable = false;
    final boolean useClientID = false;
    doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable, useClientID);
}
/**
 * Creates two shared subscribers for the same subscription on one session and checks the
 * link name used for each attach.
 *
 * @param durable true to create shared durable consumers, false to create shared volatile consumers
 * @param useClientID true to establish the connection normally, false to establish it via the
 *                    fixture's "without ClientID" variant
 * @throws Exception if an unexpected exception occurs
 */
private void doSharedSubsriberLinkNamesHaveUniqueCounterSuffixTestImpl(boolean durable, boolean useClientID) throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        // Add server connection capability to indicate support for shared-subs
        final Symbol[] capabilities = new Symbol[] { SHARED_SUBS };

        final Connection conn = useClientID
                ? testFixture.establishConnecton(peer, capabilities)
                : testFixture.establishConnectonWithoutClientID(peer, capabilities);
        conn.start();

        peer.expectBegin();
        final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final String topic = "myTopic";
        final Topic destination = jmsSession.createTopic(topic);
        final String subName = "mySubscription";

        // Work out the link names expected for the first and the second attach
        final String firstLinkName;
        final String secondLinkName;
        if (durable) {
            firstLinkName = useClientID ? subName : subName + SUB_NAME_DELIMITER + "global";
            secondLinkName = subName + SUB_NAME_DELIMITER + (useClientID ? "2" : "global2");
        } else {
            final String volatilePrefix = subName + SUB_NAME_DELIMITER + (useClientID ? "volatile" : "global-volatile");
            firstLinkName = volatilePrefix + "1";
            secondLinkName = volatilePrefix + "2";
        }

        // Attach the two shared receivers in turn; the second link name must differ from the first
        for (String expectedLinkName : new String[] { firstLinkName, secondLinkName }) {
            if (durable) {
                peer.expectSharedDurableSubscriberAttach(topic, subName, equalTo(expectedLinkName), useClientID);
                peer.expectLinkFlow();
                jmsSession.createSharedDurableConsumer(destination, subName);
            } else {
                peer.expectSharedVolatileSubscriberAttach(topic, subName, equalTo(expectedLinkName), useClientID);
                peer.expectLinkFlow();
                jmsSession.createSharedConsumer(destination, subName);
            }
        }

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// -------------------------------------- //
/**
 * Checks that shared durable subscribers name their links on a per-connection basis: the
 * suffix counter on one connection is not affected by subscriptions made on another
 * connection.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedDurableSubsriberLinkNamesMultipleConnectionSubs() throws Exception {
    final boolean durable = true;
    doMultipleConnectionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable);
}
/**
 * Checks that shared volatile subscribers name their links on a per-connection basis: the
 * suffix counter on one connection is not affected by subscriptions made on another
 * connection.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedVolatileSubsriberLinkNamesHaveUniqueCounterSuffixMultipleConnectionSubs() throws Exception {
    final boolean durable = false;
    doMultipleConnectionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable);
}
/**
 * Creates one shared subscriber on each of two separate connections, each with its own
 * test peer, and expects the same link name to be used on both.
 *
 * @param durable true to create shared durable consumers, false to create shared volatile consumers
 * @throws Exception if an unexpected exception occurs
 */
private void doMultipleConnectionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(boolean durable) throws Exception {
    try (TestAmqpPeer firstPeer = new TestAmqpPeer();
         TestAmqpPeer secondPeer = new TestAmqpPeer()) {
        // Add server connection capability to indicate support for shared-subs
        final Symbol[] capabilities = new Symbol[] { SHARED_SUBS };

        // Establish first connection
        final Connection firstConn = testFixture.establishConnecton(firstPeer, capabilities);
        firstConn.start();
        firstPeer.expectBegin();
        final Session firstSession = firstConn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Establish second connection
        final Connection secondConn = testFixture.establishConnecton(secondPeer, capabilities);
        secondConn.start();
        secondPeer.expectBegin();
        final Session secondSession = secondConn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final String topic = "myTopic";
        final Topic destination = firstSession.createTopic(topic);
        final String subName = "mySubscription";

        // The same link name is expected on both connections, as naming is per-connection
        final Matcher<?> expectedLinkName = durable
                ? equalTo(subName)
                : equalTo(subName + SUB_NAME_DELIMITER + "volatile1");

        if (durable) {
            // First connection's shared receiver
            firstPeer.expectSharedDurableSubscriberAttach(topic, subName, expectedLinkName, true);
            firstPeer.expectLinkFlow();
            firstSession.createSharedDurableConsumer(destination, subName);

            // Second connection's shared receiver
            secondPeer.expectSharedDurableSubscriberAttach(topic, subName, expectedLinkName, true);
            secondPeer.expectLinkFlow();
            secondSession.createSharedDurableConsumer(destination, subName);
        } else {
            // First connection's shared receiver
            firstPeer.expectSharedVolatileSubscriberAttach(topic, subName, expectedLinkName, true);
            firstPeer.expectLinkFlow();
            firstSession.createSharedConsumer(destination, subName);

            // Second connection's shared receiver
            secondPeer.expectSharedVolatileSubscriberAttach(topic, subName, expectedLinkName, true);
            secondPeer.expectLinkFlow();
            secondSession.createSharedConsumer(destination, subName);
        }

        firstPeer.expectClose();
        firstConn.close();

        secondPeer.expectClose();
        secondConn.close();

        firstPeer.waitForAllHandlersToComplete(1000);
        secondPeer.waitForAllHandlersToComplete(1000);
    }
}
// -------------------------------------- //
/**
 * Checks that shared durable subscribers name their links on a per-connection basis: the
 * suffix counter seen by one session is affected by subscriptions made through another
 * session of the same connection.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedDurableSubsriberLinkNamesMultipleSessionSubs() throws Exception {
    final boolean durable = true;
    doMultipleSessionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable);
}
/**
 * Checks that shared volatile subscribers name their links on a per-connection basis: the
 * suffix counter seen by one session is affected by subscriptions made through another
 * session of the same connection.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedVolatileSubsriberLinkNamesHaveUniqueCounterSuffixMultipleSessionSubs() throws Exception {
    final boolean durable = false;
    doMultipleSessionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable);
}
/**
 * Creates one shared subscriber on each of two sessions belonging to the same connection
 * and expects the second attach to use a different (counter suffixed) link name.
 *
 * @param durable true to create shared durable consumers, false to create shared volatile consumers
 * @throws Exception if an unexpected exception occurs
 */
private void doMultipleSessionSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(boolean durable) throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        // Add server connection capability to indicate support for shared-subs
        final Symbol[] capabilities = new Symbol[] { SHARED_SUBS };

        // Establish connection
        final Connection conn = testFixture.establishConnecton(peer, capabilities);
        conn.start();

        peer.expectBegin();
        final Session firstSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        peer.expectBegin();
        final Session secondSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final String topic = "myTopic";
        final Topic destination = firstSession.createTopic(topic);
        final String subName = "mySubscription";

        // Expected link names, in attach order; they differ because naming is per-connection
        final String[] expectedLinkNames = durable
                ? new String[] { subName, subName + SUB_NAME_DELIMITER + "2" }
                : new String[] { subName + SUB_NAME_DELIMITER + "volatile1", subName + SUB_NAME_DELIMITER + "volatile2" };
        final Session[] sessions = new Session[] { firstSession, secondSession };

        // Attach one shared receiver per session, first session first
        for (int index = 0; index < sessions.length; index++) {
            final Matcher<?> linkNameMatcher = equalTo(expectedLinkNames[index]);
            if (durable) {
                peer.expectSharedDurableSubscriberAttach(topic, subName, linkNameMatcher, true);
                peer.expectLinkFlow();
                sessions[index].createSharedDurableConsumer(destination, subName);
            } else {
                peer.expectSharedVolatileSubscriberAttach(topic, subName, linkNameMatcher, true);
                peer.expectLinkFlow();
                sessions[index].createSharedConsumer(destination, subName);
            }
        }

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// -------------------------------------- //
/**
 * Checks that shared durable subscribers name their links on a per-subscription-name basis:
 * the suffix counter for one subscription name is not affected by the subscribers of
 * another subscription on the same connection.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedDurableSubsriberLinkNamesMultipleNamedSubs() throws Exception {
    final boolean durable = true;
    doMultipleNamedSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable);
}
/**
 * Checks that shared volatile subscribers name their links on a per-subscription-name basis:
 * the suffix counter for one subscription name is not affected by the subscribers of
 * another subscription on the same connection.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedVolatileSubsriberLinkNamesHaveUniqueCounterSuffixMultipleNamedSubs() throws Exception {
    final boolean durable = false;
    doMultipleNamedSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(durable);
}
/**
 * Creates two shared subscribers for each of two differently named subscriptions on one
 * session, expecting the link name counter to start afresh for the second subscription.
 *
 * @param durable true to create shared durable consumers, false to create shared volatile consumers
 * @throws Exception if an unexpected exception occurs
 */
private void doMultipleNamedSharedSubscriberLinkNamesHaveUniqueCounterSuffixTestImpl(boolean durable) throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        // Add server connection capability to indicate support for shared-subs
        final Symbol[] capabilities = new Symbol[] { SHARED_SUBS };

        final Connection conn = testFixture.establishConnecton(peer, capabilities);
        conn.start();

        peer.expectBegin();
        final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final String topic = "myTopic";
        final Topic destination = jmsSession.createTopic(topic);

        // Handle the first subscription's pair of receivers, then the second subscription's pair
        for (String subName : new String[] { "mySubscription1", "mySubscription2" }) {
            if (durable) {
                // Both attach/flow expectations are registered before either consumer is created
                peer.expectSharedDurableSubscriberAttach(topic, subName, equalTo(subName), true);
                peer.expectLinkFlow();
                peer.expectSharedDurableSubscriberAttach(topic, subName, equalTo(subName + SUB_NAME_DELIMITER + "2"), true);
                peer.expectLinkFlow();

                jmsSession.createSharedDurableConsumer(destination, subName);
                jmsSession.createSharedDurableConsumer(destination, subName);
            } else {
                // Both attach/flow expectations are registered before either consumer is created
                peer.expectSharedVolatileSubscriberAttach(topic, subName, equalTo(subName + SUB_NAME_DELIMITER + "volatile1"), true);
                peer.expectLinkFlow();
                peer.expectSharedVolatileSubscriberAttach(topic, subName, equalTo(subName + SUB_NAME_DELIMITER + "volatile2"), true);
                peer.expectLinkFlow();

                jmsSession.createSharedConsumer(destination, subName);
                jmsSession.createSharedConsumer(destination, subName);
            }
        }

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// -------------------------------------- //
/**
 * Checks that a shared durable subscriber and a shared volatile subscriber using the same
 * subscription name can both be active on one connection at the same time, and that their
 * link names differ so that the two can be told apart.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedDurableAndVolatileSubsCoexistUsingDistinctLinkNames() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        // Add server connection capability to indicate support for shared-subs
        final Symbol[] capabilities = new Symbol[] { SHARED_SUBS };

        // Establish connection
        final Connection conn = testFixture.establishConnecton(peer, capabilities);
        conn.start();

        peer.expectBegin();
        final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final String topic = "myTopic";
        final Topic destination = jmsSession.createTopic(topic);
        final String subName = "mySubscription";

        // Durable shared receiver: link name is the bare subscription name
        final String durableLinkName = subName;
        peer.expectSharedDurableSubscriberAttach(topic, subName, equalTo(durableLinkName), true);
        peer.expectLinkFlow();
        jmsSession.createSharedDurableConsumer(destination, subName);

        // Volatile shared receiver: link name carries the "volatile1" suffix
        final String volatileLinkName = subName + SUB_NAME_DELIMITER + "volatile1";
        peer.expectSharedVolatileSubscriberAttach(topic, subName, equalTo(volatileLinkName), true);
        peer.expectLinkFlow();
        jmsSession.createSharedConsumer(destination, subName);

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// -------------------------------------- //
/**
 * Checks that creating a shared durable subscriber fails when neither the connection nor
 * the link identifies itself as supporting shared subscriptions.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testCreateSharedDurableTopicSubscriberFailsIfNotSupported() throws Exception {
    final boolean durable = true;
    final boolean repeat = false;
    doSharedSubscriptionNotSupportedTestImpl(durable, repeat);
}
/**
 * Checks that creating a shared volatile subscriber fails when neither the connection nor
 * the link identifies itself as supporting shared subscriptions.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testCreateSharedVolatileTopicSubscriberFailsIfNotSupported() throws Exception {
    final boolean durable = false;
    final boolean repeat = false;
    doSharedSubscriptionNotSupportedTestImpl(durable, repeat);
}
/**
 * Checks that when creating a shared durable subscriber fails because both the connection
 * and the link lack the capability indicating support, the failed subscriber is removed
 * from record and its link name released, so that a further creation attempt uses the same
 * initial link name again.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testCreateSharedDurableTopicSubscriberFailsIfNotSupportedReleasesLinkName() throws Exception {
    final boolean durable = true;
    final boolean repeat = true;
    doSharedSubscriptionNotSupportedTestImpl(durable, repeat);
}
/**
 * Checks that when creating a shared volatile subscriber fails because both the connection
 * and the link lack the capability indicating support, the failed subscriber is removed
 * from record and its link name released, so that a further creation attempt uses the same
 * initial link name again.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testCreateSharedVolatileTopicSubscriberFailsIfNotSupportedReleasesLinkName() throws Exception {
    final boolean durable = false;
    final boolean repeat = true;
    doSharedSubscriptionNotSupportedTestImpl(durable, repeat);
}
/**
 * Attempts to create a shared subscriber against a peer that offers no shared-subs support
 * and expects each attempt to fail with a JMSException.
 *
 * @param durable true to attempt a shared durable consumer, false for a shared volatile consumer
 * @param repeat true to make the attempt twice (expecting the same link name both times),
 *               false to make it once
 * @throws Exception if an unexpected exception occurs
 */
private void doSharedSubscriptionNotSupportedTestImpl(boolean durable, boolean repeat) throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        // DONT include server connection capability to indicate support for shared-subs
        // This will cause the link capability to be desired, and we verify failure if not offered.
        final Symbol[] capabilities = new Symbol[] {};

        final Connection conn = testFixture.establishConnecton(peer, capabilities);
        conn.start();

        peer.expectBegin();
        final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final String topic = "myTopic";
        final Topic destination = jmsSession.createTopic(topic);
        final String subName = "mySubscription";

        // Expect a shared receiver to attach, then detach due to the link also not
        // reporting it offers the shared subs capability, i.e sharing not supported.
        final int attempts = repeat ? 2 : 1;
        for (int attempt = 1; attempt <= attempts; attempt++) {
            final Matcher<?> linkNameMatcher = durable
                    ? equalTo(subName)
                    : equalTo(subName + SUB_NAME_DELIMITER + "volatile1");

            try {
                // The attach and detach flags that differ between the two variants follow 'durable'
                peer.expectSharedSubscriberAttach(topic, subName, linkNameMatcher, durable, false, true, true, false);
                peer.expectDetach(!durable, true, !durable);

                if (durable) {
                    jmsSession.createSharedDurableConsumer(destination, subName);
                } else {
                    jmsSession.createSharedConsumer(destination, subName);
                }

                fail("Expected an exception to be thrown");
            } catch (JMSException expected) {
                // expected
            }
        }

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Checks that on a connection which doesn't identify as supporting shared subscriptions,
 * creating a shared durable subscriber still succeeds when the server offers the link
 * capability for shared subscriptions.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testCreateSharedDurableTopicSubscriberSucceedsWithOnlyLinkCapability() throws Exception {
    final boolean durable = true;
    doSharedSubscriptionLinkCapabilitySupportedTestImpl(durable);
}
/**
 * Checks that on a connection which doesn't identify as supporting shared subscriptions,
 * creating a shared volatile subscriber still succeeds when the server offers the link
 * capability for shared subscriptions.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testCreateSharedVolatileTopicSubscriberSucceedsWithOnlyLinkCapability() throws Exception {
    final boolean durable = false;
    doSharedSubscriptionLinkCapabilitySupportedTestImpl(durable);
}
/**
 * Creates a shared subscriber against a peer that offers no connection level shared-subs
 * capability but does offer it on the link, and expects creation to succeed.
 *
 * @param durable true to create a shared durable consumer, false for a shared volatile consumer
 * @throws Exception if an unexpected exception occurs
 */
private void doSharedSubscriptionLinkCapabilitySupportedTestImpl(boolean durable) throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        // DONT include server connection capability to indicate support for shared-subs.
        // This will cause the link capability to be desired, and we verify success if offered.
        final Symbol[] capabilities = new Symbol[] {};

        final Connection conn = testFixture.establishConnecton(peer, capabilities);
        conn.start();

        peer.expectBegin();
        final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final String topic = "myTopic";
        final Topic destination = jmsSession.createTopic(topic);
        final String subName = "mySubscription";

        // Expect a shared receiver to attach, and succeed due to the server offering
        // the shared subs capability, i.e sharing is supported.
        final Matcher<?> linkNameMatcher = durable
                ? equalTo(subName)
                : equalTo(subName + SUB_NAME_DELIMITER + "volatile1");
        peer.expectSharedSubscriberAttach(topic, subName, linkNameMatcher, durable, false, true, true, true);
        peer.expectLinkFlow();

        if (durable) {
            jmsSession.createSharedDurableConsumer(destination, subName);
        } else {
            jmsSession.createSharedConsumer(destination, subName);
        }

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// -------------------------------------- //
/**
 * Checks that a shared durable subscriber and an exclusive durable subscriber using the
 * same subscription name can't both be active on one connection at the same time, with the
 * shared subscriber being created first and the exclusive one second.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedAndExclusiveDurableSubsCantCoexistSharedFirst() throws Exception {
    final boolean sharedFirst = true;
    doSharedAndExclusiveDurableSubsCantCoexistTestImpl(sharedFirst);
}
/**
 * Checks that a shared durable subscriber and an exclusive durable subscriber using the
 * same subscription name can't both be active on one connection at the same time, with the
 * exclusive subscriber being created first and the shared one second.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedAndExclusiveDurableSubsCantCoexistExclusiveFirst() throws Exception {
    final boolean sharedFirst = false;
    doSharedAndExclusiveDurableSubsCantCoexistTestImpl(sharedFirst);
}
/**
 * Creates one durable subscriber (shared or exclusive) and then expects the attempt to
 * create the other kind, using the same subscription name, to fail with a JMSException.
 *
 * @param sharedFirst true to create the shared durable consumer first and then attempt the
 *                    exclusive one, false for the opposite order
 * @throws Exception if an unexpected exception occurs
 */
private void doSharedAndExclusiveDurableSubsCantCoexistTestImpl(boolean sharedFirst) throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        // Add server connection capability to indicate support for shared-subs
        final Symbol[] capabilities = new Symbol[] { SHARED_SUBS };

        // Establish connection
        final Connection conn = testFixture.establishConnecton(peer, capabilities);
        conn.start();

        peer.expectBegin();
        final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final String topic = "myTopic";
        final Topic destination = jmsSession.createTopic(topic);
        final String subName = "mySubscription";

        if (sharedFirst) {
            // Attach the durable shared receiver
            final Matcher<?> sharedLinkNameMatcher = equalTo(subName);
            peer.expectSharedDurableSubscriberAttach(topic, subName, sharedLinkNameMatcher, true);
            peer.expectLinkFlow();
            jmsSession.createSharedDurableConsumer(destination, subName);
        } else {
            // Attach the durable exclusive receiver
            peer.expectDurableSubscriberAttach(topic, subName);
            peer.expectLinkFlow();
            jmsSession.createDurableConsumer(destination, subName);
        }

        // No attach is scripted for the second attempt; the close is the next expected frame
        peer.expectClose();

        boolean rejected = false;
        try {
            if (sharedFirst) {
                // Now try to attach a durable exclusive receiver
                jmsSession.createDurableConsumer(destination, subName);
            } else {
                // Now try to attach a durable shared receiver
                jmsSession.createSharedDurableConsumer(destination, subName);
            }
        } catch (JMSException expected) {
            // Expected
            rejected = true;
        }
        if (!rejected) {
            fail("Expected to fail due to concurrent shared + non-shared durable sub attempt");
        }

        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// -------------------------------------- //
/**
 * Checks that only one exclusive durable subscriber can be active for a subscription name
 * at a time: a second concurrent attempt fails with a JMSException, while a new subscriber
 * can be created once the first one has been closed.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testExclusiveDurableSubCanOnlyBeActiveOnceAtATime() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        // Establish connection
        final Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        peer.expectBegin();
        final Session jmsSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final String topic = "myTopic";
        final Topic destination = jmsSession.createTopic(topic);
        final String subName = "mySubscription";

        // Attach the durable exclusive receiver
        peer.expectDurableSubscriberAttach(topic, subName);
        peer.expectLinkFlow();
        final MessageConsumer firstSubscriber = jmsSession.createDurableConsumer(destination, subName);

        // Now try to attach a second active durable exclusive receiver
        boolean rejected = false;
        try {
            jmsSession.createDurableConsumer(destination, subName);
        } catch (JMSException expected) {
            // Expected
            rejected = true;
        }
        if (!rejected) {
            fail("Expected to fail due to concurrent active subscriber attempt");
        }

        peer.expectDetach(false, true, false);
        firstSubscriber.close();

        // Now try to attach a new active durable exclusive receiver
        peer.expectDurableSubscriberAttach(topic, subName);
        peer.expectLinkFlow();
        final MessageConsumer secondSubscriber = jmsSession.createDurableConsumer(destination, subName);
        assertNotNull(secondSubscriber);

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// -------------------------------------- //
/**
 * Checks that an InvalidDestinationException is thrown when the subscription link can't
 * be located during an unsubscribe attempt.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testUnsubscribeNonExistingSubscription() throws Exception {
    final boolean hasClientID = true;
    doUnsubscribeNonExistingSubscriptionTestImpl(hasClientID);
}
/**
 * Checks that an InvalidDestinationException is thrown when the subscription link can't
 * be located during an unsubscribe attempt, this time on a connection that has no ClientID
 * set (and which is thus expected to use a different link name).
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testUnsubscribeNonExistingSubscriptionWithoutClientID() throws Exception {
    final boolean hasClientID = false;
    doUnsubscribeNonExistingSubscriptionTestImpl(hasClientID);
}
/**
 * Shared implementation for the tests verifying that an attempt to
 * unsubscribe a subscription which cannot be located results in an
 * {@link InvalidDestinationException}.
 *
 * @param hasClientID
 *        whether the connection is established with a ClientID; the same
 *        flag is passed on to the peer's unsubscribe lookup expectation
 *
 * @throws Exception if an unexpected exception occurs
 */
private void doUnsubscribeNonExistingSubscriptionTestImpl(boolean hasClientID) throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        Connection connection;
        if (hasClientID) {
            connection = testFixture.establishConnecton(testPeer);
        } else {
            connection = testFixture.establishConnectonWithoutClientID(testPeer, null);
        }
        connection.start();

        testPeer.expectBegin();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        String subscriptionName = "myInvalidSubscriptionName";

        // Try to unsubscribe the non-existing subscription
        testPeer.expectDurableSubUnsubscribeNullSourceLookup(true, false, subscriptionName, null, hasClientID);
        testPeer.expectDetach(true, true, true);

        try {
            session.unsubscribe(subscriptionName);
            fail("Should have thrown a InvalidDestinationException");
        } catch (InvalidDestinationException expected) {
            // Expected: any other JMSException type propagates and fails the test
        }

        testPeer.waitForAllHandlersToComplete(1000);

        testPeer.expectClose();
        connection.close();

        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Verifies that unsubscribing an exclusive durable subscription throws a
 * {@link JMSException} while a subscriber is actively using it, and that
 * a further unsubscribe attempt goes ahead once that subscriber is closed.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testUnsubscribeExclusiveDurableSubWhileActiveThenInactive() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final String topic = "myTopic";
        final String subName = "mySubscription";

        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        peer.expectBegin();
        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic destination = sess.createTopic(topic);

        // Bring up the exclusive durable receiver
        peer.expectDurableSubscriberAttach(topic, subName);
        peer.expectLinkFlow();
        TopicSubscriber activeSubscriber = sess.createDurableSubscriber(destination, subName);
        assertNotNull("TopicSubscriber object was null", activeSubscriber);

        // The subscription is in active use, so this unsubscribe must fail
        try {
            sess.unsubscribe(subName);
            fail("Should have thrown a JMSException");
        } catch (JMSException expected) {
            // Expected
        }

        // Close the subscriber, leaving the subscription without an active user
        peer.expectDetach(false, true, false);
        activeSubscriber.close();

        // A second unsubscribe attempt should now proceed
        peer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subName, topic, true);
        peer.expectDetach(true, true, true);
        sess.unsubscribe(subName);

        peer.waitForAllHandlersToComplete(1000);

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Verifies that unsubscribing a shared durable subscription throws a
 * {@link JMSException} while multiple subscribers are actively using it,
 * and that the unsubscribe only succeeds once the last of those
 * subscribers has been closed.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testUnsubscribeSharedDurableSubWhileActiveThenInactive() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final String topic = "myTopic";
        final String subName = "mySubscription";

        // Have the server advertise the shared-subs connection capability
        final Symbol[] peerCapabilities = new Symbol[] { SHARED_SUBS };

        Connection conn = testFixture.establishConnecton(peer, peerCapabilities);
        conn.start();

        peer.expectBegin();
        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic destination = sess.createTopic(topic);

        // First shared durable receiver: link name is the bare subscription name
        final Matcher<?> firstLinkName = equalTo(subName);
        peer.expectSharedDurableSubscriberAttach(topic, subName, firstLinkName, true);
        peer.expectLinkFlow();
        MessageConsumer firstSubscriber = sess.createSharedDurableConsumer(destination, subName);

        // Second shared durable receiver: link name carries the "2" suffix
        final Matcher<?> secondLinkName = equalTo(subName + SUB_NAME_DELIMITER + "2");
        peer.expectSharedDurableSubscriberAttach(topic, subName, secondLinkName, true);
        peer.expectLinkFlow();
        MessageConsumer secondSubscriber = sess.createSharedDurableConsumer(destination, subName);

        peer.waitForAllHandlersToComplete(1000);

        // Two subscribers active: unsubscribe must fail
        try {
            sess.unsubscribe(subName);
            fail("Should have thrown a JMSException");
        } catch (JMSException expected) {
            // Expected
        }

        // Close the first subscriber
        peer.expectDetach(false, true, false);
        firstSubscriber.close();

        peer.waitForAllHandlersToComplete(1000);

        // One subscriber still active: unsubscribe must still fail
        try {
            sess.unsubscribe(subName);
            fail("Should have thrown a JMSException");
        } catch (JMSException expected) {
            // Expected
        }

        peer.waitForAllHandlersToComplete(1000);

        // Close the second (last) subscriber
        peer.expectDetach(false, true, false);
        secondSubscriber.close();

        // No subscribers remain: unsubscribe should now work
        peer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subName, topic, true);
        peer.expectDetach(true, true, true);
        sess.unsubscribe(subName);

        peer.waitForAllHandlersToComplete(1000);

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// -------------------------------------- //
/**
 * Verifies that after a failed attempt to create a shared durable subscriber
 * it is still possible to unsubscribe (not that it should be required), i.e.
 * that the failed creation does not leave behind an incorrect record of an
 * active subscriber.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testUnsubscribeAfterFailedCreation() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final String topic = "myTopic";
        final String subName = "mySubscription";

        // Have the server advertise the shared-subs connection capability
        final Symbol[] peerCapabilities = new Symbol[] { SHARED_SUBS };

        Connection conn = testFixture.establishConnecton(peer, peerCapabilities);
        conn.start();

        peer.expectBegin();
        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic destination = sess.createTopic(topic);

        // Script the peer so that attaching the shared durable receiver fails
        final Matcher<?> linkName = equalTo(subName);
        peer.expectSharedDurableSubscriberAttach(topic, subName, linkName, true, true);
        peer.expectDetach(true, false, true);

        try {
            sess.createSharedDurableConsumer(destination, subName);
            fail("Should have thrown a JMSException");
        } catch (JMSException expected) {
            // Expected
        }

        // The unsubscribe attempt should be allowed to proceed. Strictly speaking
        // it would probably fail normally since there is no subscription, but this
        // test only cares that the attempt goes ahead, so that is overlooked.
        peer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subName, topic, true);
        peer.expectDetach(true, true, true);
        sess.unsubscribe(subName);

        peer.waitForAllHandlersToComplete(1000);

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Verifies that an unsubscribe attempt is reported as failed when the remote
 * peer signals failure by attaching an error condition to its detach response.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testUnsubscribeFailsWhenRemoteDetachResponseIndicatesFailure() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final String topic = "myTopic";
        final String subName = "mySubscription";

        Connection conn = testFixture.establishConnecton(peer);
        conn.start();

        peer.expectBegin();
        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Let the unsubscribe attempt proceed, but script the detach expectation
        // with an error condition so that the client must treat it as a failure.
        peer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subName, topic, true);
        peer.expectDetach(true, true, true, AmqpError.RESOURCE_LOCKED,
                "Cannot unsubscibe when there are active consumers");

        try {
            sess.unsubscribe(subName);
            fail("Should throw an error if the remote indicate detach failed");
        } catch (JMSException expected) {
            LOG.info("Caught expected exception on unsubscribe: {}", expected.getMessage());
        }

        peer.waitForAllHandlersToComplete(1000);

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Verifies that subscriber cleanup occurs when the subscriber is remotely
 * closed (after creation): a shared durable subscriber detached by the peer
 * must not keep the subscription marked as in use, so that once the remaining
 * subscriber is closed an unsubscribe attempt proceeds.
 *
 * @throws Exception if an unexpected error is encountered
 */
@Test(timeout = 20000)
public void testRemotelyDetachLinkWithDurableSharedConsumer() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        // Add server connection capability to indicate support for shared-subs
        Symbol[] serverCapabilities = new Symbol[] { SHARED_SUBS };

        // Establish connection
        Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);

        // Tripped by the connection listener when a consumer is reported closed
        final CountDownLatch subscriberClosed = new CountDownLatch(1);
        ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onConsumerClosed(MessageConsumer consumer, Throwable exception) {
                subscriberClosed.countDown();
            }
        });

        // Create first session
        testPeer.expectBegin();
        Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        String topicName = "myTopic";
        Topic dest = session1.createTopic(topicName);
        String subscriptionName = "mySubscription";

        // Attach the first shared receiver on the first session
        Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName);
        testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true);
        testPeer.expectLinkFlow();
        MessageConsumer subscriber1 = session1.createSharedDurableConsumer(dest, subscriptionName);

        // Create second session
        testPeer.expectBegin();
        Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Attach the second shared receiver on the second session
        durableLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "2");
        testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true);
        testPeer.expectLinkFlow();

        // Then remotely detach the link after the flow is received.
        testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, false, AmqpError.RESOURCE_LIMIT_EXCEEDED, "TestingRemoteDetach");

        MessageConsumer subscriber2 = session2.createSharedDurableConsumer(dest, subscriptionName);
        assertNotNull(subscriber2);

        assertTrue("Consumer closed callback didn't trigger", subscriberClosed.await(5, TimeUnit.SECONDS));

        testPeer.waitForAllHandlersToComplete(1000);

        // Now try to unsubscribe (using first session). It should fail due to sub still
        // being in use on the first session. No frames should be sent.
        try {
            session1.unsubscribe(subscriptionName);
            fail("Should have thrown a JMSException");
        } catch (JMSException expected) {
            // Expected
        }

        // Now close the first subscriber
        testPeer.expectDetach(false, true, false);
        subscriber1.close();

        testPeer.waitForAllHandlersToComplete(1000);

        // Try to unsubscribe again (using first session), should now work.
        testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true);
        testPeer.expectDetach(true, true, true);
        session1.unsubscribe(subscriptionName);

        testPeer.expectClose();
        connection.close();

        // Confirm the peer saw every scripted frame, as the sibling tests do
        testPeer.waitForAllHandlersToComplete(1000);
    }
}
// -------------------------------------- //
/**
 * Verifies that creating a shared durable subscriber fails when it uses the
 * same subscription name as a subscriber already active on the connection but
 * a different topic (details of a subscription in use can't be changed).
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedDurableSubChangeOfTopic() throws Exception {
    final boolean durable = true;
    final boolean changeTopic = true;
    final boolean changeSelector = false;
    doSharedSubChangeOfDetailsTestImpl(durable, changeTopic, changeSelector);
}
/**
 * Verifies that creating a shared volatile subscriber fails when it uses the
 * same subscription name as a subscriber already active on the connection but
 * a different topic (details of a subscription in use can't be changed).
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedVolatileSubChangeOfTopic() throws Exception {
    final boolean durable = false;
    final boolean changeTopic = true;
    final boolean changeSelector = false;
    doSharedSubChangeOfDetailsTestImpl(durable, changeTopic, changeSelector);
}
/**
 * Verifies that creating a shared durable subscriber fails when it uses the
 * same subscription name as a subscriber already active on the connection but
 * a different selector (details of a subscription in use can't be changed).
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedDurableSubChangeOfSelector() throws Exception {
    final boolean durable = true;
    final boolean changeTopic = false;
    final boolean changeSelector = true;
    doSharedSubChangeOfDetailsTestImpl(durable, changeTopic, changeSelector);
}
/**
 * Verifies that creating a shared volatile subscriber fails when it uses the
 * same subscription name as a subscriber already active on the connection but
 * a different selector (details of a subscription in use can't be changed).
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedVolatileSubChangeOfSelector() throws Exception {
    final boolean durable = false;
    final boolean changeTopic = false;
    final boolean changeSelector = true;
    doSharedSubChangeOfDetailsTestImpl(durable, changeTopic, changeSelector);
}
/**
 * Verifies that creating a shared durable subscriber fails when it uses the
 * same subscription name as a subscriber already active on the connection but
 * a different topic and selector (details of a subscription in use can't be
 * changed).
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedDurableSubChangeOfTopicAndSelector() throws Exception {
    final boolean durable = true;
    final boolean changeTopic = true;
    final boolean changeSelector = true;
    doSharedSubChangeOfDetailsTestImpl(durable, changeTopic, changeSelector);
}
/**
 * Verifies that creating a shared volatile subscriber fails when it uses the
 * same subscription name as a subscriber already active on the connection but
 * a different topic and selector (details of a subscription in use can't be
 * changed).
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedVolatileSubChangeOfTopicAndSelector() throws Exception {
    final boolean durable = false;
    final boolean changeTopic = true;
    final boolean changeSelector = true;
    doSharedSubChangeOfDetailsTestImpl(durable, changeTopic, changeSelector);
}
/**
 * Shared implementation for the tests verifying that a shared subscription's
 * details can't be changed while a subscriber is active on it: a second
 * subscriber with the same subscription name but a changed topic and/or
 * selector must fail, and must succeed once the first subscriber is closed.
 *
 * @param durable
 *        true to use shared durable consumers, false for shared volatile ones
 * @param changeTopic
 *        whether the second subscriber uses a different topic name
 * @param changeSelector
 *        whether the second subscriber uses a selector (the first uses none)
 *
 * @throws Exception if an unexpected exception occurs
 */
private void doSharedSubChangeOfDetailsTestImpl(boolean durable, boolean changeTopic, boolean changeSelector) throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        // Have the server advertise the shared-subs connection capability
        final Symbol[] peerCapabilities = new Symbol[] { SHARED_SUBS };

        Connection conn = testFixture.establishConnecton(peer, peerCapabilities);
        conn.start();

        peer.expectBegin();
        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

        final String firstTopicName = "myTopic";
        final String secondTopicName = changeTopic ? firstTopicName + "2" : firstTopicName;

        // Typed null so that the three-argument consumer overloads are selected
        final String noSelector = null;
        final String secondSelector = changeSelector ? "someProperty = 2" : null;

        Topic firstDest = sess.createTopic(firstTopicName);
        Topic secondDest = sess.createTopic(secondTopicName);

        final String subName = "mySubscription";

        // The same link name is expected for the first and the replacement subscriber
        final Matcher<?> linkName = durable
                ? equalTo(subName)
                : equalTo(subName + SUB_NAME_DELIMITER + "volatile1");

        // Attach the first shared receiver
        if (durable) {
            peer.expectSharedDurableSubscriberAttach(firstTopicName, subName, linkName, true);
        } else {
            peer.expectSharedVolatileSubscriberAttach(firstTopicName, subName, linkName, true);
        }
        peer.expectLinkFlow();

        MessageConsumer firstSubscriber = durable
                ? sess.createSharedDurableConsumer(firstDest, subName, noSelector)
                : sess.createSharedConsumer(firstDest, subName, noSelector);

        peer.waitForAllHandlersToComplete(1000);

        // Another subscriber with the same name but changed topic and/or selector
        // should fail. No frames should be sent.
        try {
            if (durable) {
                sess.createSharedDurableConsumer(secondDest, subName, secondSelector);
            } else {
                sess.createSharedConsumer(secondDest, subName, secondSelector);
            }
            fail("Expected to fail due to attempting change of subscription details while subscriber is active");
        } catch (JMSException expected) {
            // Expected
        }

        // Close the first subscriber: (false, true, false) when durable,
        // (true, true, true) when volatile
        peer.expectDetach(!durable, true, !durable);
        firstSubscriber.close();

        // A new subscriber with the changed details should now succeed
        // (note also the verified reuse of link names).
        if (durable) {
            peer.expectSharedDurableSubscriberAttach(secondTopicName, subName, linkName, true);
        } else {
            peer.expectSharedVolatileSubscriberAttach(secondTopicName, subName, linkName, true);
        }
        peer.expectLinkFlow();

        MessageConsumer secondSubscriber = durable
                ? sess.createSharedDurableConsumer(secondDest, subName, secondSelector)
                : sess.createSharedConsumer(secondDest, subName, secondSelector);
        assertNotNull(secondSubscriber);

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
// -------------------------------------- //
/**
 * Verifies that subscriber cleanup occurs when the session it is on is
 * remotely closed: a shared durable subscriber lost along with its session
 * must not keep the subscription marked as in use, so that once the remaining
 * subscriber is closed an unsubscribe attempt proceeds.
 *
 * @throws Exception if an unexpected error is encountered
 */
@Test(timeout = 20000)
public void testRemotelyEndSessionWithDurableSharedConsumer() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        // Add server connection capability to indicate support for shared-subs
        Symbol[] serverCapabilities = new Symbol[] { SHARED_SUBS };

        // Establish connection
        Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);

        // Tripped by the connection listener when a session is reported closed
        final CountDownLatch sessionClosed = new CountDownLatch(1);
        ((JmsConnection) connection).addConnectionListener(new JmsDefaultConnectionListener() {
            @Override
            public void onSessionClosed(Session session, Throwable exception) {
                sessionClosed.countDown();
            }
        });

        // Create first session
        testPeer.expectBegin();
        Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        String topicName = "myTopic";
        Topic dest = session1.createTopic(topicName);
        String subscriptionName = "mySubscription";

        // Attach the first shared receiver on the first session
        Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName);
        testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true);
        testPeer.expectLinkFlow();
        MessageConsumer subscriber1 = session1.createSharedDurableConsumer(dest, subscriptionName);

        // Create second session
        testPeer.expectBegin();
        Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Attach the second shared receiver on the second session
        durableLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "2");
        testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true);
        testPeer.expectLinkFlow();

        // Then remotely end the session (and thus the subscriber along with it) after the flow is received.
        testPeer.remotelyEndLastOpenedSession(true, 0, AmqpError.RESOURCE_LIMIT_EXCEEDED, "TestingRemoteClosure");

        MessageConsumer subscriber2 = session2.createSharedDurableConsumer(dest, subscriptionName);
        assertNotNull(subscriber2);

        assertTrue("Session closed callback didn't trigger", sessionClosed.await(5, TimeUnit.SECONDS));

        testPeer.waitForAllHandlersToComplete(1000);

        // Now try to unsubscribe (using first session, still open). It should fail due to sub still
        // being in use on the first session. No frames should be sent.
        try {
            session1.unsubscribe(subscriptionName);
            fail("Should have thrown a JMSException");
        } catch (JMSException expected) {
            // Expected
        }

        // Now close the first subscriber
        testPeer.expectDetach(false, true, false);
        subscriber1.close();

        testPeer.waitForAllHandlersToComplete(1000);

        // Try to unsubscribe again (using first session, still open), should now work.
        testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true);
        testPeer.expectDetach(true, true, true);
        session1.unsubscribe(subscriptionName);

        testPeer.expectClose();
        connection.close();

        // Confirm the peer saw every scripted frame, as the sibling tests do
        testPeer.waitForAllHandlersToComplete(1000);
    }
}
/**
 * Verifies that subscriber cleanup occurs when the session it is on is
 * locally closed: a shared durable subscriber closed along with its session
 * must not keep the subscription marked as in use, so that once the remaining
 * subscriber is closed an unsubscribe attempt proceeds.
 *
 * @throws Exception if an unexpected error is encountered
 */
@Test(timeout = 20000)
public void testLocallyEndSessionWithSharedConsumer() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        // Add server connection capability to indicate support for shared-subs
        Symbol[] serverCapabilities = new Symbol[] { SHARED_SUBS };

        // Establish connection
        Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);

        // Create first session
        testPeer.expectBegin();
        Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        String topicName = "myTopic";
        Topic dest = session1.createTopic(topicName);
        String subscriptionName = "mySubscription";

        // Attach the first shared receiver on the first session
        Matcher<?> durableLinkNameMatcher = equalTo(subscriptionName);
        testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true);
        testPeer.expectLinkFlow();
        MessageConsumer subscriber1 = session1.createSharedDurableConsumer(dest, subscriptionName);

        // Create second session
        testPeer.expectBegin();
        Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Attach the second shared receiver on the second session
        durableLinkNameMatcher = equalTo(subscriptionName + SUB_NAME_DELIMITER + "2");
        testPeer.expectSharedDurableSubscriberAttach(topicName, subscriptionName, durableLinkNameMatcher, true);
        testPeer.expectLinkFlow();
        MessageConsumer subscriber2 = session2.createSharedDurableConsumer(dest, subscriptionName);
        assertNotNull(subscriber2);

        // Now close the second session (and thus the subscriber along with it).
        testPeer.expectEnd();
        session2.close();

        // Now try to unsubscribe (using first session, still open). It should fail due to sub still
        // being in use on the first session. No frames should be sent.
        try {
            session1.unsubscribe(subscriptionName);
            fail("Should have thrown a JMSException");
        } catch (JMSException expected) {
            // Expected
        }

        // Now close the first subscriber
        testPeer.expectDetach(false, true, false);
        subscriber1.close();

        testPeer.waitForAllHandlersToComplete(1000);

        // Try to unsubscribe again (using first session, still open), should now work.
        testPeer.expectDurableSubUnsubscribeNullSourceLookup(false, false, subscriptionName, topicName, true);
        testPeer.expectDetach(true, true, true);
        session1.unsubscribe(subscriptionName);

        testPeer.expectClose();
        connection.close();

        // Confirm the peer saw every scripted frame, as the sibling tests do
        testPeer.waitForAllHandlersToComplete(1000);
    }
}
// -------------------------------------- //
/**
 * Verifies that a subscription name is not allowed to contain the subscription
 * name delimiter, which is used in the receiver link naming to separate the
 * subscription name from a suffix that ensures unique link names are used on a
 * connection.
 *
 * @throws Exception if an unexpected error is encountered
 */
@Test(timeout = 20000)
public void testSubscriptionNameNotAllowedToHaveNameSeparator() throws Exception {
    try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
        // Add server connection capability to indicate support for shared-subs
        Symbol[] serverCapabilities = new Symbol[] { SHARED_SUBS };

        // Establish connection
        Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities);

        // Create session
        testPeer.expectBegin();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Try the various methods that take subscription name with a name that
        // contains the delimiter, they should fail. No frames should be sent.
        // The name is built from the constant so the test tracks the delimiter
        // actually used by the client rather than a hard-coded copy of it.
        String subscriptionName = "thisName" + SUB_NAME_DELIMITER + "hasTheDelimiterAlready";

        String topicName = "myTopic";
        Topic dest = session.createTopic(topicName);

        try {
            session.createDurableSubscriber(dest, subscriptionName);
            fail("Should have thrown a JMSException");
        } catch (JMSException expected) {
            // Expected
        }

        try {
            session.createDurableConsumer(dest, subscriptionName);
            fail("Should have thrown a JMSException");
        } catch (JMSException expected) {
            // Expected
        }

        try {
            session.createSharedDurableConsumer(dest, subscriptionName);
            fail("Should have thrown a JMSException");
        } catch (JMSException expected) {
            // Expected
        }

        try {
            session.createSharedConsumer(dest, subscriptionName);
            fail("Should have thrown a JMSException");
        } catch (JMSException expected) {
            // Expected
        }

        try {
            session.unsubscribe(subscriptionName);
            fail("Should have thrown a JMSException");
        } catch (JMSException expected) {
            // Expected
        }

        testPeer.waitForAllHandlersToComplete(1000);

        // Now close connection, should work.
        testPeer.expectClose();
        connection.close();

        // Confirm the peer saw every scripted frame, as the sibling tests do
        testPeer.waitForAllHandlersToComplete(1000);
    }
}
// -------------------------------------- //
/**
 * Verifies that with the {@code jms.awaitClientID=false} URI option the
 * connection is opened immediately without a ClientID, and that a shared
 * volatile subscriber created on it then uses the link name expected for a
 * connection that has no ClientID set.
 *
 * @throws Exception if an unexpected exception occurs
 */
@Test(timeout = 20000)
public void testSharedTopicSubscriberBehavesLikeNoClientIDWasSetWhenAwaitClientIdOptionIsFalse() throws Exception {
    try (TestAmqpPeer peer = new TestAmqpPeer()) {
        final String topic = "myTopic";
        final String subName = "mySubscription";
        final int port = peer.getServerPort();

        // Have the server advertise the shared-subs connection capability
        final Symbol[] peerCapabilities = new Symbol[] { SHARED_SUBS };

        peer.expectSaslAnonymous();
        peer.expectOpen(null, peerCapabilities);
        peer.expectBegin();

        final String remoteUri = "amqp://localhost:" + port + "?jms.awaitClientID=false";
        ConnectionFactory connectionFactory = new JmsConnectionFactory(remoteUri);
        Connection conn = connectionFactory.createConnection();

        // All handlers completing shows the awaitClientID=false option was
        // effective in provoking the AMQP Open immediately, even though there
        // is no ClientID and the Connection has not been used yet.
        peer.waitForAllHandlersToComplete(3000);

        peer.expectBegin();
        Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic destination = sess.createTopic(topic);

        // Expect a shared subscription with 'global' capability and link name
        // qualifier, since this connection does not have a ClientID set.
        final Matcher<?> linkName = equalTo(subName + SUB_NAME_DELIMITER + "global-volatile1");
        peer.expectSharedVolatileSubscriberAttach(topic, subName, linkName, false);
        peer.expectLinkFlow();
        MessageConsumer sharedSubscriber = sess.createSharedConsumer(destination, subName);

        peer.expectDetach(true, true, true);
        sharedSubscriber.close();

        peer.expectClose();
        conn.close();

        peer.waitForAllHandlersToComplete(1000);
    }
}
}