/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.load;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class LoadTest extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(LoadTest.class);
protected BrokerService broker;
protected String bindAddress="tcp://localhost:61616";
protected LoadController controller;
protected LoadClient[] clients;
protected ConnectionFactory factory;
protected Destination destination;
protected int numberOfClients = 50;
protected int deliveryMode = DeliveryMode.PERSISTENT;
protected int batchSize = 1000;
protected int numberOfBatches = 10;
protected int timeout = Integer.MAX_VALUE;
protected boolean connectionPerMessage = false;
protected Connection managementConnection;
protected Session managementSession;
/**
* Sets up a test where the producer and consumer have their own connection.
*
* @see junit.framework.TestCase#setUp()
*/
protected void setUp() throws Exception {
if (broker == null) {
broker = createBroker(bindAddress);
}
factory = createConnectionFactory(bindAddress);
managementConnection = factory.createConnection();
managementSession = managementConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination startDestination = createDestination(managementSession, getClass()+".start");
Destination endDestination = createDestination(managementSession, getClass()+".end");
LOG.info("Running with " + numberOfClients + " clients - sending "
+ numberOfBatches + " batches of " + batchSize + " messages");
controller = new LoadController("Controller",factory);
controller.setBatchSize(batchSize);
controller.setNumberOfBatches(numberOfBatches);
controller.setDeliveryMode(deliveryMode);
controller.setConnectionPerMessage(connectionPerMessage);
controller.setStartDestination(startDestination);
controller.setNextDestination(endDestination);
controller.setTimeout(timeout);
clients = new LoadClient[numberOfClients];
for (int i = 0; i < numberOfClients; i++) {
Destination inDestination = null;
if (i==0) {
inDestination = startDestination;
}else {
inDestination = createDestination(managementSession, getClass() + ".client."+(i));
}
Destination outDestination = null;
if (i==(numberOfClients-1)) {
outDestination = endDestination;
}else {
outDestination = createDestination(managementSession, getClass() + ".client."+(i+1));
}
LoadClient client = new LoadClient("client("+i+")",factory);
client.setTimeout(timeout);
client.setDeliveryMode(deliveryMode);
client.setConnectionPerMessage(connectionPerMessage);
client.setStartDestination(inDestination);
client.setNextDestination(outDestination);
clients[i] = client;
}
super.setUp();
}
protected void tearDown() throws Exception {
super.tearDown();
managementConnection.close();
for (int i = 0; i < numberOfClients; i++) {
clients[i].stop();
}
controller.stop();
if (broker != null) {
broker.stop();
broker = null;
}
}
protected Destination createDestination(Session s, String destinationName) throws JMSException {
return s.createQueue(destinationName);
}
/**
* Factory method to create a new broker
*
* @throws Exception
*/
protected BrokerService createBroker(String uri) throws Exception {
BrokerService answer = new BrokerService();
configureBroker(answer,uri);
answer.start();
return answer;
}
protected void configureBroker(BrokerService answer,String uri) throws Exception {
answer.setDeleteAllMessagesOnStartup(true);
answer.addConnector(uri);
answer.setUseShutdownHook(false);
}
protected ActiveMQConnectionFactory createConnectionFactory(String uri) throws Exception {
return new ActiveMQConnectionFactory(uri);
}
public void testLoad() throws JMSException, InterruptedException {
for (int i = 0; i < numberOfClients; i++) {
clients[i].start();
}
controller.start();
assertEquals((batchSize* numberOfBatches),controller.awaitTestComplete());
}
}