blob: fd90354157a27337ecd44a02e7c8691f4c291f4c [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.systests.jms_1_1.extensions.queue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.NamingException;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.queue.LastValueQueue;
import org.apache.qpid.systests.JmsTestBase;
public class LastValueQueueTest extends JmsTestBase
{
private static final Logger LOGGER = LoggerFactory.getLogger(LastValueQueueTest.class);
private static final String MESSAGE_SEQUENCE_NUMBER_PROPERTY = "msg";
private static final String KEY_PROPERTY = "key";
private static final int MSG_COUNT = 400;
private static final int NUMBER_OF_UNIQUE_KEY_VALUES = 10;
@Test
public void testConflation() throws Exception
{
final String queueName = getTestName();
final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, false);
final Connection producerConnection = getConnection();
try
{
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(queue);
Message message = producerSession.createMessage();
message.setStringProperty(KEY_PROPERTY, "A");
message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 1);
producer.send(message);
message.setStringProperty(KEY_PROPERTY, "B");
message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 2);
producer.send(message);
message.setStringProperty(KEY_PROPERTY, "A");
message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 3);
producer.send(message);
message.setStringProperty(KEY_PROPERTY, "B");
message.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, 4);
producer.send(message);
}
finally
{
producerConnection.close();
}
Connection consumerConnection = getConnection();
try
{
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
Message received1 = consumer.receive(getReceiveTimeout());
assertNotNull("First message is not received", received1);
assertEquals("Unexpected key property value", "A", received1.getStringProperty(KEY_PROPERTY));
assertEquals("Unexpected sequence property value",
3,
received1.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
Message received2 = consumer.receive(getReceiveTimeout());
assertNotNull("Second message is not received", received2);
assertEquals("Unexpected key property value", "B", received2.getStringProperty(KEY_PROPERTY));
assertEquals("Unexpected sequence property value",
4,
received2.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
assertNull("Unexpected message is received", consumer.receive(getReceiveTimeout() / 4));
}
finally
{
consumerConnection.close();
}
}
@Test
public void testConflationWithRelease() throws Exception
{
final String queueName = getTestName();
final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, false);
sendMessages(queue, 0, MSG_COUNT / 2);
Connection consumerConnection = getConnection();
try
{
final Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final MessageConsumer consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
for (int i = 0; i < NUMBER_OF_UNIQUE_KEY_VALUES; i++)
{
final Message received = consumer.receive(getReceiveTimeout());
assertNotNull(String.format("Message with key %d is not received", i), received);
assertEquals("Unexpected message received",
MSG_COUNT / 2 - NUMBER_OF_UNIQUE_KEY_VALUES + i,
received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
}
finally
{
consumerConnection.close();
}
sendMessages(queue, MSG_COUNT / 2, MSG_COUNT);
consumerConnection = getConnection();
try
{
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
for (int i = 0; i < NUMBER_OF_UNIQUE_KEY_VALUES; i++)
{
final Message received = consumer.receive(getReceiveTimeout());
assertNotNull(String.format("Message with key %d is not received", i), received);
assertEquals("Unexpected message received",
MSG_COUNT - NUMBER_OF_UNIQUE_KEY_VALUES + i,
received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
}
finally
{
consumerConnection.close();
}
}
@Test
public void testConflationWithReleaseAfterNewPublish() throws Exception
{
final String queueName = getTestName();
final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, false);
sendMessages(queue, 0, MSG_COUNT / 2);
Connection consumerConnection = getConnection();
try
{
final Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
final MessageConsumer consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
for (int i = 0; i < NUMBER_OF_UNIQUE_KEY_VALUES; i++)
{
final Message received = consumer.receive(getReceiveTimeout());
assertNotNull(String.format("Message with key %d is not received", i), received);
assertEquals("Unexpected message received",
MSG_COUNT / 2 - NUMBER_OF_UNIQUE_KEY_VALUES + i,
received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
sendMessages(queue, MSG_COUNT / 2, MSG_COUNT);
consumer.close();
consumerSession.close();
}
finally
{
consumerConnection.close();
}
consumerConnection = getConnection();
try
{
consumerConnection.start();
final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = consumerSession.createConsumer(queue);
for (int i = 0; i < NUMBER_OF_UNIQUE_KEY_VALUES; i++)
{
final Message received = consumer.receive(getReceiveTimeout());
assertNotNull(String.format("Message with key %d is not received", i), received);
assertEquals("Unexpected message received",
MSG_COUNT - NUMBER_OF_UNIQUE_KEY_VALUES + i,
received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
}
finally
{
consumerConnection.close();
}
}
@Test
public void testConflatedQueueDepth() throws Exception
{
final String queueName = getTestName();
final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, false);
sendMessages(queue, 0, MSG_COUNT);
assertEquals(NUMBER_OF_UNIQUE_KEY_VALUES, getTotalDepthOfQueuesMessages());
}
@Test
public void testConflationBrowser() throws Exception
{
final String queueName = getTestName();
final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, true);
sendMessages(queue, 0, MSG_COUNT);
final Connection consumerConnection = getConnection();
try
{
final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
for (int i = 0; i < NUMBER_OF_UNIQUE_KEY_VALUES; i++)
{
final Message received = consumer.receive(getReceiveTimeout());
assertNotNull(String.format("Message with key %d is not received", i), received);
assertEquals("Unexpected message received",
MSG_COUNT - NUMBER_OF_UNIQUE_KEY_VALUES + i,
received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
sendMessages(queue, MSG_COUNT, MSG_COUNT + 1);
final Message received = consumer.receive(getReceiveTimeout());
assertNotNull(String.format("Message with key %d is not received", 0), received);
assertEquals("Unexpected message received",
MSG_COUNT,
received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
finally
{
consumerConnection.close();
}
}
@Test
public void testConflation2Browsers() throws Exception
{
final String queueName = getTestName();
final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, true);
sendMessages(queue, 0, MSG_COUNT);
final Connection consumerConnection = getConnection();
try
{
final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = consumerSession.createConsumer(queue);
final MessageConsumer consumer2 = consumerSession.createConsumer(queue);
consumerConnection.start();
for (int i = 0; i < NUMBER_OF_UNIQUE_KEY_VALUES; i++)
{
final Message received = consumer.receive(getReceiveTimeout());
assertNotNull(String.format("Message with key %d is not received by first consumer", i), received);
assertEquals("Unexpected message received by first consumer",
MSG_COUNT - NUMBER_OF_UNIQUE_KEY_VALUES + i,
received.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
final Message received2 = consumer2.receive(getReceiveTimeout());
assertNotNull(String.format("Message with key %d is not received by second consumer", i), received2);
assertEquals("Unexpected message received by second consumer",
MSG_COUNT - NUMBER_OF_UNIQUE_KEY_VALUES + i,
received2.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY));
}
}
finally
{
consumerConnection.close();
}
}
@Test
public void testParallelProductionAndConsumption() throws Exception
{
final String queueName = getTestName();
final Queue queue = createConflationQueue(queueName, KEY_PROPERTY, false);
int numberOfUniqueKeyValues = 2;
final ExecutorService executorService = Executors.newFixedThreadPool(2);
try
{
// Start producing threads that send messages
final BackgroundMessageProducer messageProducer1 = new BackgroundMessageProducer(queue,
numberOfUniqueKeyValues);
final BackgroundMessageProducer messageProducer2 = new BackgroundMessageProducer(queue,
numberOfUniqueKeyValues);
final Future<?> future1 = executorService.submit(messageProducer1);
final Future<?> future2 = executorService.submit(messageProducer2);
final Map<String, Integer> lastReceivedMessages = receiveMessages(messageProducer1, queue);
future1.get(getReceiveTimeout() * MSG_COUNT, TimeUnit.MILLISECONDS);
future2.get(getReceiveTimeout() * MSG_COUNT, TimeUnit.MILLISECONDS);
final Map<String, Integer> lastSentMessages1 = messageProducer1.getMessageSequenceNumbersByKey();
assertEquals("Unexpected number of last sent messages sent by producer1",
numberOfUniqueKeyValues, lastSentMessages1.size());
final Map<String, Integer> lastSentMessages2 = messageProducer2.getMessageSequenceNumbersByKey();
assertEquals(lastSentMessages1, lastSentMessages2);
assertEquals("The last message sent for each key should match the last message received for that key",
lastSentMessages1, lastReceivedMessages);
assertNull("Unexpected exception from background producer thread", messageProducer1.getException());
}
finally
{
executorService.shutdown();
}
}
private Map<String, Integer> receiveMessages(BackgroundMessageProducer producer, final Queue queue) throws Exception
{
producer.waitUntilQuarterOfMessagesSentToEncourageConflation();
Map<String, Integer> messageSequenceNumbersByKey = new HashMap<>();
Connection consumerConnection = getConnectionBuilder().setPrefetch(1).build();
try
{
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
LOGGER.info("Starting to receive");
MessageConsumer consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
Message message;
int numberOfShutdownsReceived = 0;
int numberOfMessagesReceived = 0;
while (numberOfShutdownsReceived < 2)
{
message = consumer.receive(getReceiveTimeout());
assertNotNull("null received after "
+ numberOfMessagesReceived
+ " messages and "
+ numberOfShutdownsReceived
+ " shutdowns", message);
if (message.propertyExists(BackgroundMessageProducer.SHUTDOWN))
{
numberOfShutdownsReceived++;
}
else
{
numberOfMessagesReceived++;
putMessageInMap(message, messageSequenceNumbersByKey);
}
}
LOGGER.info("Finished receiving. Received " + numberOfMessagesReceived + " message(s) in total");
}
finally
{
consumerConnection.close();
}
return messageSequenceNumbersByKey;
}
private void putMessageInMap(Message message, Map<String, Integer> messageSequenceNumbersByKey) throws JMSException
{
String keyValue = message.getStringProperty(KEY_PROPERTY);
Integer messageSequenceNumber = message.getIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY);
messageSequenceNumbersByKey.put(keyValue, messageSequenceNumber);
}
private final class BackgroundMessageProducer implements Runnable
{
static final String SHUTDOWN = "SHUTDOWN";
private final Queue _queue;
private volatile Exception _exception;
private Map<String, Integer> _messageSequenceNumbersByKey = new HashMap<>();
private CountDownLatch _quarterOfMessagesSentLatch = new CountDownLatch(MSG_COUNT / 4);
private int _numberOfUniqueKeyValues;
BackgroundMessageProducer(Queue queue, final int numberOfUniqueKeyValues)
{
_queue = queue;
_numberOfUniqueKeyValues = numberOfUniqueKeyValues;
}
void waitUntilQuarterOfMessagesSentToEncourageConflation() throws InterruptedException
{
final long latchTimeout = 60000;
boolean success = _quarterOfMessagesSentLatch.await(latchTimeout, TimeUnit.MILLISECONDS);
assertTrue("Failed to be notified that 1/4 of the messages have been sent within " + latchTimeout + " ms.",
success);
LOGGER.info("Quarter of messages sent");
}
public Exception getException()
{
return _exception;
}
Map<String, Integer> getMessageSequenceNumbersByKey()
{
return Collections.unmodifiableMap(_messageSequenceNumbersByKey);
}
@Override
public void run()
{
try
{
LOGGER.info("Starting to send in background thread");
final Connection producerConnection = getConnection();
try
{
final Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
final MessageProducer backgroundProducer = producerSession.createProducer(_queue);
for (int messageNumber = 0; messageNumber < MSG_COUNT; messageNumber++)
{
final Message message = nextMessage(messageNumber, producerSession, _numberOfUniqueKeyValues);
backgroundProducer.send(message);
producerSession.commit();
putMessageInMap(message, _messageSequenceNumbersByKey);
_quarterOfMessagesSentLatch.countDown();
}
Message shutdownMessage = producerSession.createMessage();
shutdownMessage.setBooleanProperty(SHUTDOWN, true);
// make sure the shutdown messages have distinct keys because the Qpid Cpp Broker will
// otherwise consider them to have the same key.
shutdownMessage.setStringProperty(KEY_PROPERTY, Thread.currentThread().getName());
backgroundProducer.send(shutdownMessage);
producerSession.commit();
}
finally
{
producerConnection.close();
}
LOGGER.info("Finished sending in background thread");
}
catch (Exception e)
{
_exception = e;
LOGGER.warn("Unexpected exception in publisher", e);
}
}
}
private Queue createConflationQueue(final String queueName,
final String keyProperty, final boolean enforceBrowseOnly) throws Exception
{
final Map<String, Object> arguments = new HashMap<>();
arguments.put(LastValueQueue.LVQ_KEY, keyProperty);
if (enforceBrowseOnly)
{
arguments.put("ensureNondestructiveConsumers", true);
}
createEntityUsingAmqpManagement(queueName, "org.apache.qpid.LastValueQueue", arguments);
return createQueue(queueName);
}
private Message nextMessage(int msg, Session producerSession) throws JMSException
{
return nextMessage(msg, producerSession, NUMBER_OF_UNIQUE_KEY_VALUES);
}
private Message nextMessage(int msg, Session producerSession, int numberOfUniqueKeyValues) throws JMSException
{
Message send = producerSession.createTextMessage("Message: " + msg);
final String keyValue = String.valueOf(msg % numberOfUniqueKeyValues);
send.setStringProperty(KEY_PROPERTY, keyValue);
send.setIntProperty(MESSAGE_SEQUENCE_NUMBER_PROPERTY, msg);
return send;
}
private void sendMessages(final Queue queue, final int fromIndex, final int toIndex)
throws JMSException, NamingException
{
Connection producerConnection = getConnection();
try
{
Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = producerSession.createProducer(queue);
for (int msg = fromIndex; msg < toIndex; msg++)
{
producer.send(nextMessage(msg, producerSession));
producerSession.commit();
}
producer.close();
producerSession.close();
}
finally
{
producerConnection.close();
}
}
}