blob: 5d43f66510515fd45d7905f2bcae9f202a76c13c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import junit.framework.TestCase;
/*
* Test plan:
* Producer: publish messages into a queue, with three message groups
* Consumer1: created before any messages are created
* Consumer2: created after consumer1 has processed one message from each message group
*
* All three groups are handled by consumer1, so consumer2 should not get any messages.
* See bug AMQ-2016: Message grouping fails when consumers are added
*/
/**
 * Verifies that message groups already being consumed by one consumer are not
 * handed to a consumer that subscribes later.
 *
 * A producer sends ten sequences of three grouped messages. Consumer 1 is
 * subscribed from the start; consumer 2 only subscribes once consumer 1 has
 * committed one message per group. The test passes when consumer 1 receives
 * every message and consumer 2 receives none.
 *
 * All latch waits are bounded so that a failure in one worker thread surfaces
 * as a test failure instead of blocking the remaining threads indefinitely.
 */
public class MessageGroupNewConsumerTest extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(MessageGroupNewConsumerTest.class);
    private static final String[] groupNames = { "GrA", "GrB", "GrC" };
    // with the prefetch too high, this bug is not realized
    private static final String connStr = "vm://localhost?broker.persistent=false&broker.useJmx=false&jms.prefetchPolicy.all=1";
    // Upper bound for every latch wait; generous compared with the sub-second sleeps used by the workers
    private static final long LATCH_TIMEOUT_SECONDS = 30;

    private Connection connection;
    // Released once the producer has committed the first sequence (one message per group)
    private final CountDownLatch latchMessagesCreated = new CountDownLatch(1);
    // Released once consumer 1 has committed as many messages as there are groups
    private final CountDownLatch latchGroupsAcquired = new CountDownLatch(1);
    // Each counter is written by exactly one worker thread and read by the test thread only after join()
    private int messagesSent, messagesRecvd1, messagesRecvd2;
    // First exception raised by any worker thread, or null when all of them completed normally
    private volatile Throwable workerFailure;

    /**
     * Runs the producer and both consumers on their own threads over a single
     * shared connection, waits for them to finish and then checks the counters.
     *
     * @throws JMSException if the connection cannot be created, started or closed
     * @throws InterruptedException if the test thread is interrupted while joining the workers
     */
    public void testNewConsumer() throws JMSException, InterruptedException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connStr);
        connection = factory.createConnection();
        final String queueName = this.getClass().getSimpleName();
        try {
            connection.start();

            final Thread producerThread = new Thread("producer") {
                @Override
                public void run() {
                    try {
                        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
                        Queue queue = session.createQueue(queueName);
                        MessageProducer prod = session.createProducer(queue);
                        for (int i = 0; i < 10; i++) {
                            for (String group : groupNames) {
                                Message message = generateMessage(session, group, i + 1);
                                prod.send(message);
                                session.commit();
                                messagesSent++;
                            }
                            LOG.info("Sent message seq {}", i + 1);
                            if (i == 0) {
                                // the first full sequence is on the queue: let consumer 1 start receiving
                                latchMessagesCreated.countDown();
                            }
                            if (i == 2) {
                                // hold back the remaining sequences until consumer 1 owns all groups
                                LOG.info("Prod: Waiting for groups");
                                awaitOrFail(latchGroupsAcquired, "consumer 1 to acquire all groups");
                            }
                            Thread.sleep(20);
                        }
                        LOG.info("{} messages sent", messagesSent);
                        prod.close();
                        session.close();
                    } catch (Exception e) {
                        recordFailure("Producer failed", e);
                    }
                }
            };

            final Thread consumerThread1 = new Thread("consumer-1") {
                @Override
                public void run() {
                    try {
                        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
                        Queue queue = session.createQueue(queueName);
                        MessageConsumer con1 = session.createConsumer(queue);
                        awaitOrFail(latchMessagesCreated, "the producer to send the first sequence");
                        while (true) {
                            Message message = con1.receive(1000);
                            if (message == null) {
                                // nothing arrived within a second: treat the queue as drained
                                break;
                            }
                            LOG.info("Con1 got message {}", formatMessage(message));
                            session.commit();
                            messagesRecvd1++;
                            // since we get the messages in order, the first few messages will be one from each group
                            // after we get one from each group, start the other consumer
                            if (messagesRecvd1 == groupNames.length) {
                                LOG.info("All groups acquired");
                                latchGroupsAcquired.countDown();
                                // pause before continuing; presumably gives consumer 2 time to subscribe
                                Thread.sleep(1000);
                            }
                            Thread.sleep(50);
                        }
                        LOG.info("{} messages received by consumer1", messagesRecvd1);
                        con1.close();
                        session.close();
                    } catch (Exception e) {
                        recordFailure("Consumer 1 failed", e);
                    }
                }
            };

            final Thread consumerThread2 = new Thread("consumer-2") {
                @Override
                public void run() {
                    try {
                        awaitOrFail(latchGroupsAcquired, "consumer 1 to acquire all groups");
                        // keep re-subscribing for as long as consumer 1 is still running
                        while (consumerThread1.isAlive()) {
                            LOG.info("(re)starting consumer2");
                            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
                            Queue queue = session.createQueue(queueName);
                            MessageConsumer con2 = session.createConsumer(queue);
                            while (true) {
                                Message message = con2.receive(500);
                                if (message == null) {
                                    break;
                                }
                                LOG.info("Con2 got message {}", formatMessage(message));
                                session.commit();
                                messagesRecvd2++;
                                Thread.sleep(50);
                            }
                            con2.close();
                            session.close();
                        }
                        LOG.info("{} messages received by consumer2", messagesRecvd2);
                    } catch (Exception e) {
                        recordFailure("Consumer 2 failed", e);
                    }
                }
            };

            consumerThread2.start();
            consumerThread1.start();
            producerThread.start();

            // wait for threads to finish
            producerThread.join();
            consumerThread1.join();
            consumerThread2.join();
        } finally {
            // always release the connection, even when start() or a join() throws
            connection.close();
        }

        // check results
        if (workerFailure != null) {
            fail("worker thread failed: " + workerFailure);
        }
        assertEquals("consumer 2 should not get any messages", 0, messagesRecvd2);
        assertEquals("consumer 1 should get all the messages", messagesSent, messagesRecvd1);
        assertTrue("producer failed to send any messages", messagesSent > 0);
    }

    /**
     * Waits for the latch for at most {@link #LATCH_TIMEOUT_SECONDS} seconds.
     *
     * @param latch the latch to wait on
     * @param what description of the awaited event, used in the timeout message
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws IllegalStateException if the latch is not released within the timeout
     */
    private static void awaitOrFail(CountDownLatch latch, String what) throws InterruptedException {
        if (!latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            throw new IllegalStateException(
                    "Timed out after " + LATCH_TIMEOUT_SECONDS + "s waiting for " + what);
        }
    }

    /**
     * Logs a worker-thread failure and remembers the first one so the test
     * thread can report it after joining the workers.
     *
     * @param description log message identifying the failed worker
     * @param cause the exception that terminated the worker
     */
    private synchronized void recordFailure(String description, Throwable cause) {
        LOG.error(description, cause);
        if (cause instanceof InterruptedException) {
            // restore the interrupt status cleared when the exception was thrown
            Thread.currentThread().interrupt();
        }
        if (workerFailure == null) {
            workerFailure = cause;
        }
    }

    /**
     * Creates a text message that belongs to the given message group.
     *
     * @param session session used to create the message
     * @param groupId value stored in the JMSXGroupID property
     * @param seq value stored in the JMSXGroupSeq property
     * @return a TextMessage of JMS type TEST_MESSAGE with a fixed XML body
     * @throws JMSException if the message cannot be created or populated
     */
    public Message generateMessage(Session session, String groupId, int seq) throws JMSException {
        TextMessage m = session.createTextMessage();
        m.setJMSType("TEST_MESSAGE");
        m.setStringProperty("JMSXGroupID", groupId);
        m.setIntProperty("JMSXGroupSeq", seq);
        m.setText("<?xml?><testMessage/>");
        return m;
    }

    /**
     * Renders the grouping properties of a message for logging.
     *
     * @param m the message to describe
     * @return the JMSXGroupID, JMSXGroupSeq and JMSXGroupFirstForConsumer property
     *         values joined with '-', or the exception's simple class name and
     *         message if reading a property fails
     */
    public String formatMessage(Message m) {
        try {
            return m.getStringProperty("JMSXGroupID") + "-" + m.getIntProperty("JMSXGroupSeq") + "-" + m.getBooleanProperty("JMSXGroupFirstForConsumer");
        } catch (Exception e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }
}