blob: a633695bcd35a22bbebc098f835442a1756c8035 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import java.util.Properties;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JMSMemtest {
private static final Logger LOG = LoggerFactory.getLogger(JMSMemtest.class);
private static final int DEFAULT_MESSAGECOUNT = 5000;
protected BrokerService broker;
protected boolean topic = true;
protected boolean durable;
protected long messageCount;
// how large the message in kb before we close/start the producer/consumer with a new connection. -1 means no connectionCheckpointSize
protected int connectionCheckpointSize;
protected long connectionInterval;
protected int consumerCount;
protected int producerCount;
protected int checkpointInterval;
protected int prefetchSize;
//set 10 kb of payload as default
protected int messageSize;
protected String reportDirectory;
protected String reportName;
protected String url = "";
protected MemProducer[] producers;
protected MemConsumer[] consumers;
protected String destinationName;
protected boolean allMessagesConsumed = true;
protected MemConsumer allMessagesList = new MemConsumer();
protected Message payload;
protected ActiveMQConnectionFactory connectionFactory;
protected Connection connection;
protected Destination destination;
protected boolean createConnectionPerClient = true;
protected boolean transacted;
protected boolean useEmbeddedBroker = true;
protected MemoryMonitoringTool memoryMonitoringTool;
public JMSMemtest(Properties settings) {
url = settings.getProperty("url");
topic = new Boolean(settings.getProperty("topic")).booleanValue();
durable = new Boolean(settings.getProperty("durable")).booleanValue();
connectionCheckpointSize = new Integer(settings.getProperty("connectionCheckpointSize")).intValue();
producerCount = new Integer(settings.getProperty("producerCount")).intValue();
consumerCount = new Integer(settings.getProperty("consumerCount")).intValue();
messageCount = new Integer(settings.getProperty("messageCount")).intValue();
messageSize = new Integer(settings.getProperty("messageSize")).intValue();
prefetchSize = new Integer(settings.getProperty("prefetchSize")).intValue();
checkpointInterval = new Integer(settings.getProperty("checkpointInterval")).intValue() * 1000;
producerCount = new Integer(settings.getProperty("producerCount")).intValue();
reportName = settings.getProperty("reportName");
destinationName = settings.getProperty("destinationName");
reportDirectory = settings.getProperty("reportDirectory");
connectionInterval = connectionCheckpointSize * 1024;
}
public static void main(String[] args) {
Properties sysSettings = new Properties();
for (int i = 0; i < args.length; i++) {
int index = args[i].indexOf("=");
String key = args[i].substring(0, index);
String val = args[i].substring(index + 1);
sysSettings.setProperty(key, val);
}
JMSMemtest memtest = new JMSMemtest(sysSettings);
try {
memtest.start();
} catch (Exception e) {
e.printStackTrace();
}
}
protected void start() throws Exception {
LOG.info("Starting Monitor");
memoryMonitoringTool = new MemoryMonitoringTool();
memoryMonitoringTool.setTestSettings(getSysTestSettings());
Thread monitorThread = memoryMonitoringTool.startMonitor();
if (messageCount == 0) {
messageCount = DEFAULT_MESSAGECOUNT;
}
if (useEmbeddedBroker) {
if (broker == null) {
broker = createBroker();
}
}
connectionFactory = (ActiveMQConnectionFactory) createConnectionFactory();
if (prefetchSize > 0) {
connectionFactory.getPrefetchPolicy().setTopicPrefetch(prefetchSize);
connectionFactory.getPrefetchPolicy().setQueuePrefetch(prefetchSize);
}
connection = connectionFactory.createConnection();
Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE);
if (topic) {
destination = session.createTopic(destinationName);
} else {
destination = session.createQueue(destinationName);
}
createPayload(session);
publishAndConsume();
LOG.info("Closing resources");
this.close();
monitorThread.join();
}
protected boolean resetConnection(int counter) {
if (connectionInterval > 0) {
long totalMsgSizeConsumed = counter * 1024;
if (connectionInterval < totalMsgSizeConsumed) {
return true;
}
}
return false;
}
protected void publishAndConsume() throws Exception {
createConsumers();
createProducers();
int counter = 0;
boolean resetCon = false;
LOG.info("Start sending messages ");
for (int i = 0; i < messageCount; i++) {
if (resetCon) {
closeConsumers();
closeProducers();
createConsumers();
createProducers();
resetCon = false;
}
for (int k = 0; k < producers.length; k++) {
producers[k].sendMessage(payload, "counter", counter);
counter++;
if (resetConnection(counter)) {
resetCon = true;
break;
}
}
}
}
protected void close() throws Exception {
connection.close();
broker.stop();
memoryMonitoringTool.stopMonitor();
}
protected void createPayload(Session session) throws JMSException {
byte[] array = new byte[messageSize];
for (int i = 0; i < array.length; i++) {
array[i] = (byte) i;
}
BytesMessage bystePayload = session.createBytesMessage();
bystePayload.writeBytes(array);
payload = (Message) bystePayload;
}
protected void createProducers() throws JMSException {
producers = new MemProducer[producerCount];
for (int i = 0; i < producerCount; i++) {
producers[i] = new MemProducer(connectionFactory, destination);
if (durable) {
producers[i].setDeliveryMode(DeliveryMode.PERSISTENT);
} else {
producers[i].setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
producers[i].start();
}
}
protected void createConsumers() throws JMSException {
consumers = new MemConsumer[consumerCount];
for (int i = 0; i < consumerCount; i++) {
consumers[i] = new MemConsumer(connectionFactory, destination);
consumers[i].setParent(allMessagesList);
consumers[i].start();
}
}
protected void closeProducers() throws JMSException {
for (int i = 0; i < producerCount; i++) {
producers[i].shutDown();
}
}
protected void closeConsumers() throws JMSException {
for (int i = 0; i < consumerCount; i++) {
consumers[i].shutDown();
}
}
protected ConnectionFactory createConnectionFactory() throws JMSException {
if (url == null || url.trim().equals("") || url.trim().equals("null")) {
return new ActiveMQConnectionFactory("vm://localhost");
} else {
return new ActiveMQConnectionFactory(url);
}
}
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
configureBroker(broker);
broker.start();
return broker;
}
protected void configureBroker(BrokerService broker) throws Exception {
broker.addConnector("vm://localhost");
broker.setDeleteAllMessagesOnStartup(true);
}
protected Properties getSysTestSettings() {
Properties settings = new Properties();
settings.setProperty("domain", topic ? "topic" : "queue");
settings.setProperty("durable", durable ? "durable" : "non-durable");
settings.setProperty("connection_checkpoint_size_kb", new Integer(connectionCheckpointSize).toString());
settings.setProperty("producer_count", new Integer(producerCount).toString());
settings.setProperty("consumer_count", new Integer(consumerCount).toString());
settings.setProperty("message_count", new Long(messageCount).toString());
settings.setProperty("message_size", new Integer(messageSize).toString());
settings.setProperty("prefetchSize", new Integer(prefetchSize).toString());
settings.setProperty("checkpoint_interval", new Integer(checkpointInterval).toString());
settings.setProperty("destination_name", destinationName);
settings.setProperty("report_name", reportName);
settings.setProperty("report_directory", reportDirectory);
settings.setProperty("connection_checkpoint_size", new Integer(connectionCheckpointSize).toString());
return settings;
}
}