blob: f7edebde9882537491eb34056c8a4f12d9dd4f16 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.StoreUsage;
import org.apache.activemq.usage.SystemUsage;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
public class DuplicateFromStoreTest {
static Logger LOG = LoggerFactory.getLogger(DuplicateFromStoreTest.class);
String activemqURL;
BrokerService broker;
protected final static String DESTNAME = "TEST";
protected final static int NUM_PRODUCERS = 100;
protected final static int NUM_CONSUMERS = 20;
protected final static int NUM_MSGS = 20000;
protected final static int CONSUMER_SLEEP = 0;
protected final static int PRODUCER_SLEEP = 10;
public static CountDownLatch producersFinished = new CountDownLatch(NUM_PRODUCERS);
public static CountDownLatch consumersFinished = new CountDownLatch(NUM_CONSUMERS );
public AtomicInteger totalMessagesToSend = new AtomicInteger(NUM_MSGS);
public AtomicInteger totalMessagesSent = new AtomicInteger(NUM_MSGS);
public AtomicInteger totalReceived = new AtomicInteger(0);
public int messageSize = 16*1000;
@Before
public void startBroker() throws Exception {
broker = new BrokerService();
broker.setDeleteAllMessagesOnStartup(true);
broker.addConnector("tcp://0.0.0.0:0");
// Create <policyEntry>
PolicyEntry policy = new PolicyEntry();
ActiveMQDestination dest = new ActiveMQQueue(">");
policy.setDestination(dest);
policy.setMemoryLimit(10 * 1024 * 1024); // 10 MB
policy.setExpireMessagesPeriod(0);
policy.setEnableAudit(false); // allow any duplicates from the store to bubble up to the q impl
policy.setQueuePrefetch(100);
PolicyMap policies = new PolicyMap();
policies.put(dest, policy);
broker.setDestinationPolicy(policies);
// configure <systemUsage>
MemoryUsage memoryUsage = new MemoryUsage();
memoryUsage.setPercentOfJvmHeap(50);
StoreUsage storeUsage = new StoreUsage();
storeUsage.setLimit(8 * 1024 * 1024 * 1024); // 8 gb
SystemUsage memoryManager = new SystemUsage();
memoryManager.setMemoryUsage(memoryUsage);
memoryManager.setStoreUsage(storeUsage);
broker.setSystemUsage(memoryManager);
// configure KahaDB persistence
PersistenceAdapter kahadb = new KahaDBStore();
((KahaDBStore) kahadb).setConcurrentStoreAndDispatchQueues(true);
broker.setPersistenceAdapter(kahadb);
// start broker
broker.start();
broker.waitUntilStarted();
activemqURL = broker.getTransportConnectorByScheme("tcp").getPublishableConnectString();
}
@After
public void stopBroker() throws Exception {
if (broker != null) {
broker.stop();
}
}
@Test
public void testDuplicateMessage() throws Exception {
LOG.info("Testing for duplicate messages.");
//create producer and consumer threads
ExecutorService producers = Executors.newFixedThreadPool(NUM_PRODUCERS);
ExecutorService consumers = Executors.newFixedThreadPool(NUM_CONSUMERS);
createOpenwireClients(producers, consumers);
LOG.info("All producers and consumers got started. Awaiting their termination");
producersFinished.await(100, TimeUnit.MINUTES);
LOG.info("All producers have terminated. remaining to send: " + totalMessagesToSend.get() + ", sent:" + totalMessagesSent.get());
consumersFinished.await(100, TimeUnit.MINUTES);
LOG.info("All consumers have terminated.");
producers.shutdownNow();
consumers.shutdownNow();
assertEquals("no messages pending, i.e. dlq empty", 0l, ((RegionBroker)broker.getRegionBroker()).getDestinationStatistics().getMessages().getCount());
// validate cache can be enabled if disabled
}
protected void createOpenwireClients(ExecutorService producers, ExecutorService consumers) {
for (int i = 0; i < NUM_CONSUMERS; i++) {
LOG.trace("Creating consumer for destination " + DESTNAME);
Consumer consumer = new Consumer(DESTNAME, false);
consumers.submit(consumer);
// wait for consumer to signal it has fully initialized
synchronized(consumer.init) {
try {
consumer.init.wait();
} catch (InterruptedException e) {
LOG.error(e.toString(), e);
}
}
}
for (int i = 0; i < NUM_PRODUCERS; i++) {
LOG.trace("Creating producer for destination " + DESTNAME );
Producer producer = new Producer(DESTNAME, false, 0);
producers.submit(producer);
}
}
class Producer implements Runnable {
Logger log = LOG;
protected String destName = "TEST";
protected boolean isTopicDest = false;
public Producer(String dest, boolean isTopic, int ttl) {
this.destName = dest;
this.isTopicDest = isTopic;
}
/**
* Connect to broker and constantly send messages
*/
public void run() {
Connection connection = null;
Session session = null;
MessageProducer producer = null;
try {
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(activemqURL);
connection = amq.createConnection();
connection.setExceptionListener(new javax.jms.ExceptionListener() {
public void onException(javax.jms.JMSException e) {
e.printStackTrace();
}
});
connection.start();
// Create a Session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination;
if (isTopicDest) {
// Create the destination (Topic or Queue)
destination = session.createTopic(destName);
} else {
destination = session.createQueue(destName);
}
// Create a MessageProducer from the Session to the Topic or Queue
producer = session.createProducer(destination);
// Create message
long counter = 0;
//enlarge msg to 16 kb
int msgSize = 16 * 1024;
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.setLength(msgSize + 15);
stringBuilder.append("Message: ");
stringBuilder.append(counter);
for (int j = 0; j < (msgSize / 10); j++) {
stringBuilder.append("XXXXXXXXXX");
}
String text = stringBuilder.toString();
TextMessage message = session.createTextMessage(text);
// send message
while (totalMessagesToSend.decrementAndGet() >= 0) {
producer.send(message);
totalMessagesSent.incrementAndGet();
log.debug("Sent message: " + counter);
counter++;
if ((counter % 10000) == 0)
log.info("sent " + counter + " messages");
Thread.sleep(PRODUCER_SLEEP);
}
} catch (Exception ex) {
log.error(ex.toString());
return;
} finally {
try {
if (connection != null) {
connection.close();
}
} catch (Exception ignored) {
} finally {
producersFinished.countDown();
}
}
log.debug("Closing producer for " + destName);
}
}
class Consumer implements Runnable {
public Object init = new Object();
protected String queueName = "TEST";
boolean isTopic = false;
Logger log = LOG;
public Consumer(String destName, boolean topic) {
this.isTopic = topic;
this.queueName = destName;
}
/**
* connect to broker and receive messages
*/
public void run() {
Connection connection = null;
Session session = null;
MessageConsumer consumer = null;
try {
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(activemqURL);
connection = amq.createConnection();
connection.setExceptionListener(new javax.jms.ExceptionListener() {
public void onException(javax.jms.JMSException e) {
e.printStackTrace();
}
});
connection.start();
// Create a Session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (Topic or Queue)
Destination destination = null;
if (isTopic)
destination = session.createTopic(queueName);
else
destination = session.createQueue(queueName);
//Create a MessageConsumer from the Session to the Topic or Queue
consumer = session.createConsumer(destination);
synchronized (init) {
init.notifyAll();
}
// Wait for a message
long counter = 0;
while (totalReceived.get() < NUM_MSGS) {
Message message2 = consumer.receive(5000);
if (message2 instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message2;
String text = textMessage.getText();
log.debug("Received: " + text.substring(0, 50));
} else if (totalReceived.get() < NUM_MSGS) {
log.error("Received message of unsupported type. Expecting TextMessage. count: " + totalReceived.get());
} else {
// all done
break;
}
if (message2 != null) {
counter++;
totalReceived.incrementAndGet();
if ((counter % 10000) == 0)
log.info("received " + counter + " messages");
Thread.sleep(CONSUMER_SLEEP);
}
}
} catch (Exception e) {
log.error("Error in Consumer: " + e.getMessage());
return;
} finally {
try {
if (connection != null) {
connection.close();
}
} catch (Exception ignored) {
} finally {
consumersFinished.countDown();
}
}
}
}
}