blob: fc772185cf1987614cf614b3dd42ec101d7029b7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.IdGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class LargeMessageTestSupport extends ClientTestSupport implements MessageListener {
protected static final int LARGE_MESSAGE_SIZE = 128 * 1024;
protected static final int MESSAGE_COUNT = 100;
private static final Logger LOG = LoggerFactory.getLogger(LargeMessageTestSupport.class);
protected Connection producerConnection;
protected Connection consumerConnection;
protected MessageConsumer consumer;
protected MessageProducer producer;
protected Session producerSession;
protected Session consumerSession;
protected byte[] largeMessageData;
protected Destination destination;
protected boolean isTopic = true;
protected boolean isDurable = true;
protected int deliveryMode = DeliveryMode.PERSISTENT;
protected IdGenerator idGen = new IdGenerator();
protected boolean validMessageConsumption = true;
protected AtomicInteger messageCount = new AtomicInteger(0);
protected int prefetchValue = 10000000;
protected Destination createDestination() {
String subject = getClass().getName();
if (isTopic) {
return new ActiveMQTopic(subject);
} else {
return new ActiveMQQueue(subject);
}
}
protected MessageConsumer createConsumer() throws JMSException {
if (isTopic && isDurable) {
return consumerSession.createDurableSubscriber((Topic)destination, idGen.generateId());
} else {
return consumerSession.createConsumer(destination);
}
}
public void setUp() throws Exception {
super.setUp();
ClientTestSupport.removeMessageStore();
LOG.info("Setting up . . . . . ");
messageCount.set(0);
destination = createDestination();
largeMessageData = new byte[LARGE_MESSAGE_SIZE];
for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) {
if (i % 2 == 0) {
largeMessageData[i] = 'a';
} else {
largeMessageData[i] = 'z';
}
}
try {
// allow the broker to start
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new JMSException(e.getMessage());
}
ActiveMQConnectionFactory fac = getConnectionFactory();
producerConnection = fac.createConnection();
setPrefetchPolicy((ActiveMQConnection)producerConnection);
producerConnection.start();
consumerConnection = fac.createConnection();
setPrefetchPolicy((ActiveMQConnection)consumerConnection);
consumerConnection.setClientID(idGen.generateId());
consumerConnection.start();
producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = producerSession.createProducer(createDestination());
producer.setDeliveryMode(deliveryMode);
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = createConsumer();
consumer.setMessageListener(this);
LOG.info("Setup complete");
}
protected void setPrefetchPolicy(ActiveMQConnection activeMQConnection) {
activeMQConnection.getPrefetchPolicy().setTopicPrefetch(prefetchValue);
activeMQConnection.getPrefetchPolicy().setQueuePrefetch(prefetchValue);
activeMQConnection.getPrefetchPolicy().setDurableTopicPrefetch(prefetchValue);
activeMQConnection.getPrefetchPolicy().setQueueBrowserPrefetch(prefetchValue);
activeMQConnection.getPrefetchPolicy().setOptimizeDurableTopicPrefetch(prefetchValue);
}
public void tearDown() throws Exception {
Thread.sleep(1000);
producerConnection.close();
consumerConnection.close();
super.tearDown();
largeMessageData = null;
}
protected boolean isSame(BytesMessage msg1) throws Exception {
boolean result = false;
((ActiveMQMessage)msg1).setReadOnlyBody(true);
for (int i = 0; i < LARGE_MESSAGE_SIZE; i++) {
result = msg1.readByte() == largeMessageData[i];
if (!result) {
break;
}
}
return result;
}
public void onMessage(Message msg) {
try {
BytesMessage ba = (BytesMessage)msg;
validMessageConsumption &= isSame(ba);
assertTrue(ba.getBodyLength() == LARGE_MESSAGE_SIZE);
if (messageCount.incrementAndGet() >= MESSAGE_COUNT) {
synchronized (messageCount) {
messageCount.notify();
}
}
LOG.info("got message = " + messageCount);
if (messageCount.get() % 50 == 0) {
LOG.info("count = " + messageCount);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void testLargeMessages() throws Exception {
for (int i = 0; i < MESSAGE_COUNT; i++) {
LOG.info("Sending message: " + i);
BytesMessage msg = producerSession.createBytesMessage();
msg.writeBytes(largeMessageData);
producer.send(msg);
}
long now = System.currentTimeMillis();
while (now + 60000 > System.currentTimeMillis() && messageCount.get() < MESSAGE_COUNT) {
LOG.info("message count = " + messageCount);
synchronized (messageCount) {
messageCount.wait(1000);
}
}
LOG.info("Finished count = " + messageCount);
assertTrue("Not enough messages - expected " + MESSAGE_COUNT + " but got " + messageCount, messageCount.get() == MESSAGE_COUNT);
assertTrue("received messages are not valid", validMessageConsumption);
Thread.sleep(1000);
LOG.info("FINAL count = " + messageCount);
}
}