blob: 69d736a6fe5a11874a669002aed846d27afc959b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import java.io.File;
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VerifySteadyEnqueueRate extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(VerifySteadyEnqueueRate.class);
private static int max_messages = 1000000;
private String destinationName = getName() + "_Queue";
private BrokerService broker;
final boolean useTopic = false;
private boolean useAMQPStore = false;
protected static final String payload = new String(new byte[24]);
public void setUp() throws Exception {
startBroker();
}
public void tearDown() throws Exception {
broker.stop();
}
public void testEnqueueRateCanMeetSLA() throws Exception {
if (true) {
return;
}
doTestEnqueue(false);
}
private void doTestEnqueue(final boolean transacted) throws Exception {
final long min = 100;
final AtomicLong total = new AtomicLong(0);
final AtomicLong slaViolations = new AtomicLong(0);
final AtomicLong max = new AtomicLong(0);
final int numThreads = 6;
Runnable runner = new Runnable() {
public void run() {
try {
MessageSender producer = new MessageSender(destinationName,
createConnection(), transacted, useTopic);
for (int i = 0; i < max_messages; i++) {
long startT = System.currentTimeMillis();
producer.send(payload);
long endT = System.currentTimeMillis();
long duration = endT - startT;
total.incrementAndGet();
if (duration > max.get()) {
max.set(duration);
}
if (duration > min) {
slaViolations.incrementAndGet();
System.err.println("SLA violation @ "+Thread.currentThread().getName()
+ " "
+ DateFormat.getTimeInstance().format(
new Date(startT)) + " at message "
+ i + " send time=" + duration
+ " - Total SLA violations: "+slaViolations.get()+"/"+total.get()+" ("+String.format("%.6f", 100.0*slaViolations.get()/total.get())+"%)");
}
}
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Max Violation = " + max + " - Total SLA violations: "+slaViolations.get()+"/"+total.get()+" ("+String.format("%.6f", 100.0*slaViolations.get()/total.get())+"%)");
}
};
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < numThreads; i++) {
executor.execute(runner);
}
executor.shutdown();
while(!executor.isTerminated()) {
executor.awaitTermination(10, TimeUnit.SECONDS);
}
}
private Connection createConnection() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
broker.getTransportConnectors().get(0).getConnectUri());
return factory.createConnection();
}
private void startBroker() throws Exception {
broker = new BrokerService();
//broker.setDeleteAllMessagesOnStartup(true);
broker.setPersistent(true);
broker.setUseJmx(true);
if (useAMQPStore) {
AMQPersistenceAdapterFactory factory = (AMQPersistenceAdapterFactory) broker
.getPersistenceFactory();
// ensure there are a bunch of data files but multiple entries in
// each
// factory.setMaxFileLength(1024 * 20);
// speed up the test case, checkpoint an cleanup early and often
// factory.setCheckpointInterval(500);
factory.setCleanupInterval(1000 * 60 * 30);
factory.setSyncOnWrite(false);
// int indexBinSize=262144; // good for 6M
int indexBinSize = 1024;
factory.setIndexMaxBinSize(indexBinSize * 2);
factory.setIndexBinSize(indexBinSize);
factory.setIndexPageSize(192 * 20);
} else {
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(new File("target/activemq-data/kahadb"));
// The setEnableJournalDiskSyncs(false) setting is a little dangerous right now, as I have not verified
// what happens if the index is updated but a journal update is lost.
// Index is going to be in consistent, but can it be repaired?
kaha.setEnableJournalDiskSyncs(false);
// Using a bigger journal file size makes he take fewer spikes as it is not switching files as often.
kaha.setJournalMaxFileLength(1024*1024*100);
// small batch means more frequent and smaller writes
kaha.setIndexWriteBatchSize(100);
// do the index write in a separate thread
kaha.setEnableIndexWriteAsync(true);
broker.setPersistenceAdapter(kaha);
}
broker.addConnector("tcp://localhost:0").setName("Default");
broker.start();
LOG.info("Starting broker..");
}
}