blob: dbb23c702376fc1b9c8ddb34190f0c82591818c2 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.systests.jms_1_1.extensions.maxdelivery;
import static org.apache.qpid.systests.Utils.INDEX;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import org.apache.qpid.server.model.AlternateBinding;
import org.apache.qpid.systests.JmsTestBase;
import org.apache.qpid.systests.Utils;
public class MaxDeliveryTest extends JmsTestBase
{
private static final int MAX_DELIVERY_ATTEMPTS = 2;
private static final String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount";
@Test
public void maximumDelivery() throws Exception
{
String queueName = getTestName();
String dlqNAme = getTestName() + "_DLQ";
createMaxDeliveryQueueAndDLQ(queueName, MAX_DELIVERY_ATTEMPTS, dlqNAme);
int numberOfMessages = 5;
Connection connection = getConnectionBuilder().setMessageRedelivery(true).build();
try
{
final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
final Queue queue = session.createQueue(queueName);
final MessageConsumer consumer = session.createConsumer(queue);
Utils.sendMessages(connection, queue, numberOfMessages);
connection.start();
int expectedMessageIndex = 0;
int deliveryAttempt = 0;
int deliveryCounter = 0;
do
{
Message message = consumer.receive(getReceiveTimeout());
assertNotNull(message, String.format("Message '%d' was not received in delivery attempt %d",
expectedMessageIndex, deliveryAttempt));
int index = message.getIntProperty(INDEX);
assertEquals(expectedMessageIndex, index,
String.format("Unexpected message index (delivery attempt %d)", deliveryAttempt));
deliveryCounter++;
// dlq all even messages
if (index % 2 == 0)
{
session.rollback();
if (deliveryAttempt < MAX_DELIVERY_ATTEMPTS - 1)
{
deliveryAttempt++;
}
else
{
deliveryAttempt = 0;
expectedMessageIndex++;
}
}
else
{
session.commit();
deliveryAttempt = 0;
expectedMessageIndex++;
}
}
while (expectedMessageIndex != numberOfMessages);
int numberOfEvenMessages = numberOfMessages / 2 + 1;
assertEquals(numberOfEvenMessages * MAX_DELIVERY_ATTEMPTS + (numberOfMessages - numberOfEvenMessages),
deliveryCounter,
"Unexpected total delivery counter");
verifyDeadLetterQueueMessages(connection, dlqNAme, numberOfEvenMessages);
}
finally
{
connection.close();
}
}
@Test
public void maximumDeliveryWithinMessageListener() throws Exception
{
String queueName = getTestName();
String dlqName = getTestName() + "_DLQ";
createMaxDeliveryQueueAndDLQ(queueName, MAX_DELIVERY_ATTEMPTS, dlqName);
int numberOfMessages = 5;
Connection connection = getConnectionBuilder().setMessageRedelivery(true).build();
try
{
final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
final Queue queue = session.createQueue(queueName);
final MessageConsumer consumer = session.createConsumer(queue);
Utils.sendMessages(connection, queue, numberOfMessages);
connection.start();
int numberOfEvenMessages = numberOfMessages / 2 + 1;
int expectedNumberOfDeliveries =
numberOfEvenMessages * MAX_DELIVERY_ATTEMPTS + (numberOfMessages - numberOfEvenMessages);
final CountDownLatch deliveryLatch = new CountDownLatch(expectedNumberOfDeliveries);
final AtomicReference<Throwable> messageListenerThrowable = new AtomicReference<>();
final AtomicInteger deliveryAttempt = new AtomicInteger();
final AtomicInteger expectedMessageIndex = new AtomicInteger();
consumer.setMessageListener(message -> {
try
{
int index = message.getIntProperty(INDEX);
assertEquals(expectedMessageIndex.get(), index,
String.format("Unexpected message index (delivery attempt %d)", deliveryAttempt.get()));
// dlq all even messages
if (index % 2 == 0)
{
session.rollback();
if (deliveryAttempt.get() < MAX_DELIVERY_ATTEMPTS - 1)
{
deliveryAttempt.incrementAndGet();
}
else
{
deliveryAttempt.set(0);
expectedMessageIndex.incrementAndGet();
}
}
else
{
session.commit();
deliveryAttempt.set(0);
expectedMessageIndex.incrementAndGet();
}
}
catch (Throwable t)
{
messageListenerThrowable.set(t);
}
finally
{
deliveryLatch.countDown();
}
});
assertTrue(deliveryLatch.await(expectedNumberOfDeliveries * getReceiveTimeout(), TimeUnit.MILLISECONDS),
"Messages were not received in timely manner");
assertNull(messageListenerThrowable.get(), "Unexpected throwable in MessageListener");
verifyDeadLetterQueueMessages(connection, dlqName, numberOfEvenMessages);
}
finally
{
connection.close();
}
}
@Test
public void browsingDoesNotIncrementDeliveryCount() throws Exception
{
assumeTrue(getBrokerAdmin().isManagementSupported());
final String queueName = getTestName();
getBrokerAdmin().createQueue(queueName);
Connection connection = getConnection();
try
{
connection.start();
final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
final Queue queue = session.createQueue(queueName);
Utils.sendMessages(connection, queue, 1);
final Map<String, Object> messageInfoBefore = getMessageInfo(queueName, 0);
assertThat("Unexpected delivery count before browse", messageInfoBefore.get("deliveryCount"), is(equalTo(0)));
browseQueueAndValidationDeliveryHeaders(session, queue);
final Map<String, Object> messageInfoAfter = getMessageInfo(queueName, 0);
assertThat("Unexpected delivery count after first browse", messageInfoAfter.get("deliveryCount"), is(equalTo(0)));
browseQueueAndValidationDeliveryHeaders(session, queue);
}
finally
{
connection.close();
}
}
private void verifyDeadLetterQueueMessages(final Connection connection,
final String dlqName,
final int numberOfEvenMessages) throws Exception
{
assertEquals(numberOfEvenMessages, getTotalDepthOfQueuesMessages(), "Unexpected number of total messages");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(dlqName);
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < numberOfEvenMessages; i++)
{
Message message = consumer.receive(getReceiveTimeout());
assertEquals(i * 2, message.getIntProperty(INDEX), "Unexpected DLQ message index");
}
}
private void createMaxDeliveryQueueAndDLQ(String queueName, int maxDeliveryAttempts, String dlqName)
throws Exception
{
createEntityUsingAmqpManagement(dlqName,
"org.apache.qpid.StandardQueue",
Collections.emptyMap());
final Map<String, Object> attributes = new HashMap<>();
attributes.put(org.apache.qpid.server.model.Queue.NAME, queueName);
attributes.put(org.apache.qpid.server.model.Queue.MAXIMUM_DELIVERY_ATTEMPTS, maxDeliveryAttempts);
attributes.put(org.apache.qpid.server.model.Queue.ALTERNATE_BINDING,
new ObjectMapper().writeValueAsString(Collections.singletonMap(AlternateBinding.DESTINATION,
dlqName)));
createEntityUsingAmqpManagement(queueName,
"org.apache.qpid.StandardQueue",
attributes);
}
private Map<String, Object> getMessageInfo(String queueName, final int index) throws Exception
{
@SuppressWarnings("unchecked")
List<Map<String, Object>> messages = (List<Map<String, Object>>) performOperationUsingAmqpManagement(queueName,
"getMessageInfo",
"org.apache.qpid.Queue",
Collections.emptyMap());
assertThat("Too few messsages on the queue", messages.size(), is(greaterThan(index)));
return messages.get(index);
}
private void browseQueueAndValidationDeliveryHeaders(final Session session, final Queue queue) throws Exception
{
final QueueBrowser browser = session.createBrowser(queue);
@SuppressWarnings("unchecked")
final List<Message> messages = (List<Message>) new ArrayList(Collections.list(browser.getEnumeration()));
assertThat("Unexpected number of messages seen by browser", messages.size(), is(equalTo(1)));
final Message browsedMessage = messages.get(0);
assertThat(browsedMessage.getJMSRedelivered(), is(equalTo(false)));
if (browsedMessage.propertyExists(JMSX_DELIVERY_COUNT))
{
assertThat(browsedMessage.getIntProperty(JMSX_DELIVERY_COUNT), is(equalTo(1)));
}
browser.close();
}
}