blob: 2b03b08b24ac674a0c50cf042b66c45dcf366b32 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.queue;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Test DeepQueueConsumeWithSelector
* Summary:
* Prior to M4 the broker had a different queue model which pre-processed the
* messages on the queue for any connecting subscription that had a selector.
*
* If the queue had a lot of data then this may take a long time to process
* to such an extent that the subscription creation may time out. During this
* pre-process phase the virtualhost would become unresponsive.
*
* Our solution was to allow the timeout to be adjusted QPID-1119, which allowed
* the subscription to connect but did not address the unresponsiveness.
*
* The new queue model introduced in M4 resolved this.
*
* This test is to validate that the new queueing model does indeed remove the
* long pre-processing phase and allow immediate subscription so that there is
* no unresponsive period.
*
* Test Strategy:
*
* Add MESSAGE_COUNT messages to the queue, each with a numeric header property
* that later subscribers can use in a selector.
*
* Connect the subscriber and time how long it takes to connect.
*
* Finally consume all the messages from the queue to clean up.
*/
public class DeepQueueConsumeWithSelector extends QpidBrokerTestCase implements MessageListener
{
    /** Number of messages put on the queue before the selector consumer subscribes. */
    private static final int MESSAGE_COUNT = 10000;

    /**
     * Passed as the last argument to sendMessage (presumably the commit batch size - confirm against
     * QpidBrokerTestCase) and also used as the interval between progress lines written to stderr.
     */
    private static final int BATCH_SIZE = MESSAGE_COUNT / 10;

    /** Counted down once per delivered message; reaches zero after MESSAGE_COUNT deliveries. */
    private final CountDownLatch _receivedLatch = new CountDownLatch(MESSAGE_COUNT);

    /**
     * Value installed as the "amqj.default_syncwrite_timeout" system property in {@link #setUp()} and
     * also used, in milliseconds, as the maximum time {@link #test()} waits for all messages to arrive.
     */
    protected long SYNC_WRITE_TIMEOUT = 120000L;

    /**
     * Installs the sync-write timeout system property and then runs the base class set-up.
     *
     * @throws Exception if the base class set-up fails
     */
    @Override
    public void setUp() throws Exception
    {
        // The property is set before super.setUp() so that it is already in place when the base class
        // performs its own initialisation.
        setSystemProperty("amqj.default_syncwrite_timeout", String.valueOf(SYNC_WRITE_TIMEOUT));
        super.setUp();
    }

    /**
     * Fills the test queue with MESSAGE_COUNT messages, subscribes a consumer with a selector while the
     * queue is deep, reports how long the subscription took and then waits for every message to be
     * delivered to this listener.
     *
     * @throws Exception if any JMS operation fails or the wait for messages is interrupted
     */
    public void test() throws Exception
    {
        // Create Connection
        Connection connection = getConnection();

        // Publish over a transacted session; the third argument is presumably the prefetch - confirm
        // against AMQConnection.
        Session session = ((AMQConnection) connection).createSession(true, Session.SESSION_TRANSACTED, 100000);
        Queue queue = createTestQueue(session);

        // Send Messages
        sendMessage(session, queue, MESSAGE_COUNT, BATCH_SIZE);
        session.close();

        // Consume over a fresh auto-acknowledge session; no explicit prefetch is passed here.
        session = ((AMQConnection) connection).createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Setup Selector to perform a few calculations which will slow it down
        String selector = "((\"" + INDEX + "\" % 1) = 0) AND ('" + INDEX + "' IS NOT NULL) AND ('" + INDEX + "' <> -1)";

        // Setup timing
        long start = System.nanoTime();
        System.err.println("Create Consumer");

        // Connect Consumer
        MessageConsumer consumer = session.createConsumer(queue, selector);
        consumer.setMessageListener(this);

        // Report (but do not assert on) the subscription time; the value is in nanoseconds.
        long end = System.nanoTime();
        System.err.println("Subscription time took:" + (end - start));

        // Consume Messages: delivery to onMessage begins once the connection is started.
        connection.start();

        assertTrue("Messages took to long to be received :" + _receivedLatch.getCount(),
                   _receivedLatch.await(SYNC_WRITE_TIMEOUT, TimeUnit.MILLISECONDS));
    }

    /**
     * Delegates message creation to the base class and writes a progress line to stderr whenever
     * msgCount is a multiple of BATCH_SIZE.
     *
     * @param session  session used to create the message
     * @param msgCount index of the message being created
     * @return the message created by the base class
     * @throws JMSException if the base class fails to create the message
     */
    @Override
    public Message createNextMessage(Session session, int msgCount) throws JMSException
    {
        Message message = super.createNextMessage(session, msgCount);

        if ((msgCount % BATCH_SIZE) == 0)
        {
            System.err.println("Sent:" + msgCount);
        }

        return message;
    }

    /**
     * Counts the delivered message and writes a progress line to stderr whenever its INDEX property is
     * a multiple of BATCH_SIZE.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(Message message)
    {
        // Count first so that a problem reading the property cannot stall the test.
        _receivedLatch.countDown();

        int msgCount;
        try
        {
            msgCount = message.getIntProperty(INDEX);
        }
        catch (JMSException e)
        {
            // The message has already been counted. Report the failure rather than silently treating
            // the index as 0, which would emit a misleading "Received:0" progress line.
            System.err.println("Unable to read property " + INDEX + " from received message: " + e);
            return;
        }

        if ((msgCount % BATCH_SIZE) == 0)
        {
            System.err.println("Received:" + msgCount);
        }
    }
}