blob: cf7bff659dba11107ae09a7b450d4c8da96d1786 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.systests.jms_1_1.messagegroup;
import static javax.jms.Session.AUTO_ACKNOWLEDGE;
import static javax.jms.Session.CLIENT_ACKNOWLEDGE;
import static javax.jms.Session.SESSION_TRANSACTED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.queue.MessageGroupType;
import org.apache.qpid.systests.JmsTestBase;
public class MessageGroupTest extends JmsTestBase
{
private static final Logger LOGGER = LoggerFactory.getLogger(MessageGroupTest.class);
@Test
public void simpleGroupAssignment() throws Exception
{
simpleGroupAssignment(false, false);
}
@Test
public void sharedGroupSimpleGroupAssignment() throws Exception
{
simpleGroupAssignment(true, false);
}
@Test
public void simpleGroupAssignmentWithJMSXGroupID() throws Exception
{
simpleGroupAssignment(false, true);
}
@Test
public void sharedGroupSimpleGroupAssignmentWithJMSXGroupID() throws Exception
{
simpleGroupAssignment(true, true);
}
/**
* Pre populate the queue with messages with groups as follows
*
* ONE
* TWO
* ONE
* TWO
*
* Create two consumers with prefetch of 1, the first consumer should then be assigned group ONE, the second
* consumer assigned group TWO if they are started in sequence.
*
* Thus doing
*
* c1 <--- (ONE)
* c2 <--- (TWO)
* c2 ack --->
*
* c2 should now be able to receive a second message from group TWO (skipping over the message from group ONE)
*
* i.e.
*
* c2 <--- (TWO)
* c2 ack --->
* c1 <--- (ONE)
* c1 ack --->
*
*/
private void simpleGroupAssignment(boolean sharedGroups, final boolean useDefaultGroup) throws Exception
{
final String groupKey = getGroupKey(useDefaultGroup);
String queueName = getTestName();
Destination queue = createQueue(queueName, sharedGroups, useDefaultGroup);
final Connection producerConnection = getConnection();
try
{
producerConnection.start();
final Session producerSession = producerConnection.createSession(true, SESSION_TRANSACTED);
final MessageProducer producer = producerSession.createProducer(queue);
String[] groups = { "ONE", "TWO"};
for (int msg = 0; msg < 4; msg++)
{
producer.send(createMessage(producerSession, msg, groups[msg % groups.length], useDefaultGroup));
}
producerSession.commit();
}
finally
{
producerConnection.close();
}
final Connection consumerConnection = getConnectionBuilder().setPrefetch(0).build();
try
{
Session cs1 = consumerConnection.createSession(false, CLIENT_ACKNOWLEDGE);
Session cs2 = consumerConnection.createSession(false, CLIENT_ACKNOWLEDGE);
MessageConsumer consumer1 = cs1.createConsumer(queue);
MessageConsumer consumer2 = cs2.createConsumer(queue);
consumerConnection.start();
Message cs1Received = consumer1.receive(getReceiveTimeout());
assertNotNull("Consumer 1 should have received first message", cs1Received);
Message cs2Received = consumer2.receive(getReceiveTimeout());
assertNotNull("Consumer 2 should have received first message", cs2Received);
cs2Received.acknowledge();
Message cs2Received2 = consumer2.receive(getReceiveTimeout());
assertNotNull("Consumer 2 should have received second message", cs2Received2);
assertEquals("Differing groups", cs2Received2.getStringProperty(groupKey),
cs2Received.getStringProperty(groupKey));
cs1Received.acknowledge();
Message cs1Received2 = consumer1.receive(getReceiveTimeout());
assertNotNull("Consumer 1 should have received second message", cs1Received2);
assertEquals("Differing groups", cs1Received2.getStringProperty(groupKey),
cs1Received.getStringProperty(groupKey));
cs1Received2.acknowledge();
cs2Received2.acknowledge();
assertNull(consumer1.receive(getReceiveTimeout()));
assertNull(consumer2.receive(getReceiveTimeout()));
}
finally
{
consumerConnection.close();
}
}
@Test
public void consumerCloseGroupAssignment() throws Exception
{
consumerCloseGroupAssignment(false, false);
}
@Test
public void sharedGroupConsumerCloseGroupAssignment() throws Exception
{
consumerCloseGroupAssignment(true, false);
}
/**
*
* Tests that upon closing a consumer, groups previously assigned to that consumer are reassigned to a different
* consumer.
*
* Pre-populate the queue as ONE, ONE, TWO, ONE
*
* create in sequence two consumers
*
* receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2)
*
* Then close c1 before acking.
*
* If we now attempt to receive from c2, then the remaining messages in group ONE should be available (which
* requires c2 to go "backwards" in the queue).
*
**/
private void consumerCloseGroupAssignment(boolean sharedGroups, final boolean useDefaultGroup) throws Exception
{
final String groupKey = getGroupKey(useDefaultGroup);
String queueName = getTestName();
Destination queue = createQueue(queueName, sharedGroups, false);
final Connection producerConnection = getConnection();
try
{
final Session producerSession = producerConnection.createSession(true, SESSION_TRANSACTED);
final MessageProducer producer = producerSession.createProducer(queue);
producer.send(createMessage(producerSession, 1, "ONE", useDefaultGroup));
producer.send(createMessage(producerSession, 2, "ONE", useDefaultGroup));
producer.send(createMessage(producerSession, 3, "TWO", useDefaultGroup));
producer.send(createMessage(producerSession, 4, "ONE", useDefaultGroup));
producerSession.commit();
}
finally
{
producerConnection.close();
}
final Connection consumerConnection = getConnectionBuilder().setPrefetch(0).build();
try
{
consumerConnection.start();
Session cs1 = consumerConnection.createSession(true, SESSION_TRANSACTED);
Session cs2 = consumerConnection.createSession(true, SESSION_TRANSACTED);
MessageConsumer consumer1 = cs1.createConsumer(queue);
MessageConsumer consumer2 = cs2.createConsumer(queue);
Message cs1Received = consumer1.receive(getReceiveTimeout());
assertNotNull("Consumer 1 should have received first message", cs1Received);
assertEquals("incorrect message received", 1, cs1Received.getIntProperty("msg"));
Message cs2Received = consumer2.receive(getReceiveTimeout());
assertNotNull("Consumer 2 should have received first message", cs2Received);
assertEquals("incorrect message received", 3, cs2Received.getIntProperty("msg"));
cs2.commit();
Message cs2Received2 = consumer2.receive(getReceiveTimeout());
assertNull("Consumer 2 should not yet have received a second message", cs2Received2);
consumer1.close();
cs1.commit();
Message cs2Received3 = consumer2.receive(getReceiveTimeout());
assertNotNull("Consumer 2 should have received second message", cs2Received3);
assertEquals("Unexpected group",
"ONE",
cs2Received3.getStringProperty(groupKey));
assertEquals("incorrect message received", 2, cs2Received3.getIntProperty("msg"));
cs2.commit();
Message cs2Received4 = consumer2.receive(getReceiveTimeout());
assertNotNull("Consumer 2 should have received third message", cs2Received4);
assertEquals("Unexpected group",
"ONE",
cs2Received4.getStringProperty(groupKey));
assertEquals("incorrect message received", 4, cs2Received4.getIntProperty("msg"));
cs2.commit();
assertNull(consumer2.receive(getReceiveTimeout()));
}
finally
{
consumerConnection.close();
}
}
@Test
public void consumerCloseWithRelease() throws Exception
{
consumerCloseWithRelease(false, false);
}
@Test
public void sharedGroupConsumerCloseWithRelease() throws Exception
{
consumerCloseWithRelease(true, false);
}
/**
*
* Tests that upon closing a consumer and its session, groups previously assigned to that consumer are reassigned
* to a different consumer, including messages which were previously delivered but have now been released.
*
* Pre-populate the queue as ONE, ONE, TWO, ONE
*
* create in sequence two consumers
*
* receive first from c1 then c2 (thus ONE is assigned to c1, TWO to c2)
*
* Then close c1 and its session without acking.
*
* If we now attempt to receive from c2, then the all messages in group ONE should be available (which
* requires c2 to go "backwards" in the queue). The first such message should be marked as redelivered
*
*/
private void consumerCloseWithRelease(boolean sharedGroups, final boolean useDefaultGroup) throws Exception
{
String queueName = getTestName();
final String groupKey = getGroupKey(useDefaultGroup);
Destination queue = createQueue(queueName, sharedGroups, false);
final Connection producerConnection = getConnection();
try
{
producerConnection.start();
final Session producerSession = producerConnection.createSession(true, SESSION_TRANSACTED);
final MessageProducer producer = producerSession.createProducer(queue);
producer.send(createMessage(producerSession, 1, "ONE", useDefaultGroup));
producer.send(createMessage(producerSession, 2, "ONE", useDefaultGroup));
producer.send(createMessage(producerSession, 3, "TWO", useDefaultGroup));
producer.send(createMessage(producerSession, 4, "ONE", useDefaultGroup));
producerSession.commit();
}
finally
{
producerConnection.close();
}
final Connection consumerConnection = getConnectionBuilder().setPrefetch(0).build();
try
{
consumerConnection.start();
Session cs1 = consumerConnection.createSession(true, SESSION_TRANSACTED);
Session cs2 = consumerConnection.createSession(true, SESSION_TRANSACTED);
MessageConsumer consumer1 = cs1.createConsumer(queue);
MessageConsumer consumer2 = cs2.createConsumer(queue);
Message cs1Received = consumer1.receive(getReceiveTimeout());
assertNotNull("Consumer 1 should have received its first message", cs1Received);
assertEquals("incorrect message received", 1, cs1Received.getIntProperty("msg"));
Message received = consumer2.receive(getReceiveTimeout());
assertNotNull("Consumer 2 should have received its first message", received);
assertEquals("incorrect message received", 3, received.getIntProperty("msg"));
Message received2 = consumer2.receive(getReceiveTimeout());
assertNull("Consumer 2 should not yet have received second message", received2);
consumer1.close();
cs1.close();
cs2.commit();
received = consumer2.receive(getReceiveTimeout());
assertNotNull("Consumer 2 should now have received second message", received);
assertEquals("Unexpected group",
"ONE",
received.getStringProperty(groupKey));
assertEquals("incorrect message received", 1, received.getIntProperty("msg"));
assertTrue("Expected second message to be marked as redelivered " + received.getIntProperty("msg"),
received.getJMSRedelivered());
cs2.commit();
received = consumer2.receive(getReceiveTimeout());
assertNotNull("Consumer 2 should have received a third message", received);
assertEquals("Unexpected group",
"ONE",
received.getStringProperty(groupKey));
assertEquals("incorrect message received", 2, received.getIntProperty("msg"));
cs2.commit();
received = consumer2.receive(getReceiveTimeout());
assertNotNull("Consumer 2 should have received a fourth message", received);
assertEquals("Unexpected group",
"ONE",
received.getStringProperty(groupKey));
assertEquals("incorrect message received", 4, received.getIntProperty("msg"));
cs2.commit();
assertNull(consumer2.receive(getReceiveTimeout()));
}
finally
{
consumerConnection.close();
}
}
@Test
public void groupAssignmentSurvivesEmpty() throws Exception
{
groupAssignmentOnEmpty(false, false);
}
@Test
public void sharedGroupAssignmentDoesNotSurviveEmpty() throws Exception
{
groupAssignmentOnEmpty(true, false);
}
private void groupAssignmentOnEmpty(boolean sharedGroups, final boolean useDefaultGroup) throws Exception
{
String queueName = getTestName();
Destination queue = createQueue(queueName, sharedGroups, false);
final Connection producerConnection = getConnection();
try
{
producerConnection.start();
final Session producerSession = producerConnection.createSession(true, SESSION_TRANSACTED);
final MessageProducer producer = producerSession.createProducer(queue);
producer.send(createMessage(producerSession, 1, "ONE", useDefaultGroup));
producer.send(createMessage(producerSession, 2, "TWO", useDefaultGroup));
producer.send(createMessage(producerSession, 3, "THREE", useDefaultGroup));
producer.send(createMessage(producerSession, 4, "ONE", useDefaultGroup));
producerSession.commit();
}
finally
{
producerConnection.close();
}
final Connection consumerConnection = getConnectionBuilder().setPrefetch(0).build();
try
{
consumerConnection.start();
Session cs1 = consumerConnection.createSession(true, SESSION_TRANSACTED);
Session cs2 = consumerConnection.createSession(true, SESSION_TRANSACTED);
MessageConsumer consumer1 = cs1.createConsumer(queue);
MessageConsumer consumer2 = cs2.createConsumer(queue);
Message cs1Received = consumer1.receive(getReceiveTimeout());
assertNotNull("Consumer 1 should have received its first message", cs1Received);
assertEquals("incorrect message received", 1, cs1Received.getIntProperty("msg"));
Message cs2Received = consumer2.receive(getReceiveTimeout());
assertNotNull("Consumer 2 should have received its first message", cs2Received);
assertEquals("incorrect message received", 2, cs2Received.getIntProperty("msg"));
cs1.commit();
cs1Received = consumer1.receive(getReceiveTimeout());
assertNotNull("Consumer 1 should have received its second message", cs1Received);
assertEquals("incorrect message received", 3, cs1Received.getIntProperty("msg"));
// We expect different behaviours from "shared groups": here the assignment of a subscription to a group
// is terminated when there are no outstanding delivered but unacknowledged messages. In contrast, with a
// standard message grouping queue the assignment will be retained until the subscription is no longer
// registered
if (sharedGroups)
{
cs2.commit();
cs2Received = consumer2.receive(getReceiveTimeout());
assertNotNull("Consumer 2 should have received its second message", cs2Received);
assertEquals("incorrect message received", 4, cs2Received.getIntProperty("msg"));
cs2.commit();
}
else
{
cs2.commit();
cs2Received = consumer2.receive(getReceiveTimeout());
assertNull("Consumer 2 should not have received a second message", cs2Received);
cs1.commit();
cs1Received = consumer1.receive(getReceiveTimeout());
assertNotNull("Consumer 1 should have received its third message", cs1Received);
assertEquals("incorrect message received", 4, cs1Received.getIntProperty("msg"));
}
}
finally
{
consumerConnection.close();
}
}
/**
* Tests that when a number of new messages for a given groupid are arriving while the delivery group
* state is also in the process of being emptied (due to acking a message while using prefetch=1), that only
* 1 of a number of existing consumers is ever receiving messages for the shared group at a time.
*/
@Test
public void singleSharedGroupWithMultipleConsumers() throws Exception
{
String queueName = getTestName();
final boolean useDefaultGroup = false;
Destination queue = createQueue(queueName, true, useDefaultGroup);
final Connection consumerConnection = getConnectionBuilder().setPrefetch(1).build();
try
{
int numMessages = 100;
SharedGroupTestMessageListener groupingTestMessageListener =
new SharedGroupTestMessageListener(numMessages);
Session cs1 = consumerConnection.createSession(false, AUTO_ACKNOWLEDGE);
Session cs2 = consumerConnection.createSession(false, AUTO_ACKNOWLEDGE);
Session cs3 = consumerConnection.createSession(false, AUTO_ACKNOWLEDGE);
Session cs4 = consumerConnection.createSession(false, AUTO_ACKNOWLEDGE);
MessageConsumer consumer1 = cs1.createConsumer(queue);
consumer1.setMessageListener(groupingTestMessageListener);
MessageConsumer consumer2 = cs2.createConsumer(queue);
consumer2.setMessageListener(groupingTestMessageListener);
MessageConsumer consumer3 = cs3.createConsumer(queue);
consumer3.setMessageListener(groupingTestMessageListener);
MessageConsumer consumer4 = cs4.createConsumer(queue);
consumer4.setMessageListener(groupingTestMessageListener);
consumerConnection.start();
final Connection producerConnection = getConnection();
try
{
final Session producerSession = producerConnection.createSession(true, SESSION_TRANSACTED);
final MessageProducer producer = producerSession.createProducer(queue);
for (int i = 1; i <= numMessages; i++)
{
producer.send(createMessage(producerSession, i, "GROUP", useDefaultGroup));
}
producerSession.commit();
}
finally
{
producerConnection.close();
}
assertTrue("Mesages not all received in the allowed timeframe",
groupingTestMessageListener.waitForLatch(30));
assertEquals("Unexpected concurrent processing of messages for the group",
0,
groupingTestMessageListener.getConcurrentProcessingCases());
assertNull("Unexpected throwable in message listeners", groupingTestMessageListener.getThrowable());
}
finally
{
consumerConnection.close();
}
}
private Destination createQueue(final String queueName, final boolean sharedGroups,
final boolean useDefaultKey) throws Exception
{
final Map<String, Object> arguments = new HashMap<>();
if(!useDefaultKey)
{
arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_GROUP_KEY_OVERRIDE, "group");
}
arguments.put(ConfiguredObject.DURABLE, "false");
arguments.put(org.apache.qpid.server.model.Queue.MESSAGE_GROUP_TYPE, sharedGroups ? MessageGroupType.SHARED_GROUPS.name() : MessageGroupType.STANDARD.name());
createEntityUsingAmqpManagement(queueName, "org.apache.qpid.Queue", arguments);
return createQueue(queueName);
}
private Message createMessage(final Session session, int msg, String group, final boolean useDefaultGroup) throws JMSException
{
Message send = session.createTextMessage("Message: " + msg);
send.setIntProperty("msg", msg);
send.setStringProperty(getGroupKey(useDefaultGroup), group);
return send;
}
private String getGroupKey(final boolean useDefaultGroup)
{
return useDefaultGroup ? "JMSXGroupID" : "group";
}
public static class SharedGroupTestMessageListener implements MessageListener
{
private final CountDownLatch _count;
private final AtomicInteger _activeListeners = new AtomicInteger();
private final AtomicInteger _concurrentProcessingCases = new AtomicInteger();
private Throwable _throwable;
SharedGroupTestMessageListener(int numMessages)
{
_count = new CountDownLatch(numMessages);
}
@Override
public void onMessage(Message message)
{
try
{
int currentActiveListeners = _activeListeners.incrementAndGet();
if (currentActiveListeners > 1)
{
_concurrentProcessingCases.incrementAndGet();
LOGGER.error("Concurrent processing when handling message: " + message.getIntProperty("msg"));
}
try
{
Thread.sleep(25);
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
}
_activeListeners.decrementAndGet();
}
catch (Throwable t)
{
LOGGER.error("Unexpected throwable received by listener", t);
_throwable = t;
}
finally
{
_count.countDown();
}
}
boolean waitForLatch(int seconds) throws Exception
{
return _count.await(seconds, TimeUnit.SECONDS);
}
int getConcurrentProcessingCases()
{
return _concurrentProcessingCases.get();
}
Throwable getThrowable()
{
return _throwable;
}
}
}