blob: e43c8133ac3a553c19410183fd96f7aa49a41bc8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import com.sun.management.UnixOperatingSystemMXBean;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.MessageHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
public class DeadLetterAddressTest extends ActiveMQTestBase {
private ActiveMQServer server;
private ClientSession clientSession;
private ServerLocator locator;
@Test
public void testBasicSend() throws Exception {
SimpleString dla = new SimpleString("DLA");
SimpleString qName = new SimpleString("q1");
SimpleString adName = new SimpleString("ad1");
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
SimpleString dlq = new SimpleString("DLQ1");
clientSession.createQueue(new QueueConfiguration(dlq).setAddress(dla).setDurable(false));
clientSession.createQueue(new QueueConfiguration(qName).setAddress(adName).setDurable(false));
ClientProducer producer = clientSession.createProducer(adName);
producer.send(createTextMessage(clientSession, "heyho!"));
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(qName);
ClientMessage m = clientConsumer.receive(500);
m.acknowledge();
Assert.assertNotNull(m);
Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
// force a cancel
clientSession.rollback();
m = clientConsumer.receiveImmediate();
Assert.assertNull(m);
clientConsumer.close();
clientConsumer = clientSession.createConsumer(dlq);
m = clientConsumer.receive(500);
Assert.assertNotNull(m);
assertEquals("q1", m.getStringProperty(Message.HDR_ORIGINAL_QUEUE));
assertEquals("ad1", m.getStringProperty(Message.HDR_ORIGINAL_ADDRESS));
Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
}
@Test
public void testLargeMessageFileLeak() throws Exception {
OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
// only run this on *nix systems which will have the com.sun.management.UnixOperatingSystemMXBean (needed to check open file count)
Assume.assumeTrue(os instanceof UnixOperatingSystemMXBean);
long fdBaseline = ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount();
final int SIZE = 2 * 1024;
SimpleString dla = new SimpleString("DLA");
SimpleString qName = new SimpleString("q1");
SimpleString adName = new SimpleString("ad1");
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
SimpleString dlq = new SimpleString("DLQ1");
clientSession.createQueue(new QueueConfiguration(dlq).setAddress(dla).setDurable(false));
clientSession.createQueue(new QueueConfiguration(qName).setAddress(adName).setDurable(false));
for (int i = 0; i < 10; i++) {
ClientProducer producer = clientSession.createProducer(adName);
ClientMessage clientFile = clientSession.createMessage(true);
clientFile.setBodyInputStream(ActiveMQTestBase.createFakeLargeStream(SIZE));
producer.send(clientFile);
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(qName);
ClientMessage m = clientConsumer.receive(500);
m.acknowledge();
Assert.assertNotNull(m);
// force a cancel
clientSession.rollback();
m = clientConsumer.receiveImmediate();
Assert.assertNull(m);
clientConsumer.close();
}
Wait.assertTrue("File descriptors are leaking", () -> ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount() - fdBaseline <= 0);
}
// HORNETQ- 1084
@Test
public void testBasicSendWithDLAButNoBinding() throws Exception {
SimpleString dla = new SimpleString("DLA");
SimpleString qName = new SimpleString("q1");
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
server.getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
//SimpleString dlq = new SimpleString("DLQ1");
//clientSession.createQueue(new QueueConfiguration(dlq).setAddress(dla).setDurable(false));
clientSession.createQueue(new QueueConfiguration(qName).setDurable(false));
ClientProducer producer = clientSession.createProducer(qName);
producer.send(createTextMessage(clientSession, "heyho!"));
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(qName);
ClientMessage m = clientConsumer.receive(500);
m.acknowledge();
Assert.assertNotNull(m);
Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
// force a cancel
clientSession.rollback();
m = clientConsumer.receiveImmediate();
Assert.assertNull(m);
clientConsumer.close();
Queue q = (Queue) server.getPostOffice().getBinding(qName).getBindable();
Assert.assertEquals(0, q.getDeliveringCount());
}
@Test
public void testBasicSend2times() throws Exception {
SimpleString dla = new SimpleString("DLA");
SimpleString qName = new SimpleString("q1");
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(2).setDeadLetterAddress(dla);
server.getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
SimpleString dlq = new SimpleString("DLQ1");
clientSession.createQueue(new QueueConfiguration(dlq).setAddress(dla).setDurable(false));
clientSession.createQueue(new QueueConfiguration(qName).setDurable(false));
ClientProducer producer = clientSession.createProducer(qName);
producer.send(createTextMessage(clientSession, "heyho!"));
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(qName);
ClientMessage m = clientConsumer.receive(5000);
m.acknowledge();
Assert.assertNotNull(m);
Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
// force a cancel
clientSession.rollback();
clientSession.start();
m = clientConsumer.receive(5000);
m.acknowledge();
Assert.assertNotNull(m);
Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
// force a cancel
clientSession.rollback();
m = clientConsumer.receiveImmediate();
Assert.assertNull(m);
clientConsumer.close();
clientConsumer = clientSession.createConsumer(dlq);
m = clientConsumer.receive(5000);
Assert.assertNotNull(m);
Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
}
@Test
public void testReceiveWithListeners() throws Exception {
SimpleString dla = new SimpleString("DLA");
SimpleString qName = new SimpleString("q1");
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(2).setDeadLetterAddress(dla);
server.getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
SimpleString dlq = new SimpleString("DLQ1");
clientSession.createQueue(new QueueConfiguration(dlq).setAddress(dla).setDurable(false));
clientSession.createQueue(new QueueConfiguration(qName).setDurable(false));
ClientProducer producer = clientSession.createProducer(qName);
producer.send(createTextMessage(clientSession, "heyho!"));
ClientConsumer clientConsumer = clientSession.createConsumer(qName);
final CountDownLatch latch = new CountDownLatch(2);
TestHandler handler = new TestHandler(latch, clientSession);
clientConsumer.setMessageHandler(handler);
clientSession.start();
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertEquals(handler.count, 2);
clientConsumer = clientSession.createConsumer(dlq);
Message m = clientConsumer.receive(5000);
Assert.assertNotNull(m);
Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
}
class TestHandler implements MessageHandler {
private final CountDownLatch latch;
int count = 0;
private final ClientSession clientSession;
TestHandler(CountDownLatch latch, ClientSession clientSession) {
this.latch = latch;
this.clientSession = clientSession;
}
@Override
public void onMessage(ClientMessage message) {
count++;
latch.countDown();
try {
clientSession.rollback(true);
} catch (ActiveMQException e) {
e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
}
throw new RuntimeException();
}
}
@Test
public void testBasicSendToMultipleQueues() throws Exception {
SimpleString dla = new SimpleString("DLA");
SimpleString qName = new SimpleString("q1");
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
server.getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
SimpleString dlq = new SimpleString("DLQ1");
SimpleString dlq2 = new SimpleString("DLQ2");
clientSession.createQueue(new QueueConfiguration(dlq).setAddress(dla).setDurable(false));
clientSession.createQueue(new QueueConfiguration(dlq2).setAddress(dla).setDurable(false));
clientSession.createQueue(new QueueConfiguration(qName).setDurable(false));
ClientProducer producer = clientSession.createProducer(qName);
producer.send(createTextMessage(clientSession, "heyho!"));
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(qName);
ClientMessage m = clientConsumer.receive(500);
m.acknowledge();
Assert.assertNotNull(m);
Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
// force a cancel
clientSession.rollback();
m = clientConsumer.receiveImmediate();
Assert.assertNull(m);
clientConsumer.close();
clientConsumer = clientSession.createConsumer(dlq);
m = clientConsumer.receive(500);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
clientConsumer.close();
clientConsumer = clientSession.createConsumer(dlq2);
m = clientConsumer.receive(500);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
clientConsumer.close();
}
@Test
public void testBasicSendToNoQueue() throws Exception {
SimpleString qName = new SimpleString("q1");
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1);
server.getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
clientSession.createQueue(new QueueConfiguration(qName).setDurable(false));
ClientProducer producer = clientSession.createProducer(qName);
producer.send(createTextMessage(clientSession, "heyho!"));
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(qName);
ClientMessage m = clientConsumer.receive(500);
m.acknowledge();
Assert.assertNotNull(m);
Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
// force a cancel
clientSession.rollback();
m = clientConsumer.receiveImmediate();
Assert.assertNull(m);
clientConsumer.close();
}
@Test
public void testHeadersSet() throws Exception {
final int MAX_DELIVERIES = 16;
final int NUM_MESSAGES = 5;
SimpleString dla = new SimpleString("DLA");
SimpleString qName = new SimpleString("q1");
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(MAX_DELIVERIES).setDeadLetterAddress(dla);
server.getAddressSettingsRepository().addMatch(qName.toString(), addressSettings);
SimpleString dlq = new SimpleString("DLQ1");
clientSession.createQueue(new QueueConfiguration(dlq).setAddress(dla).setDurable(false));
clientSession.createQueue(new QueueConfiguration(qName).setDurable(false));
ServerLocator locator = createInVMNonHALocator();
ClientSessionFactory sessionFactory = createSessionFactory(locator);
ClientSession sendSession = sessionFactory.createSession(false, true, true);
ClientProducer producer = sendSession.createProducer(qName);
Map<String, Long> origIds = new HashMap<>();
for (int i = 0; i < NUM_MESSAGES; i++) {
ClientMessage tm = createTextMessage(clientSession, "Message:" + i);
producer.send(tm);
}
ClientConsumer clientConsumer = clientSession.createConsumer(qName);
clientSession.start();
for (int i = 0; i < MAX_DELIVERIES; i++) {
for (int j = 0; j < NUM_MESSAGES; j++) {
ClientMessage tm = clientConsumer.receive(1000);
Assert.assertNotNull(tm);
tm.acknowledge();
if (i == 0) {
origIds.put("Message:" + j, tm.getMessageID());
}
Assert.assertEquals("Message:" + j, tm.getBodyBuffer().readString());
}
clientSession.rollback();
}
long timeout = System.currentTimeMillis() + 5000;
// DLA transfer is asynchronous fired on the rollback
while (System.currentTimeMillis() < timeout && getMessageCount(((Queue) server.getPostOffice().getBinding(qName).getBindable())) != 0) {
Thread.sleep(1);
}
Assert.assertEquals(0, getMessageCount(((Queue) server.getPostOffice().getBinding(qName).getBindable())));
ClientMessage m = clientConsumer.receiveImmediate();
Assert.assertNull(m);
// All the messages should now be in the DLQ
ClientConsumer cc3 = clientSession.createConsumer(dlq);
for (int i = 0; i < NUM_MESSAGES; i++) {
ClientMessage tm = cc3.receive(1000);
Assert.assertNotNull(tm);
String text = tm.getBodyBuffer().readString();
Assert.assertEquals("Message:" + i, text);
// Check the headers
SimpleString origDest = (SimpleString) tm.getObjectProperty(Message.HDR_ORIGINAL_ADDRESS);
Long origMessageId = (Long) tm.getObjectProperty(Message.HDR_ORIG_MESSAGE_ID);
Assert.assertEquals(qName, origDest);
Long origId = origIds.get(text);
Assert.assertEquals(origId, origMessageId);
}
sendSession.close();
}
@Test
public void testDeadlLetterAddressWithDefaultAddressSettings() throws Exception {
int deliveryAttempt = 3;
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
SimpleString deadLetterAddress = RandomUtil.randomSimpleString();
SimpleString deadLetterQueue = RandomUtil.randomSimpleString();
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(deliveryAttempt).setDeadLetterAddress(deadLetterAddress);
server.getAddressSettingsRepository().setDefault(addressSettings);
clientSession.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(false));
clientSession.createQueue(new QueueConfiguration(deadLetterQueue).setAddress(deadLetterAddress).setDurable(false));
ClientProducer producer = clientSession.createProducer(address);
ClientMessage clientMessage = createTextMessage(clientSession, "heyho!");
producer.send(clientMessage);
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(queue);
for (int i = 0; i < deliveryAttempt; i++) {
ClientMessage m = clientConsumer.receive(500);
Assert.assertNotNull(m);
Assert.assertEquals(i + 1, m.getDeliveryCount());
m.acknowledge();
clientSession.rollback();
}
ClientMessage m = clientConsumer.receive(500);
Assert.assertNull("not expecting a message", m);
clientConsumer.close();
clientConsumer = clientSession.createConsumer(deadLetterQueue);
m = clientConsumer.receive(500);
Assert.assertNotNull(m);
Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
}
@Test
public void testDeadlLetterAddressWithWildcardAddressSettings() throws Exception {
int deliveryAttempt = 3;
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
SimpleString deadLetterAddress = RandomUtil.randomSimpleString();
SimpleString deadLetterQueue = RandomUtil.randomSimpleString();
AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(deliveryAttempt).setDeadLetterAddress(deadLetterAddress);
server.getAddressSettingsRepository().addMatch("*", addressSettings);
clientSession.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(false));
clientSession.createQueue(new QueueConfiguration(deadLetterQueue).setAddress(deadLetterAddress).setDurable(false));
ClientProducer producer = clientSession.createProducer(address);
ClientMessage clientMessage = createTextMessage(clientSession, "heyho!");
producer.send(clientMessage);
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(queue);
for (int i = 0; i < deliveryAttempt; i++) {
ClientMessage m = clientConsumer.receive(500);
Assert.assertNotNull(m);
Assert.assertEquals(i + 1, m.getDeliveryCount());
m.acknowledge();
clientSession.rollback();
}
ClientMessage m = clientConsumer.receiveImmediate();
Assert.assertNull(m);
clientConsumer.close();
clientConsumer = clientSession.createConsumer(deadLetterQueue);
m = clientConsumer.receive(500);
Assert.assertNotNull(m);
Assert.assertEquals(m.getBodyBuffer().readString(), "heyho!");
}
@Test
public void testDeadLetterAddressWithOverridenSublevelAddressSettings() throws Exception {
int defaultDeliveryAttempt = 3;
int specificeDeliveryAttempt = defaultDeliveryAttempt + 1;
SimpleString address = new SimpleString("prefix.address");
SimpleString queue = RandomUtil.randomSimpleString();
SimpleString defaultDeadLetterAddress = RandomUtil.randomSimpleString();
SimpleString defaultDeadLetterQueue = RandomUtil.randomSimpleString();
SimpleString specificDeadLetterAddress = RandomUtil.randomSimpleString();
SimpleString specificDeadLetterQueue = RandomUtil.randomSimpleString();
AddressSettings defaultAddressSettings = new AddressSettings().setMaxDeliveryAttempts(defaultDeliveryAttempt).setDeadLetterAddress(defaultDeadLetterAddress);
server.getAddressSettingsRepository().addMatch("*", defaultAddressSettings);
AddressSettings specificAddressSettings = new AddressSettings().setMaxDeliveryAttempts(specificeDeliveryAttempt).setDeadLetterAddress(specificDeadLetterAddress);
server.getAddressSettingsRepository().addMatch(address.toString(), specificAddressSettings);
clientSession.createQueue(new QueueConfiguration(queue).setAddress(address).setDurable(false));
clientSession.createQueue(new QueueConfiguration(defaultDeadLetterQueue).setAddress(defaultDeadLetterAddress).setDurable(false));
clientSession.createQueue(new QueueConfiguration(specificDeadLetterQueue).setAddress(specificDeadLetterAddress).setDurable(false));
ClientProducer producer = clientSession.createProducer(address);
ClientMessage clientMessage = createTextMessage(clientSession, "heyho!");
producer.send(clientMessage);
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(queue);
ClientConsumer defaultDeadLetterConsumer = clientSession.createConsumer(defaultDeadLetterQueue);
ClientConsumer specificDeadLetterConsumer = clientSession.createConsumer(specificDeadLetterQueue);
for (int i = 0; i < defaultDeliveryAttempt; i++) {
ClientMessage m = clientConsumer.receive(500);
Assert.assertNotNull(m);
Assert.assertEquals(i + 1, m.getDeliveryCount());
m.acknowledge();
clientSession.rollback();
}
Assert.assertNull(defaultDeadLetterConsumer.receiveImmediate());
Assert.assertNull(specificDeadLetterConsumer.receiveImmediate());
// one more redelivery attempt:
ClientMessage m = clientConsumer.receive(500);
Assert.assertNotNull(m);
Assert.assertEquals(specificeDeliveryAttempt, m.getDeliveryCount());
m.acknowledge();
clientSession.rollback();
Assert.assertNull(defaultDeadLetterConsumer.receiveImmediate());
Assert.assertNotNull(specificDeadLetterConsumer.receive(500));
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
server = addServer(ActiveMQServers.newActiveMQServer(createDefaultInVMConfig(), true));
server.start();
// then we create a client as normal
locator = createInVMNonHALocator();
locator.setMinLargeMessageSize(1024);
ClientSessionFactory sessionFactory = createSessionFactory(locator);
clientSession = addClientSession(sessionFactory.createSession(false, true, false));
}
}