blob: 3dc1b539034e338c3ddd93009e2239b65a8734f4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms.integration;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Enumeration;
import java.util.NoSuchElementException;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.hamcrest.Matchers;
import org.junit.Test;
public class QueueBrowserIntegrationTest extends QpidJmsTestCase {
private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
//----- Test basic create and destroy mechanisms -------------------------//
@Test(timeout=30000)
public void testCreateQueueBrowserWithoutEnumeration() throws IOException, Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
testPeer.expectEnd();
testPeer.expectClose();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
// Creating the browser should send nothing until an Enumeration is created.
QueueBrowser browser = session.createBrowser(queue);
browser.close();
session.close();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=30000)
public void testCreateQueueBrowserAndEnumeration() throws IOException, Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
// Expected the browser to create a consumer and send credit.
testPeer.expectQueueBrowserAttach();
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
testPeer.expectDetach(true, true, true);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> queueView = browser.getEnumeration();
assertNotNull(queueView);
browser.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
//----- Tests for expected behaviors of a QueueBrowser implementation ----//
@Test(timeout=30000)
public void testQueueBrowserNextElementWithNoMessage() throws IOException, Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
// Expected the browser to create a consumer and send credit, then drain it when
// no message is there to satisfy an internal hasMoreElements check, then send more
// credit to reopen a window.
testPeer.expectQueueBrowserAttach();
testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
testPeer.expectDetach(true, true, true);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> queueView = browser.getEnumeration();
assertNotNull(queueView);
try {
queueView.nextElement();
fail("Should have thrown an exception due to there being no more elements");
} catch (NoSuchElementException nsee) {
// expected
}
browser.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=30000)
public void testQueueBrowserPrefetchOne() throws IOException, Exception {
final DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=1");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
// Expected the browser to create a consumer and send credit, then drain it when
// no message arrives before hasMoreElements is called, at which point we send one.
testPeer.expectQueueBrowserAttach();
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.ONE));
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent,
1, true, true, Matchers.equalTo(UnsignedInteger.ONE), 1, true, false);
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.ONE));
testPeer.expectDetach(true, true, true);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> queueView = browser.getEnumeration();
assertNotNull(queueView);
assertTrue(queueView.hasMoreElements());
assertNotNull(queueView.nextElement());
browser.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
//----- Tests that cover QueueBrowser and Session Ack mode interaction ---//
@Test(timeout=30000)
public void testCreateQueueBrowserAutoAckSession() throws IOException, Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
// Expected the browser to create a consumer and send credit
testPeer.expectQueueBrowserAttach();
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
// Then expect it to drain it when no message arrives before hasMoreElements is called,
// at which point we send one, and a response flow to indicate the rest of the credit was drained.
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent,
1, true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)), 1, true, false);
// Expect the credit window to be opened again, but accounting for the message we just prefetched.
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH - 1)));
testPeer.expectDetach(true, true, true);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> queueView = browser.getEnumeration();
assertNotNull(queueView);
assertTrue(queueView.hasMoreElements());
browser.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=30000)
public void testCreateQueueBrowserClientAckSession() throws IOException, Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
// Expected the browser to create a consumer and send credit
testPeer.expectQueueBrowserAttach();
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
// Then expect it to drain it when no message arrives before hasMoreElements is called,
// at which point we send one, and a response flow to indicate the rest of the credit was drained.
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent,
1, true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)), 1, true, false);
// Expect the credit window to be opened again, but accounting for the message we just prefetched.
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH - 1)));
testPeer.expectDetach(true, true, true);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> queueView = browser.getEnumeration();
assertNotNull(queueView);
assertTrue(queueView.hasMoreElements());
browser.close();
testPeer.expectEnd();
session.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=30000)
public void testCreateQueueBrowserTransactedSession() throws IOException, Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
connection.start();
testPeer.expectBegin();
testPeer.expectCoordinatorAttach();
// First expect an unsettled 'declare' transfer to the txn coordinator, and
// reply with a declared disposition state containing the txnId.
Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));
testPeer.expectTransfer(declareMatcher, nullValue(), new Declared().setTxnId(txnId), true);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue("myQueue");
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
// Expect the browser enumeration to create a underlying consumer
testPeer.expectQueueBrowserAttach();
// Expect initial credit to be sent
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)));
// Then expect it to drain it when no message arrives before hasMoreElements is called,
// at which point we send one, and a response flow to indicate the rest of the credit was drained.
testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent,
1, true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH)), 1, true, false);
// Expect a non-draining flow to reopen the credit window again afterwards, but accounting for the message we just prefetched.
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_BROWSER_PREFETCH - 1)));
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> queueView = browser.getEnumeration();
assertNotNull(queueView);
assertTrue(queueView.hasMoreElements());
// Browser should close without delay as it does not participate in the TX
testPeer.expectDetach(true, true, true);
browser.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
//----- Tests that cover QueueBrowser when prefetch is zero --------------//
@Test(timeout=30000)
public void testCreateQueueBrowserAndEnumerationZeroPrefetch() throws IOException, Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
// Expected the browser to create a consumer and NOT send credit.
testPeer.expectQueueBrowserAttach();
testPeer.expectDetach(true, true, true);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> queueView = browser.getEnumeration();
assertNotNull(queueView);
browser.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=30000)
public void testQueueBrowserHasMoreElementsZeroPrefetchNoMessage() throws IOException, Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
// Expected the browser to create a consumer, then drain with 1 credit.
testPeer.expectQueueBrowserAttach();
testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.ONE));
testPeer.expectDetach(true, true, true);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> queueView = browser.getEnumeration();
assertNotNull(queueView);
assertFalse(queueView.hasMoreElements());
browser.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=30000)
public void testQueueBrowserHasMoreElementsZeroPrefetchDrainedMessage() throws IOException, Exception {
DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
// Expected the browser to create a consumer.
testPeer.expectQueueBrowserAttach();
// When hasMoreElements is called, the browser should drain with 1 credit,
// at which point we send it a message to use all the credit.
testPeer.expectLinkFlowRespondWithTransfer(
null, null, null, null, amqpValueNullContent, 1, true,
false, equalTo(UnsignedInteger.ONE), 1, true, false);
// Next attempt should not get a message, we just send a response flow indicating
// the credit was cleared, triggering a false on hasMoreElemets.
testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.ONE));
testPeer.expectDetach(true, true, true);
QueueBrowser browser = session.createBrowser(queue);
Enumeration<?> queueView = browser.getEnumeration();
assertNotNull(queueView);
assertTrue(queueView.hasMoreElements());
Message message = (Message) queueView.nextElement();
assertNotNull(message);
assertFalse(queueView.hasMoreElements());
browser.close();
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
}