| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.qpid.jms.integration; |
| |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.util.Date; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import javax.jms.Connection; |
| import javax.jms.Message; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageListener; |
| import javax.jms.Queue; |
| import javax.jms.Session; |
| import javax.jms.TextMessage; |
| |
| import org.apache.qpid.jms.test.QpidJmsTestCase; |
| import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType; |
| import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher; |
| import org.apache.qpid.proton.amqp.UnsignedInteger; |
| import org.junit.Test; |
| |
| public class ZeroPrefetchIntegrationTest extends QpidJmsTestCase { |
| private final IntegrationTestFixture testFixture = new IntegrationTestFixture(); |
| |
| @Test(timeout=20000) |
| public void testZeroPrefetchConsumerReceiveWithMessageExpiredInFlight() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| // Create a connection with zero prefetch |
| Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0"); |
| connection.start(); |
| |
| testPeer.expectBegin(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| // Expected the consumer to attach but NOT send credit |
| testPeer.expectReceiverAttach(); |
| |
| final MessageConsumer consumer = session.createConsumer(queue); |
| |
| // Expect that once receive is called, it flows a credit, give it an already-expired message. |
| // Expect it to be filtered due to local expiration checking. |
| PropertiesDescribedType props = new PropertiesDescribedType(); |
| props.setAbsoluteExpiryTime(new Date(System.currentTimeMillis() - 100)); |
| testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, new AmqpValueDescribedType("already-expired")); |
| |
| ModifiedMatcher modifiedMatcher = new ModifiedMatcher(); |
| modifiedMatcher.withDeliveryFailed(equalTo(true)); |
| modifiedMatcher.withUndeliverableHere(equalTo(true)); |
| |
| testPeer.expectDisposition(true, modifiedMatcher, 1, 1); |
| |
| // Expect the client to then flow another credit requesting a message. |
| testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(1))); |
| |
| // Send it a live message, expect it to get accepted. |
| String liveMsgContent = "valid"; |
| testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(liveMsgContent), 2); |
| testPeer.expectDisposition(true, new AcceptedMatcher(), 2, 2); |
| |
| Message m = consumer.receive(5000); |
| assertNotNull("Message should have been received", m); |
| assertTrue(m instanceof TextMessage); |
| assertEquals("Unexpected message content", liveMsgContent, ((TextMessage) m).getText()); |
| |
| testPeer.expectClose(); |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(3000); |
| } |
| } |
| |
| @Test(timeout=20000) |
| public void testZeroPrefetchConsumerReceiveNoWaitDrainsWithOneCredit() throws Exception { |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| // Create a connection with zero prefetch |
| Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0"); |
| connection.start(); |
| |
| testPeer.expectBegin(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| // Expected the consumer to attach but NOT send credit |
| testPeer.expectReceiverAttach(); |
| |
| final MessageConsumer consumer = session.createConsumer(queue); |
| |
| String msgContent = "content"; |
| // Expect that once receiveNoWait is called, it drains with 1 credit, then give it a message. |
| testPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.ONE)); |
| testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(msgContent), 1); |
| |
| // Expect it to be accepted. |
| testPeer.expectDisposition(true, new AcceptedMatcher(), 1, 1); |
| |
| Message m = consumer.receiveNoWait(); |
| assertNotNull("Message should have been received", m); |
| assertTrue(m instanceof TextMessage); |
| assertEquals("Unexpected message content", msgContent, ((TextMessage) m).getText()); |
| |
| testPeer.expectClose(); |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(3000); |
| } |
| } |
| |
| @Test(timeout=20000) |
| public void testZeroPrefetchMessageListener() throws Exception { |
| final CountDownLatch msgReceived = new CountDownLatch(1); |
| final CountDownLatch completeOnMessage = new CountDownLatch(1); |
| |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| // Create a connection with zero prefetch |
| Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0"); |
| connection.start(); |
| |
| testPeer.expectBegin(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue destination = session.createQueue(getTestName()); |
| |
| // Expected the consumer to attach but NOT send credit |
| testPeer.expectReceiverAttach(); |
| |
| MessageConsumer consumer = session.createConsumer(destination); |
| |
| testPeer.waitForAllHandlersToComplete(2000); |
| |
| MessageListener listener = new MessageListener() { |
| @Override |
| public void onMessage(Message message) { |
| msgReceived.countDown(); |
| |
| try { |
| completeOnMessage.await(6, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| } |
| }; |
| |
| // Expect that once setMessageListener is called, it flows 1 credit with drain=false. Then give it a message. |
| testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.ONE)); |
| testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType("content"), 1); |
| |
| consumer.setMessageListener(listener); |
| |
| // Wait for message to arrive |
| assertTrue("message not received in given time", msgReceived.await(6, TimeUnit.SECONDS)); |
| |
| // Ensure the handlers are complete at the peer |
| testPeer.waitForAllHandlersToComplete(2000); |
| |
| // Now allow onMessage to complete, expecting an accept and another flow. |
| testPeer.expectDisposition(true, new AcceptedMatcher(), 1, 1); |
| testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.ONE)); |
| completeOnMessage.countDown(); |
| |
| // Wait for the resulting flow to be received |
| testPeer.waitForAllHandlersToComplete(2000); |
| |
| testPeer.expectClose(); |
| connection.close(); |
| } |
| } |
| |
| @Test(timeout=40000) |
| public void testZeroPrefetchConsumerReceiveUnblockedOnSessionClose() throws Exception { |
| doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(0); |
| } |
| |
| @Test(timeout=40000) |
| public void testZeroPrefetchConsumerReceiveTimedUnblockedOnSessionClose() throws Exception { |
| doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(1); |
| } |
| |
| @Test(timeout=40000) |
| public void testZeroPrefetchConsumerReceiveNoWaitUnblockedOnSessionClose() throws Exception { |
| doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(-1); |
| } |
| |
| public void doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(final int timeout) throws Exception { |
| |
| try (TestAmqpPeer testPeer = new TestAmqpPeer();) { |
| // Create a connection with zero prefetch |
| Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0"); |
| connection.start(); |
| |
| testPeer.expectBegin(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = session.createQueue("myQueue"); |
| |
| // Expected the consumer to attach but NOT send credit |
| testPeer.expectReceiverAttach(); |
| |
| final MessageConsumer consumer = session.createConsumer(queue); |
| |
| // Expect that once receive is called, it drains with 1 credit, don't answer it |
| if (timeout < 0) { |
| testPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.ONE)); |
| } else { |
| testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.ONE)); |
| } |
| |
| final AtomicBoolean error = new AtomicBoolean(false); |
| final CountDownLatch done = new CountDownLatch(1); |
| |
| ExecutorService executor = Executors.newSingleThreadExecutor(); |
| executor.execute(new Runnable() { |
| |
| @Override |
| public void run() { |
| try { |
| if (timeout < 0) { |
| consumer.receiveNoWait(); |
| } else if (timeout == 0) { |
| consumer.receive(); |
| } else { |
| consumer.receive(10000); |
| } |
| } catch (Exception ex) { |
| error.set(true); |
| } finally { |
| done.countDown(); |
| } |
| } |
| }); |
| |
| testPeer.waitForAllHandlersToComplete(3000); |
| testPeer.expectEnd(); |
| testPeer.expectClose(); |
| |
| session.close(); |
| |
| assertTrue("Consumer did not unblock", done.await(10, TimeUnit.SECONDS)); |
| |
| connection.close(); |
| |
| testPeer.waitForAllHandlersToComplete(3000); |
| } |
| } |
| } |