blob: 774422f6b32a3042c14970d2814219d204471944 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.lang.StringUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import java.io.File;
import java.io.FilenameFilter;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
public class AMQ7118Test {
protected static final Logger LOG = LoggerFactory.getLogger(AMQ7118Test.class);
protected static Random r = new Random();
final static String WIRE_LEVEL_ENDPOINT = "tcp://localhost:61616";
protected BrokerService broker;
protected Connection producerConnection;
protected Session pSession;
protected Connection cConnection;
protected Session cSession;
private final String xbean = "xbean:";
private final String confBase = "src/test/resources/org/apache/activemq/bugs/amq7118";
int checkpointIndex = 0;
private static final ActiveMQConnectionFactory ACTIVE_MQ_CONNECTION_FACTORY = new ActiveMQConnectionFactory(WIRE_LEVEL_ENDPOINT);
@Before
public void setup() throws Exception {
deleteData(new File("target/data"));
createBroker();
ACTIVE_MQ_CONNECTION_FACTORY.setConnectionIDPrefix("bla");
}
@After
public void shutdown() throws Exception {
broker.stop();
}
public void setupProducerConnection() throws Exception {
producerConnection = ACTIVE_MQ_CONNECTION_FACTORY.createConnection();
producerConnection.start();
pSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void setupConsumerConnection() throws Exception {
cConnection = ACTIVE_MQ_CONNECTION_FACTORY.createConnection();
cConnection.setClientID("myClient1");
cConnection.start();
cSession = cConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
private void createBroker() throws Exception {
broker = new BrokerService();
broker = BrokerFactory.createBroker(xbean + confBase + "/activemq.xml");
broker.start();
}
@Test
public void testCompaction() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
setupProducerConnection();
setupConsumerConnection();
Topic topic = pSession.createTopic("test");
MessageConsumer consumer = cSession.createDurableSubscriber(topic, "clientId1");
LOG.info("Produce message to test topic");
produce(pSession, topic, 1, 512 ); // just one message
LOG.info("Consume message from test topic");
Message msg = consumer.receive(5000);
assertNotNull(msg);
LOG.info("Produce more messages to test topic and get into PFC");
boolean sent = produce(cSession, topic, 20, 512 * 1024); // Fill the store
assertFalse("Never got to PFC condition", sent);
LOG.info("PFC hit");
//We hit PFC, so shut down the producer
producerConnection.close();
//Lets check the db-*.log file count before checkpointUpdate
checkFiles(false, 21, "db-21.log");
// Force checkFiles update
checkFiles(true, 23, "db-23.log");
//The ackMessageFileMap should be clean, so no more writing
checkFiles(true, 23, "db-23.log");
//One more time just to be sure - The ackMessageFileMap should be clean, so no more writing
checkFiles(true, 23, "db-23.log");
//Read out the rest of the messages
LOG.info("Consuming the rest of the files...");
for (int i = 0; i < 20; i++) {
msg = consumer.receive(5000);
}
LOG.info("All messages Consumed.");
//Clean up the log files and be sure its stable
checkFiles(true, 2, "db-30.log");
checkFiles(true, 3, "db-31.log");
checkFiles(true, 2, "db-31.log");
checkFiles(true, 2, "db-31.log");
checkFiles(true, 2, "db-31.log");
broker.stop();
broker.waitUntilStopped();
}
protected static boolean produce(Session session, Topic topic, int messageCount, int messageSize) throws JMSException {
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < messageCount; i++) {
TextMessage helloMessage = session.createTextMessage(StringUtils.repeat("a", messageSize));
try {
producer.send(helloMessage);
} catch (ResourceAllocationException e){
return false;
}
}
return true;
}
private void deleteData(File file) {
String[] entries = file.list();
if (entries == null) return;
for (String s : entries) {
File currentFile = new File(file.getPath(), s);
if (currentFile.isDirectory()) {
deleteData(currentFile);
}
currentFile.delete();
}
file.delete();
}
private void checkFiles(boolean doCheckpoint, int expectedCount, String lastFileName) throws Exception {
File dbfiles = new File("target/data/kahadb");
FilenameFilter lff = new FilenameFilter(){
@Override
public boolean accept(File dir, String name) {
return name.toLowerCase().startsWith("db-") && name.toLowerCase().endsWith("log");
}
};
if(doCheckpoint) {
LOG.info("Initiating checkpointUpdate "+ ++checkpointIndex + " ...");
broker.getPersistenceAdapter().checkpoint(true);
TimeUnit.SECONDS.sleep(2);
LOG.info("Checkpoint complete.");
}
File files[] = dbfiles.listFiles(lff);
Arrays.sort(files, new DBFileComparator() );
logfiles(files);
assertEquals(expectedCount, files.length);
assertEquals(lastFileName, files[files.length-1].getName());
}
private void logfiles(File[] files){
LOG.info("Files found in KahaDB:");
for (File file : files) {
LOG.info(" " + file.getName());
}
}
private class DBFileComparator implements Comparator<File> {
@Override
public int compare(File o1, File o2) {
int n1 = extractNumber(o1.getName());
int n2 = extractNumber(o2.getName());
return n1 - n2;
}
private int extractNumber(String name) {
int i = 0;
try {
int s = name.indexOf('-')+1;
int e = name.lastIndexOf('.');
String number = name.substring(s, e);
i = Integer.parseInt(number);
} catch(Exception e) {
i = 0; // if filename does not match the format
// then default to 0
}
return i;
}
}
}