blob: 24e133b9572cae11567a70d8814009f09c3fe2d0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.jms.integration;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.junit.Test;
public class ZeroPrefetchIntegrationTest extends QpidJmsTestCase {
private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
@Test(timeout=20000)
public void testZeroPrefetchConsumerReceiveWithMessageExpiredInFlight() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// Create a connection with zero prefetch
Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
// Expected the consumer to attach but NOT send credit
testPeer.expectReceiverAttach();
final MessageConsumer consumer = session.createConsumer(queue);
// Expect that once receive is called, it flows a credit, give it an already-expired message.
// Expect it to be filtered due to local expiration checking.
PropertiesDescribedType props = new PropertiesDescribedType();
props.setAbsoluteExpiryTime(new Date(System.currentTimeMillis() - 100));
testPeer.expectLinkFlowRespondWithTransfer(null, null, props, null, new AmqpValueDescribedType("already-expired"));
ModifiedMatcher modifiedMatcher = new ModifiedMatcher();
modifiedMatcher.withDeliveryFailed(equalTo(true));
modifiedMatcher.withUndeliverableHere(equalTo(true));
testPeer.expectDisposition(true, modifiedMatcher, 1, 1);
// Expect the client to then flow another credit requesting a message.
testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.valueOf(1)));
// Send it a live message, expect it to get accepted.
String liveMsgContent = "valid";
testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(liveMsgContent), 2);
testPeer.expectDisposition(true, new AcceptedMatcher(), 2, 2);
Message m = consumer.receive(5000);
assertNotNull("Message should have been received", m);
assertTrue(m instanceof TextMessage);
assertEquals("Unexpected message content", liveMsgContent, ((TextMessage) m).getText());
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=20000)
public void testZeroPrefetchConsumerReceiveNoWaitDrainsWithOneCredit() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// Create a connection with zero prefetch
Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
// Expected the consumer to attach but NOT send credit
testPeer.expectReceiverAttach();
final MessageConsumer consumer = session.createConsumer(queue);
String msgContent = "content";
// Expect that once receiveNoWait is called, it drains with 1 credit, then give it a message.
testPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.ONE));
testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(msgContent), 1);
// Expect it to be accepted.
testPeer.expectDisposition(true, new AcceptedMatcher(), 1, 1);
Message m = consumer.receiveNoWait();
assertNotNull("Message should have been received", m);
assertTrue(m instanceof TextMessage);
assertEquals("Unexpected message content", msgContent, ((TextMessage) m).getText());
testPeer.expectClose();
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
@Test(timeout=20000)
public void testZeroPrefetchMessageListener() throws Exception {
final CountDownLatch msgReceived = new CountDownLatch(1);
final CountDownLatch completeOnMessage = new CountDownLatch(1);
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// Create a connection with zero prefetch
Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getTestName());
// Expected the consumer to attach but NOT send credit
testPeer.expectReceiverAttach();
MessageConsumer consumer = session.createConsumer(destination);
testPeer.waitForAllHandlersToComplete(2000);
MessageListener listener = new MessageListener() {
@Override
public void onMessage(Message message) {
msgReceived.countDown();
try {
completeOnMessage.await(6, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
// Expect that once setMessageListener is called, it flows 1 credit with drain=false. Then give it a message.
testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.ONE));
testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType("content"), 1);
consumer.setMessageListener(listener);
// Wait for message to arrive
assertTrue("message not received in given time", msgReceived.await(6, TimeUnit.SECONDS));
// Ensure the handlers are complete at the peer
testPeer.waitForAllHandlersToComplete(2000);
// Now allow onMessage to complete, expecting an accept and another flow.
testPeer.expectDisposition(true, new AcceptedMatcher(), 1, 1);
testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.ONE));
completeOnMessage.countDown();
// Wait for the resulting flow to be received
testPeer.waitForAllHandlersToComplete(2000);
testPeer.expectClose();
connection.close();
}
}
@Test(timeout=40000)
public void testZeroPrefetchConsumerReceiveUnblockedOnSessionClose() throws Exception {
doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(0);
}
@Test(timeout=40000)
public void testZeroPrefetchConsumerReceiveTimedUnblockedOnSessionClose() throws Exception {
doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(1);
}
@Test(timeout=40000)
public void testZeroPrefetchConsumerReceiveNoWaitUnblockedOnSessionClose() throws Exception {
doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(-1);
}
public void doTestZeroPrefetchConsumerReceiveUnblockedOnSessionClose(final int timeout) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
// Create a connection with zero prefetch
Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
connection.start();
testPeer.expectBegin();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("myQueue");
// Expected the consumer to attach but NOT send credit
testPeer.expectReceiverAttach();
final MessageConsumer consumer = session.createConsumer(queue);
// Expect that once receive is called, it drains with 1 credit, don't answer it
if (timeout < 0) {
testPeer.expectLinkFlow(true, false, equalTo(UnsignedInteger.ONE));
} else {
testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.ONE));
}
final AtomicBoolean error = new AtomicBoolean(false);
final CountDownLatch done = new CountDownLatch(1);
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(new Runnable() {
@Override
public void run() {
try {
if (timeout < 0) {
consumer.receiveNoWait();
} else if (timeout == 0) {
consumer.receive();
} else {
consumer.receive(10000);
}
} catch (Exception ex) {
error.set(true);
} finally {
done.countDown();
}
}
});
testPeer.waitForAllHandlersToComplete(3000);
testPeer.expectEnd();
testPeer.expectClose();
session.close();
assertTrue("Consumer did not unblock", done.await(10, TimeUnit.SECONDS));
connection.close();
testPeer.waitForAllHandlersToComplete(3000);
}
}
}