blob: 757c3898d3cfdf31e567f6df28836b0e507c84dc [file] [log] [blame]
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tool;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
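/**
 * Operator-driven crash test for transacted JMS messaging against a broker at
 * tcp://localhost:61616. Pairs of producer and consumer threads exchange
 * fixed-size batches of persistent messages inside transactions; the operator is
 * prompted to kill the broker under load and then restart it, after which fresh
 * consumers drain the queue and check that only complete batches are present.
 *
 * @version $Revision$
 */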
public class AcidTestTool extends TestCase {
private Random random = new Random();
private byte data[];
private int workerCount = 10;
private PrintWriter statWriter;
// Worker configuration.
protected int recordSize = 1024;
protected int batchSize = 5;
protected int workerThinkTime = 500;
AtomicBoolean ignoreJMSErrors = new AtomicBoolean(false);
protected Destination target;
private ActiveMQConnectionFactory factory;
private Connection connection;
AtomicInteger publishedBatches = new AtomicInteger(0);
AtomicInteger consumedBatches = new AtomicInteger(0);
List errors = Collections.synchronizedList(new ArrayList());
private interface Worker extends Runnable {
public boolean waitForExit(long i) throws InterruptedException;
}
private final class ProducerWorker implements Worker {
Session session;
private MessageProducer producer;
private BytesMessage message;
CountDownLatch doneLatch = new CountDownLatch(1);
private final String workerId;
ProducerWorker(Session session, String workerId) throws JMSException {
this.session = session;
this.workerId = workerId;
producer = session.createProducer(target);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
message = session.createBytesMessage();
message.setStringProperty("workerId", workerId);
message.writeBytes(data);
}
public void run() {
try {
for( int batchId=0; true; batchId++ ) {
// System.out.println("Sending batch: "+workerId+" "+batchId);
for( int msgId=0; msgId < batchSize; msgId++ ) {
// Sleep some random amount of time less than workerThinkTime
try {
Thread.sleep(random.nextInt(workerThinkTime));
} catch (InterruptedException e1) {
return;
}
message.setIntProperty("batch-id",batchId);
message.setIntProperty("msg-id",msgId);
producer.send(message);
}
session.commit();
publishedBatches.incrementAndGet();
// System.out.println("Commited send batch: "+workerId+" "+batchId);
}
} catch (JMSException e) {
if( !ignoreJMSErrors.get() ) {
e.printStackTrace();
errors.add(e);
}
return;
} catch (Throwable e) {
e.printStackTrace();
errors.add(e);
return;
} finally {
System.out.println("Producer exiting.");
doneLatch.countDown();
}
}
public boolean waitForExit(long i) throws InterruptedException {
return doneLatch.await(i, TimeUnit.MILLISECONDS);
}
}
private final class ConsumerWorker implements Worker {
Session session;
private MessageConsumer consumer;
private final long timeout;
CountDownLatch doneLatch = new CountDownLatch(1);
private final String workerId;
ConsumerWorker(Session session, String workerId, long timeout) throws JMSException {
this.session = session;
this.workerId = workerId;
this.timeout = timeout;
consumer = session.createConsumer(target,"workerId='"+workerId+"'");
}
public void run() {
try {
int batchId=0;
while( true ) {
for( int msgId=0; msgId < batchSize; msgId++ ) {
// Sleep some random amount of time less than workerThinkTime
try {
Thread.sleep(random.nextInt(workerThinkTime));
} catch (InterruptedException e1) {
return;
}
Message message = consumer.receive(timeout);
if( msgId > 0 ) {
assertNotNull(message);
assertEquals(message.getIntProperty("batch-id"), batchId);
assertEquals(message.getIntProperty("msg-id"), msgId);
} else {
if( message==null ) {
System.out.println("At end of batch an don't have a next batch to process. done.");
return;
}
assertEquals(msgId, message.getIntProperty("msg-id") );
batchId = message.getIntProperty("batch-id");
// System.out.println("Receiving batch: "+workerId+" "+batchId);
}
}
session.commit();
consumedBatches.incrementAndGet();
// System.out.println("Commited receive batch: "+workerId+" "+batchId);
}
} catch (JMSException e) {
if( !ignoreJMSErrors.get() ) {
e.printStackTrace();
errors.add(e);
}
return;
} catch (Throwable e) {
e.printStackTrace();
errors.add(e);
return;
} finally {
System.out.println("Consumer exiting.");
doneLatch.countDown();
}
}
public boolean waitForExit(long i) throws InterruptedException {
return doneLatch.await(i, TimeUnit.MILLISECONDS);
}
}
/**
* @see junit.framework.TestCase#setUp()
*/
protected void setUp() throws Exception {
factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
this.target = new ActiveMQQueue(getClass().getName());
}
protected void tearDown() throws Exception {
if( connection!=null ) {
try { connection.close(); } catch (Throwable ignore) {}
connection = null;
}
}
/**
* @throws InterruptedException
* @throws JMSException
* @throws JMSException
*
*/
private void reconnect() throws InterruptedException, JMSException {
if( connection!=null ) {
try { connection.close(); } catch (Throwable ignore) {}
connection = null;
}
long reconnectDelay=1000;
JMSException lastError=null;
while( connection == null) {
if( reconnectDelay > 1000*10 ) {
reconnectDelay = 1000*10;
}
try {
connection = factory.createConnection();
connection.start();
} catch (JMSException e) {
lastError = e;
Thread.sleep(reconnectDelay);
reconnectDelay*=2;
}
}
}
/**
* @throws Throwable
* @throws IOException
*
*/
public void testAcidTransactions() throws Throwable {
System.out.println("Client threads write records using: Record Size: " + recordSize + ", Batch Size: "
+ batchSize + ", Worker Think Time: " + workerThinkTime);
// Create the record and fill it with some values.
data = new byte[recordSize];
for (int i = 0; i < data.length; i++) {
data[i] = (byte) i;
}
System.out.println("==============================================");
System.out.println("===> Start the server now.");
System.out.println("==============================================");
reconnect();
System.out.println("Starting " + workerCount + " Workers...");
ArrayList workers = new ArrayList();
for( int i=0; i< workerCount; i++ ){
String workerId = "worker-"+i;
Worker w = new ConsumerWorker(connection.createSession(true,Session.SESSION_TRANSACTED), workerId, 1000*5);
workers.add(w);
new Thread(w,"Consumer:"+workerId).start();
w = new ProducerWorker(connection.createSession(true,Session.SESSION_TRANSACTED), workerId);
workers.add(w);
new Thread(w,"Producer:"+workerId).start();
}
System.out.println("Waiting for "+(workerCount*10)+" batches to be delivered.");
//
// Wait for about 5 batches of messages per worker to be consumed before restart.
//
while( publishedBatches.get() < workerCount*5) {
System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get());
Thread.sleep(1000);
}
System.out.println("==============================================");
System.out.println("===> Server is under load now. Kill it!");
System.out.println("==============================================");
ignoreJMSErrors.set(true);
// Wait for all the workers to finish.
System.out.println("Waiting for all workers to exit due to server shutdown.");
for (Iterator iter = workers.iterator(); iter.hasNext();) {
Worker worker = (Worker) iter.next();
while( !worker.waitForExit(1000) ) {
System.out.println("==============================================");
System.out.println("===> Server is under load now. Kill it!");
System.out.println("==============================================");
System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get());
}
}
workers.clear();
// No errors should have occured so far.
if( errors.size()>0 )
throw (Throwable) errors.get(0);
System.out.println("==============================================");
System.out.println("===> Start the server now.");
System.out.println("==============================================");
reconnect();
System.out.println("Restarted.");
// Validate the all transactions were commited as a uow. Looking for partial commits.
for( int i=0; i< workerCount; i++ ){
String workerId = "worker-"+i;
Worker w = new ConsumerWorker(connection.createSession(true,Session.SESSION_TRANSACTED), workerId, 5*1000);
workers.add(w);
new Thread(w, "Consumer:"+workerId).start();
}
System.out.println("Waiting for restarted consumers to finish consuming all messages..");
for (Iterator iter = workers.iterator(); iter.hasNext();) {
Worker worker = (Worker) iter.next();
while( !worker.waitForExit(1000*5) ) {
System.out.println("Waiting for restarted consumers to finish consuming all messages..");
System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get());
}
}
workers.clear();
System.out.println("Workers finished..");
System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get());
if( errors.size()>0 )
throw (Throwable) errors.get(0);
}
public static void main(String[] args) {
try {
AcidTestTool tool = new AcidTestTool();
tool.setUp();
tool.testAcidTransactions();
tool.tearDown();
} catch (Throwable e) {
System.out.println("Test Failed: "+e.getMessage());
e.printStackTrace();
}
}
}