blob: 4e008de4a79122566246f630461f9f9d83129a69 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
import javax.jms.Queue;
import java.util.*;
import java.util.concurrent.CountDownLatch;
/*
* Test plan:
* Producer: publish messages into a queue, with 10 message groups, closing the group with seq=-1 on message 5 and message 10
* Consumers: 2 consumers created after all messages are sent
*
* Expected: for each group, messages 1-5 are handled by one consumer and messages 6-10 are handled by the other consumer. Messages
* 1 and 6 have the JMSXGroupFirstForConsumer property set to true.
*/
public class MessageGroupCloseTest extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(MessageGroupNewConsumerTest.class);
private Connection connection;
// Released after all messages are created
private CountDownLatch latchMessagesCreated = new CountDownLatch(1);
private int messagesSent, messagesRecvd1, messagesRecvd2, messageGroupCount, errorCountFirstForConsumer, errorCountWrongConsumerClose, errorCountDuplicateClose;
// groupID, count
private HashMap<String, Integer> messageGroups1 = new HashMap<String, Integer>();
private HashMap<String, Integer> messageGroups2 = new HashMap<String, Integer>();
private HashSet<String> closedGroups1 = new HashSet<String>();
private HashSet<String> closedGroups2 = new HashSet<String>();
// with the prefetch too high, this bug is not realized
private static final String connStr =
//"tcp://localhost:61616";
"vm://localhost?broker.persistent=false&broker.useJmx=false&jms.prefetchPolicy.all=1";
public void testNewConsumer() throws JMSException, InterruptedException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connStr);
connection = factory.createConnection();
connection.start();
final String queueName = this.getClass().getSimpleName();
final Thread producerThread = new Thread() {
public void run() {
try {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageProducer prod = session.createProducer(queue);
for (int i=0; i<10; i++) {
for (int j=0; j<10; j++) {
int seq = j + 1;
if ((j+1) % 5 == 0) {
seq = -1;
}
Message message = generateMessage(session, Integer.toString(i), seq);
prod.send(message);
session.commit();
messagesSent++;
LOG.info("Sent message: group=" + i + ", seq="+ seq);
//Thread.sleep(20);
}
if (i % 100 == 0) {
LOG.info("Sent messages: group=" + i);
}
messageGroupCount++;
}
LOG.info(messagesSent+" messages sent");
latchMessagesCreated.countDown();
prod.close();
session.close();
} catch (Exception e) {
LOG.error("Producer failed", e);
}
}
};
final Thread consumerThread1 = new Thread() {
public void run() {
try {
latchMessagesCreated.await();
LOG.info("starting consumer1");
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageConsumer con1 = session.createConsumer(queue);
while(true) {
Message message = con1.receive(5000);
if (message == null) break;
LOG.info("Con1: got message "+formatMessage(message));
checkMessage(message, "Con1", messageGroups1, closedGroups1);
session.commit();
messagesRecvd1++;
if (messagesRecvd1 % 100 == 0) {
LOG.info("Con1: got messages count=" + messagesRecvd1);
}
//Thread.sleep(50);
}
LOG.info("Con1: total messages=" + messagesRecvd1);
LOG.info("Con1: total message groups=" + messageGroups1.size());
con1.close();
session.close();
} catch (Exception e) {
LOG.error("Consumer 1 failed", e);
}
}
};
final Thread consumerThread2 = new Thread() {
public void run() {
try {
latchMessagesCreated.await();
LOG.info("starting consumer2");
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(queueName);
MessageConsumer con2 = session.createConsumer(queue);
while(true) {
Message message = con2.receive(5000);
if (message == null) { break; }
LOG.info("Con2: got message "+formatMessage(message));
checkMessage(message, "Con2", messageGroups2, closedGroups2);
session.commit();
messagesRecvd2++;
if (messagesRecvd2 % 100 == 0) {
LOG.info("Con2: got messages count=" + messagesRecvd2);
}
//Thread.sleep(50);
}
con2.close();
session.close();
LOG.info("Con2: total messages=" + messagesRecvd2);
LOG.info("Con2: total message groups=" + messageGroups2.size());
} catch (Exception e) {
LOG.error("Consumer 2 failed", e);
}
}
};
consumerThread2.start();
consumerThread1.start();
producerThread.start();
// wait for threads to finish
producerThread.join();
consumerThread1.join();
consumerThread2.join();
connection.close();
// check results
assertEquals("consumers should get all the messages", messagesSent, messagesRecvd1 + messagesRecvd2);
assertEquals("not all message groups closed for consumer 1", messageGroups1.size(), closedGroups1.size());
assertEquals("not all message groups closed for consumer 2", messageGroups2.size(), closedGroups2.size());
assertTrue("producer failed to send any messages", messagesSent > 0);
assertEquals("JMSXGroupFirstForConsumer not set", 0, errorCountFirstForConsumer);
assertEquals("wrong consumer got close message", 0, errorCountWrongConsumerClose);
assertEquals("consumer got duplicate close message", 0, errorCountDuplicateClose);
}
public Message generateMessage(Session session, String groupId, int seq) throws JMSException {
TextMessage m = session.createTextMessage();
m.setJMSType("TEST_MESSAGE");
m.setStringProperty("JMSXGroupID", groupId);
m.setIntProperty("JMSXGroupSeq", seq);
m.setText("<?xml?><testMessage/>");
return m;
}
public String formatMessage(Message m) {
try {
return "group="+m.getStringProperty("JMSXGroupID")+", seq="+m.getIntProperty("JMSXGroupSeq");
} catch (Exception e) {
return e.getClass().getSimpleName()+": "+e.getMessage();
}
}
public void checkMessage(Message m, String consumerId, Map<String, Integer> messageGroups, Set<String> closedGroups) throws JMSException {
String groupId = m.getStringProperty("JMSXGroupID");
int seq = m.getIntProperty("JMSXGroupSeq");
Integer count = messageGroups.get(groupId);
if (count == null) {
// first time seeing this group
if (!m.propertyExists("JMSXGroupFirstForConsumer") ||
!m.getBooleanProperty("JMSXGroupFirstForConsumer")) {
LOG.info(consumerId + ": JMSXGroupFirstForConsumer not set for group=" + groupId + ", seq=" +seq);
errorCountFirstForConsumer++;
}
if (seq == -1) {
closedGroups.add(groupId);
LOG.info(consumerId + ": wrong consumer got close message for group=" + groupId);
errorCountWrongConsumerClose++;
}
messageGroups.put(groupId, 1);
} else {
// existing group
if (closedGroups.contains(groupId)) {
// group reassigned to same consumer
closedGroups.remove(groupId);
if (!m.propertyExists("JMSXGroupFirstForConsumer") ||
!m.getBooleanProperty("JMSXGroupFirstForConsumer")) {
LOG.info(consumerId + ": JMSXGroupFirstForConsumer not set for group=" + groupId + ", seq=" +seq);
errorCountFirstForConsumer++;
}
if (seq == -1) {
LOG.info(consumerId + ": consumer got duplicate close message for group=" + groupId);
errorCountDuplicateClose++;
}
}
if (seq == -1) {
closedGroups.add(groupId);
}
messageGroups.put(groupId, count + 1);
}
}
}