blob: c868c8a4b304fe1a9440fe832756e692634ad343 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jms;
import java.io.InputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.tests.unit.core.postoffice.impl.FakeQueue;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.junit.Assert;
import org.junit.Test;
public class RedeployTest extends ActiveMQTestBase {
/**
 * Verifies that the broker does not fall over when a configuration reload tries to delete auto-created
 * addresses/queues in a clustered environment. If the undeploy fails, bridges etc. can stop working, so the
 * broker must still be live (and "autoQueue" still usable) after the reload.
 *
 * @throws Exception if the broker cannot be started or any JMS/file operation fails
 */
@Test
public void testRedeployAutoCreateAddress() throws Exception {
   Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
   URL url1 = RedeployTest.class.getClassLoader().getResource("reload-test-autocreateaddress.xml");
   URL url2 = RedeployTest.class.getClassLoader().getResource("reload-test-autocreateaddress-reload.xml");
   // Files.copy does not close the source stream, so close it explicitly.
   try (InputStream in = url1.openStream()) {
      Files.copy(in, brokerXML);
   }
   EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
   embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
   embeddedActiveMQ.start();
   // Everything after start() is guarded so the broker is stopped even when an early step fails.
   try {
      // Sending to and consuming from "autoQueue" forces the address/queue to be auto-created.
      try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
           Connection connection = factory.createConnection()) {
         Session session = connection.createSession();
         Queue queue = session.createQueue("autoQueue");
         MessageProducer producer = session.createProducer(queue);
         producer.send(session.createTextMessage("text"));
         connection.start();
         MessageConsumer consumer = session.createConsumer(session.createQueue("autoQueue"));
         Assert.assertNotNull("Address wasn't autocreated accordingly", consumer.receive(5000));
      }
      Assert.assertNotNull(getQueue(embeddedActiveMQ, "autoQueue"));
      // This simulates a remote queue (or other binding type) being added that would not get deleted. It is not
      // a valid state, but it can happen when addresses and queues are auto-created in a clustered environment.
      embeddedActiveMQ.getActiveMQServer().getPostOffice().addBinding(new RemoteQueueBindingImpl(5L,
         new SimpleString("autoQueue"),
         new SimpleString("uniqueName"),
         new SimpleString("routingName"),
         6L,
         null,
         new FakeQueue(new SimpleString("foo"), 6L),
         new SimpleString("bridge"),
         1,
         MessageLoadBalancingType.OFF));
      final ReusableLatch latch = new ReusableLatch(1);
      Runnable tick = latch::countDown;
      embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
      latch.await(10, TimeUnit.SECONDS);
      try (InputStream in = url2.openStream()) {
         Files.copy(in, brokerXML, StandardCopyOption.REPLACE_EXISTING);
      }
      // Stamp the file one second into the future; presumably this guarantees the reload manager sees a change.
      brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
      // Re-arm the latch and register the tick again before waiting for the next reload pass
      // (NOTE(review): the tick appears to be one-shot - confirm against the ReloadManager implementation).
      latch.setCount(1);
      embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
      latch.await(10, TimeUnit.SECONDS);
      Assert.assertTrue(tryConsume());
      Assert.assertNotNull(getQueue(embeddedActiveMQ, "autoQueue"));
      // The broker must still route messages through "autoQueue" after the reload.
      try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
           Connection connection = factory.createConnection()) {
         Session session = connection.createSession();
         Queue queue = session.createQueue("autoQueue");
         MessageProducer producer = session.createProducer(queue);
         producer.send(session.createTextMessage("text"));
         connection.start();
         MessageConsumer consumer = session.createConsumer(session.createQueue("autoQueue"));
         Assert.assertNotNull("autoQueue wasn't redeployed accordingly", consumer.receive(5000));
      }
   } finally {
      embeddedActiveMQ.stop();
   }
}
/**
 * Starts a broker from "reload-test-jms.xml", swaps the configuration for "reload-test-updated-jms.xml" and
 * checks that the reload took effect: the dead-letter and expiry addresses matching "jms" change from
 * "DLQ"/"ExpiryQueue" to "NewQueue", "NewQueue" becomes consumable, and a message sent to "DivertQueue"
 * can be received from "NewQueue".
 *
 * @throws Exception if the broker cannot be started or any JMS/file operation fails
 */
@Test
public void testRedeploy() throws Exception {
   Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
   URL url1 = RedeployTest.class.getClassLoader().getResource("reload-test-jms.xml");
   URL url2 = RedeployTest.class.getClassLoader().getResource("reload-test-updated-jms.xml");
   // Files.copy does not close the source stream, so close it explicitly.
   try (InputStream in = url1.openStream()) {
      Files.copy(in, brokerXML);
   }
   EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
   embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
   embeddedActiveMQ.start();
   final ReusableLatch latch = new ReusableLatch(1);
   Runnable tick = latch::countDown;
   embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
   try {
      latch.await(10, TimeUnit.SECONDS);
      // State before the reload.
      Assert.assertEquals("DLQ", embeddedActiveMQ.getActiveMQServer().getAddressSettingsRepository().getMatch("jms").getDeadLetterAddress().toString());
      Assert.assertEquals("ExpiryQueue", embeddedActiveMQ.getActiveMQServer().getAddressSettingsRepository().getMatch("jms").getExpiryAddress().toString());
      Assert.assertFalse(tryConsume());
      try (InputStream in = url2.openStream()) {
         Files.copy(in, brokerXML, StandardCopyOption.REPLACE_EXISTING);
      }
      // Stamp the file one second into the future; presumably this guarantees the reload manager sees a change.
      brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
      // Re-arm the latch and register the tick again before waiting for the next reload pass.
      latch.setCount(1);
      embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
      latch.await(10, TimeUnit.SECONDS);
      // State after the reload.
      Assert.assertTrue(tryConsume());
      Assert.assertEquals("NewQueue", embeddedActiveMQ.getActiveMQServer().getAddressSettingsRepository().getMatch("jms").getDeadLetterAddress().toString());
      Assert.assertEquals("NewQueue", embeddedActiveMQ.getActiveMQServer().getAddressSettingsRepository().getMatch("jms").getExpiryAddress().toString());
      // A message sent to "DivertQueue" must show up on "NewQueue".
      try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
           Connection connection = factory.createConnection()) {
         Session session = connection.createSession();
         Queue queue = session.createQueue("DivertQueue");
         MessageProducer producer = session.createProducer(queue);
         producer.send(session.createTextMessage("text"));
         connection.start();
         MessageConsumer consumer = session.createConsumer(session.createQueue("NewQueue"));
         Assert.assertNotNull("Divert wasn't redeployed accordingly", consumer.receive(5000));
      }
   } finally {
      embeddedActiveMQ.stop();
   }
}
/**
 * Checks that changing the filter of "myFilterQueue" through a configuration reload is non-destructive: the
 * binding instance and its ID stay the same, a message sent before the reload can still be consumed
 * afterwards, and a message with property {@code x = "y"} is delivered after the reload.
 *
 * @throws Exception if the broker cannot be started or any JMS/file operation fails
 */
@Test
public void testRedeployFilter() throws Exception {
   Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
   URL url1 = RedeployTest.class.getClassLoader().getResource("reload-queue-filter.xml");
   URL url2 = RedeployTest.class.getClassLoader().getResource("reload-queue-filter-updated.xml");
   // Files.copy does not close the source stream, so close it explicitly.
   try (InputStream in = url1.openStream()) {
      Files.copy(in, brokerXML);
   }
   EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
   embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
   embeddedActiveMQ.start();
   final ReusableLatch latch = new ReusableLatch(1);
   Runnable tick = latch::countDown;
   embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
   try {
      latch.await(10, TimeUnit.SECONDS);
      // A message with x = "x" is delivered under the initial configuration.
      try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
           Connection connection = factory.createConnection();
           Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
         connection.start();
         Queue queue = session.createQueue("myFilterQueue");
         MessageProducer producer = session.createProducer(queue);
         Message message = session.createMessage();
         message.setStringProperty("x", "x");
         producer.send(message);
         MessageConsumer consumer = session.createConsumer(queue);
         assertNotNull(consumer.receive(5000));
         consumer.close();
      }
      // Send a message that should remain in the queue (this ensures the config change is non-destructive).
      try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
           Connection connection = factory.createConnection();
           Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
         connection.start();
         Queue queue = session.createQueue("myFilterQueue");
         MessageProducer producer = session.createProducer(queue);
         Message message = session.createTextMessage("hello");
         message.setStringProperty("x", "x");
         producer.send(message);
      }
      Binding binding = embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(SimpleString.toSimpleString("myFilterQueue"));
      try (InputStream in = url2.openStream()) {
         Files.copy(in, brokerXML, StandardCopyOption.REPLACE_EXISTING);
      }
      // Stamp the file one second into the future; presumably this guarantees the reload manager sees a change.
      brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
      // Re-arm the latch and register the tick again before waiting for the next reload pass.
      latch.setCount(1);
      embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
      latch.await(10, TimeUnit.SECONDS);
      Binding bindingAfterChange = embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(SimpleString.toSimpleString("myFilterQueue"));
      // assertSame reports both instances on failure, unlike assertTrue(a == b).
      assertSame("Instance should be the same (as should be non destructive)", binding, bindingAfterChange);
      assertEquals(binding.getID(), bindingAfterChange.getID());
      // Check that after the config change we can still consume a message that was sent before, ensuring the
      // config change was non-destructive of the queue.
      try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
           Connection connection = factory.createConnection();
           Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
         connection.start();
         Queue queue = session.createQueue("myFilterQueue");
         MessageConsumer consumer = session.createConsumer(queue);
         Message message = consumer.receive(5000);
         assertNotNull(message);
         assertEquals("hello", ((TextMessage)message).getText());
         consumer.close();
      }
      // A message with x = "y" must be delivered after the reload.
      try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
           Connection connection = factory.createConnection();
           Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
         connection.start();
         Queue queue = session.createQueue("myFilterQueue");
         MessageProducer producer = session.createProducer(queue);
         Message message = session.createMessage();
         message.setStringProperty("x", "y");
         producer.send(message);
         MessageConsumer consumer = session.createConsumer(queue);
         assertNotNull(consumer.receive(2000));
         consumer.close();
      }
   } finally {
      embeddedActiveMQ.stop();
   }
}
/**
 * Overwrites the test directory's "broker.xml" with {@code configFile} and waits (up to 10 seconds) for the
 * server's reload manager to run its tick callback.
 *
 * @param server     the running embedded broker whose reload manager is observed
 * @param configFile the configuration resource to copy over "broker.xml"
 * @throws Exception if the file cannot be copied or the wait is interrupted
 */
private void deployBrokerConfig(EmbeddedActiveMQ server, URL configFile) throws Exception {
   Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
   // Files.copy does not close the source stream, so close it explicitly.
   try (InputStream in = configFile.openStream()) {
      Files.copy(in, brokerXML, StandardCopyOption.REPLACE_EXISTING);
   }
   // Stamp the file one second into the future; presumably this guarantees the reload manager sees a change.
   brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
   final ReusableLatch latch = new ReusableLatch(1);
   Runnable tick = latch::countDown;
   server.getActiveMQServer().getReloadManager().setTick(tick);
   // Fail fast instead of silently continuing against a configuration that was never reloaded.
   Assert.assertTrue("Reload manager did not tick within 10 seconds", latch.await(10, TimeUnit.SECONDS));
}
/**
 * Starts a broker from "reload-queue-filter.xml", verifies that the filter {@code x = 'x'} on
 * "myFilterQueue" is in effect, redeploys {@code testConfiguration} and verifies that the queue no longer
 * has a filter and accepts messages regardless of their "x" property.
 *
 * @param testConfiguration the configuration to redeploy; it is expected to leave "myFilterQueue" unfiltered
 * @throws Exception if the broker cannot be started or any JMS/file operation fails
 */
private void doTestRemoveFilter(URL testConfiguration) throws Exception {
   Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
   URL baseConfig = RedeployTest.class.getClassLoader().getResource("reload-queue-filter.xml");
   // Files.copy does not close the source stream, so close it explicitly.
   try (InputStream in = baseConfig.openStream()) {
      Files.copy(in, brokerXML, StandardCopyOption.REPLACE_EXISTING);
   }
   EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
   embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
   embeddedActiveMQ.start();
   try {
      // Done inside the try so the broker is still stopped if the initial deployment fails.
      deployBrokerConfig(embeddedActiveMQ, baseConfig);
      try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
           Connection connection = factory.createConnection();
           Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
         connection.start();
         Queue queue = session.createQueue("myFilterQueue");
         // Test that the original filter has been set up
         LocalQueueBinding queueBinding = (LocalQueueBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice()
            .getBinding(new SimpleString("myFilterQueue"));
         // The "x = 'x'" value is found in "reload-queue-filter.xml"
         assertEquals("x = 'x'", queueBinding.getFilter().getFilterString().toString());
         MessageProducer producer = session.createProducer(queue);
         // Test that the original filter affects the flow
         Message passingMessage = session.createMessage();
         passingMessage.setStringProperty("x", "x");
         producer.send(passingMessage);
         Message filteredMessage = session.createMessage();
         filteredMessage.setStringProperty("x", "y");
         producer.send(filteredMessage);
         MessageConsumer consumer = session.createConsumer(queue);
         Message receivedMessage = consumer.receive(2000);
         assertNotNull(receivedMessage);
         assertEquals("x", receivedMessage.getStringProperty("x"));
         // Only the message matching the filter may arrive.
         assertNull(consumer.receive(2000));
         consumer.close();
      }
      deployBrokerConfig(embeddedActiveMQ, testConfiguration);
      try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
           Connection connection = factory.createConnection();
           Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE)) {
         connection.start();
         Queue queue = session.createQueue("myFilterQueue");
         // Test that the filter has been removed
         LocalQueueBinding queueBinding = (LocalQueueBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice()
            .getBinding(new SimpleString("myFilterQueue"));
         assertNull(queueBinding.getFilter());
         MessageProducer producer = session.createProducer(queue);
         // Test that the original filter no longer affects the flow
         Message message1 = session.createMessage();
         message1.setStringProperty("x", "x");
         producer.send(message1);
         Message message2 = session.createMessage();
         message2.setStringProperty("x", "y");
         producer.send(message2);
         MessageConsumer consumer = session.createConsumer(queue);
         // Both messages must now be delivered.
         assertNotNull(consumer.receive(2000));
         assertNotNull(consumer.receive(2000));
         consumer.close();
      }
   } finally {
      embeddedActiveMQ.stop();
   }
}
/**
 * Runs the filter-removal scenario once for each of the two updated configurations, in order:
 * "reload-queue-filter-updated-empty.xml" and then "reload-queue-filter-removed.xml".
 */
@Test
public void testRedeployRemoveFilter() throws Exception {
   final String[] updatedConfigs = {"reload-queue-filter-updated-empty.xml", "reload-queue-filter-removed.xml"};
   for (String resourceName : updatedConfigs) {
      URL updatedConfig = RedeployTest.class.getClassLoader().getResource(resourceName);
      doTestRemoveFilter(updatedConfig);
   }
}
/**
 * This one is here just to make sure it's possible to change queue parameters one by one without setting the
 * others to <code>null</code>: "virtualQueue" is created with user "bob", then only its filter is updated to
 * "foo", and both values must be present afterwards.
 *
 * @throws Exception if the broker cannot be started or the queue cannot be created/updated
 */
@Test
public void testQueuePartialReconfiguration() throws Exception {
   Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
   URL url = RedeployTest.class.getClassLoader().getResource("reload-empty.xml");
   // Files.copy does not close the source stream, so close it explicitly.
   try (InputStream in = url.openStream()) {
      Files.copy(in, brokerXML);
   }
   EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
   embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
   embeddedActiveMQ.start();
   try {
      embeddedActiveMQ.getActiveMQServer().createQueue(new QueueConfiguration("virtualQueue").setUser("bob"));
      // The update names only the filter; the user set at creation must be left untouched.
      embeddedActiveMQ.getActiveMQServer().updateQueue(new QueueConfiguration("virtualQueue").setFilterString("foo"));
      LocalQueueBinding queueBinding = (LocalQueueBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice()
         .getBinding(new SimpleString("virtualQueue"));
      org.apache.activemq.artemis.core.server.Queue queue = queueBinding.getQueue();
      assertEquals(new SimpleString("bob"), queue.getUser());
      assertEquals(new SimpleString("foo"), queue.getFilter().getFilterString());
   } finally {
      embeddedActiveMQ.stop();
   }
}
/**
 * Starts a broker from "reload-queue-defaults-before.xml", where "myQueue" is expected to have non-default
 * values for every checked attribute, redeploys "reload-queue-defaults-after.xml" and verifies that each
 * attribute is back at its {@link ActiveMQDefaultConfiguration} default (and that filter and user are null).
 *
 * @throws Exception if the broker cannot be started or the configuration cannot be redeployed
 */
@Test
public void testRedeployQueueDefaults() throws Exception {
   Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
   URL baseConfig = RedeployTest.class.getClassLoader().getResource("reload-queue-defaults-before.xml");
   URL newConfig = RedeployTest.class.getClassLoader().getResource("reload-queue-defaults-after.xml");
   // Files.copy does not close the source stream, so close it explicitly.
   try (InputStream in = baseConfig.openStream()) {
      Files.copy(in, brokerXML, StandardCopyOption.REPLACE_EXISTING);
   }
   EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
   embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
   embeddedActiveMQ.start();
   try {
      LocalQueueBinding queueBinding = (LocalQueueBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice()
         .getBinding(new SimpleString("myQueue"));
      org.apache.activemq.artemis.core.server.Queue queue = queueBinding.getQueue();
      // Before the redeploy: every attribute differs from its default.
      assertNotEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), queue.getMaxConsumers());
      assertNotEquals(RoutingType.MULTICAST, queue.getRoutingType());
      assertNotEquals(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), queue.isPurgeOnNoConsumers());
      assertNotEquals(ActiveMQDefaultConfiguration.getDefaultEnabled(), queue.isEnabled());
      assertNotEquals(ActiveMQDefaultConfiguration.getDefaultExclusive(), queue.isExclusive());
      assertNotEquals(ActiveMQDefaultConfiguration.getDefaultGroupRebalance(), queue.isGroupRebalance());
      assertNotEquals(ActiveMQDefaultConfiguration.getDefaultGroupBuckets(), queue.getGroupBuckets());
      assertNotEquals(ActiveMQDefaultConfiguration.getDefaultGroupFirstKey(), queue.getGroupFirstKey());
      assertNotEquals(ActiveMQDefaultConfiguration.getDefaultNonDestructive(), queue.isNonDestructive());
      assertNotEquals(ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), queue.getConsumersBeforeDispatch());
      assertNotEquals(ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), queue.getDelayBeforeDispatch());
      assertNotNull(queue.getFilter());
      assertEquals(new SimpleString("jdoe"), queue.getUser());
      assertNotEquals(ActiveMQDefaultConfiguration.getDefaultRingSize(), queue.getRingSize());
      deployBrokerConfig(embeddedActiveMQ, newConfig);
      // After the redeploy: the same Queue reference obtained above is re-checked, so the values must have
      // been updated on the existing queue object.
      assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), queue.getMaxConsumers());
      assertEquals(RoutingType.MULTICAST, queue.getRoutingType());
      assertEquals(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), queue.isPurgeOnNoConsumers());
      assertEquals(ActiveMQDefaultConfiguration.getDefaultEnabled(), queue.isEnabled());
      assertEquals(ActiveMQDefaultConfiguration.getDefaultExclusive(), queue.isExclusive());
      assertEquals(ActiveMQDefaultConfiguration.getDefaultGroupRebalance(), queue.isGroupRebalance());
      assertEquals(ActiveMQDefaultConfiguration.getDefaultGroupBuckets(), queue.getGroupBuckets());
      assertEquals(ActiveMQDefaultConfiguration.getDefaultGroupFirstKey(), queue.getGroupFirstKey());
      assertEquals(ActiveMQDefaultConfiguration.getDefaultNonDestructive(), queue.isNonDestructive());
      assertEquals(ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), queue.getConsumersBeforeDispatch());
      assertEquals(ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), queue.getDelayBeforeDispatch());
      assertNull(queue.getFilter());
      assertNull(queue.getUser());
      assertEquals(ActiveMQDefaultConfiguration.getDefaultRingSize(), queue.getRingSize());
   } finally {
      embeddedActiveMQ.stop();
   }
}
/**
 * Starts a broker from "reload-divert-undeploy-before.xml", verifies that a message sent to "source" is
 * received on both "source" and "target" while the "divert" binding exists, redeploys
 * "reload-divert-undeploy-after.xml" and verifies that the binding is gone and "target" no longer receives
 * the message.
 *
 * @throws Exception if the broker cannot be started or any JMS/file operation fails
 */
@Test
public void testUndeployDivert() throws Exception {
   Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
   URL baseConfig = RedeployTest.class.getClassLoader().getResource("reload-divert-undeploy-before.xml");
   URL newConfig = RedeployTest.class.getClassLoader().getResource("reload-divert-undeploy-after.xml");
   // Files.copy does not close the source stream, so close it explicitly.
   try (InputStream in = baseConfig.openStream()) {
      Files.copy(in, brokerXML, StandardCopyOption.REPLACE_EXISTING);
   }
   EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
   embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
   embeddedActiveMQ.start();
   try {
      DivertBinding divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice()
         .getBinding(new SimpleString("divert"));
      assertNotNull(divertBinding);
      Queue sourceQueue = (Queue) ActiveMQDestination.createDestination("queue://source", ActiveMQDestination.TYPE.QUEUE);
      Queue targetQueue = (Queue) ActiveMQDestination.createDestination("queue://target", ActiveMQDestination.TYPE.QUEUE);
      // With the divert deployed, the message reaches both the source and the target queue.
      try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
           Connection connection = factory.createConnection();
           Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
           MessageProducer sourceProducer = session.createProducer(sourceQueue);
           MessageConsumer sourceConsumer = session.createConsumer(sourceQueue);
           MessageConsumer targetConsumer = session.createConsumer(targetQueue)) {
         connection.start();
         Message message = session.createTextMessage("Hello world");
         sourceProducer.send(message);
         assertNotNull(sourceConsumer.receive(2000));
         assertNotNull(targetConsumer.receive(2000));
      }
      deployBrokerConfig(embeddedActiveMQ, newConfig);
      // Wait for the binding to disappear, then re-read it for the actual assertion.
      Wait.waitFor(() -> embeddedActiveMQ.getActiveMQServer().getPostOffice()
         .getBinding(new SimpleString("divert")) == null);
      divertBinding = (DivertBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice()
         .getBinding(new SimpleString("divert"));
      assertNull(divertBinding);
      // With the divert undeployed, only the source queue receives the message.
      try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
           Connection connection = factory.createConnection();
           Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
           MessageProducer sourceProducer = session.createProducer(sourceQueue);
           MessageConsumer sourceConsumer = session.createConsumer(sourceQueue);
           MessageConsumer targetConsumer = session.createConsumer(targetQueue)) {
         connection.start();
         Message message = session.createTextMessage("Hello world");
         sourceProducer.send(message);
         assertNotNull(sourceConsumer.receive(2000));
         assertNull(targetConsumer.receiveNoWait());
      }
   } finally {
      embeddedActiveMQ.stop();
   }
}
/**
 * Starts a live and a backup broker, reloads a changed configuration on each, and verifies that the changed
 * address settings (BLOCK to PAGE) and security roles ("a" to "b") matching "myQueue" are in effect on the
 * live broker and, after the live broker is stopped and the backup becomes active, on the backup as well.
 * Messages sent to "myQueue2" before and after the failover must both be consumable from the backup.
 *
 * @throws Exception if a broker cannot be started or any JMS/file operation fails
 */
@Test
public void testRedeployWithFailover() throws Exception {
   Set<Role> original = new HashSet<>();
   original.add(new Role("a", false, true, false, false, false, false, false, false, false, false));
   Set<Role> changed = new HashSet<>();
   changed.add(new Role("b", false, true, false, false, false, false, false, false, false, false));
   EmbeddedActiveMQ live = new EmbeddedActiveMQ();
   EmbeddedActiveMQ backup = new EmbeddedActiveMQ();
   try {
      // set these system properties to use in the relevant broker.xml files
      System.setProperty("live-data-dir", getTestDirfile().toPath() + "/redeploy-live-data");
      System.setProperty("backup-data-dir", getTestDirfile().toPath() + "/redeploy-backup-data");
      Path liveBrokerXML = getTestDirfile().toPath().resolve("live.xml");
      Path backupBrokerXML = getTestDirfile().toPath().resolve("backup.xml");
      URL url1 = RedeployTest.class.getClassLoader().getResource("reload-live-original.xml");
      URL url2 = RedeployTest.class.getClassLoader().getResource("reload-live-changed.xml");
      URL url3 = RedeployTest.class.getClassLoader().getResource("reload-backup-original.xml");
      URL url4 = RedeployTest.class.getClassLoader().getResource("reload-backup-changed.xml");
      // Files.copy does not close the source stream, so each copy closes its stream explicitly.
      try (InputStream in = url1.openStream()) {
         Files.copy(in, liveBrokerXML);
      }
      try (InputStream in = url3.openStream()) {
         Files.copy(in, backupBrokerXML);
      }
      live.setConfigResourcePath(liveBrokerXML.toUri().toString());
      live.start();
      waitForServerToStart(live.getActiveMQServer());
      backup.setConfigResourcePath(backupBrokerXML.toUri().toString());
      backup.start();
      assertTrue(Wait.waitFor(() -> backup.getActiveMQServer().isReplicaSync(), 15000, 200));
      assertEquals("Test address settings original - live", AddressFullMessagePolicy.BLOCK, live.getActiveMQServer().getAddressSettingsRepository().getMatch("myQueue").getAddressFullMessagePolicy());
      assertEquals("Test address settings original - backup", AddressFullMessagePolicy.BLOCK, backup.getActiveMQServer().getAddressSettingsRepository().getMatch("myQueue").getAddressFullMessagePolicy());
      assertEquals("Test security settings original - live", original, live.getActiveMQServer().getSecurityRepository().getMatch("myQueue"));
      assertEquals("Test security settings original - backup", original, backup.getActiveMQServer().getSecurityRepository().getMatch("myQueue"));
      final ReusableLatch liveReloadLatch = new ReusableLatch(1);
      Runnable liveTick = () -> liveReloadLatch.countDown();
      live.getActiveMQServer().getReloadManager().setTick(liveTick);
      final ReusableLatch backupReloadTickLatch = new ReusableLatch(1);
      Runnable backupTick = () -> backupReloadTickLatch.countDown();
      backup.getActiveMQServer().getReloadManager().setTick(backupTick);
      // Reload the live broker's configuration.
      liveReloadLatch.await(10, TimeUnit.SECONDS);
      try (InputStream in = url2.openStream()) {
         Files.copy(in, liveBrokerXML, StandardCopyOption.REPLACE_EXISTING);
      }
      liveBrokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
      liveReloadLatch.countUp();
      live.getActiveMQServer().getReloadManager().setTick(liveTick);
      liveReloadLatch.await(10, TimeUnit.SECONDS);
      // Reload the backup broker's configuration.
      backupReloadTickLatch.await(10, TimeUnit.SECONDS);
      try (InputStream in = url4.openStream()) {
         Files.copy(in, backupBrokerXML, StandardCopyOption.REPLACE_EXISTING);
      }
      backupBrokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
      backupReloadTickLatch.countUp();
      backup.getActiveMQServer().getReloadManager().setTick(backupTick);
      backupReloadTickLatch.await(10, TimeUnit.SECONDS);
      // Send one message through the broker on port 61616 before the failover.
      try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
           Connection connection = factory.createConnection()) {
         Session session = connection.createSession();
         Queue queue = session.createQueue("myQueue2");
         MessageProducer producer = session.createProducer(queue);
         producer.send(session.createTextMessage("text1"));
      }
      assertFalse(backup.getActiveMQServer().isActive());
      assertEquals("Test address settings redeploy - live", AddressFullMessagePolicy.PAGE, live.getActiveMQServer().getAddressSettingsRepository().getMatch("myQueue").getAddressFullMessagePolicy());
      assertEquals("Test security settings redeploy - live", changed, live.getActiveMQServer().getSecurityRepository().getMatch("myQueue"));
      // Stop the live broker and wait for the backup to take over.
      live.stop();
      assertTrue(Wait.waitFor(() -> (backup.getActiveMQServer().isActive()), 5000, 100));
      // On port 61617 both the pre-failover message and a freshly sent one must be received.
      try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61617");
           Connection connection = factory.createConnection()) {
         Session session = connection.createSession();
         Queue queue = session.createQueue("myQueue2");
         MessageProducer producer = session.createProducer(queue);
         producer.send(session.createTextMessage("text"));
         connection.start();
         MessageConsumer consumer = session.createConsumer(session.createQueue("myQueue2"));
         Assert.assertNotNull("Queue wasn't deployed accordingly", consumer.receive(5000));
         Assert.assertNotNull(consumer.receive(5000));
      }
      assertEquals("Test security settings redeploy - backup", changed, backup.getActiveMQServer().getSecurityRepository().getMatch("myQueue"));
      assertEquals("Test address settings redeploy - backup", AddressFullMessagePolicy.PAGE, backup.getActiveMQServer().getAddressSettingsRepository().getMatch("myQueue").getAddressFullMessagePolicy());
   } finally {
      live.stop();
      backup.stop();
      System.clearProperty("live-data-dir");
      System.clearProperty("backup-data-dir");
   }
}
/**
 * Probes whether a consumer can be created on "NewQueue" using a default connection factory.
 *
 * @return {@code true} if the consumer was created and all resources closed without a
 *         {@link JMSException}; {@code false} if any of those steps threw one
 */
private boolean tryConsume() throws JMSException {
   boolean consumable;
   try (ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
        Connection jmsConnection = cf.createConnection();
        Session jmsSession = jmsConnection.createSession(Session.AUTO_ACKNOWLEDGE)) {
      // Only the ability to create the consumer matters; nothing is received.
      jmsSession.createConsumer(jmsSession.createQueue("NewQueue"));
      consumable = true;
   } catch (JMSException failure) {
      consumable = false;
   }
   return consumable;
}
@Test
public void testRedeployAddressQueue() throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
URL url1 = RedeployTest.class.getClassLoader().getResource("reload-address-queues.xml");
URL url2 = RedeployTest.class.getClassLoader().getResource("reload-address-queues-updated.xml");
Files.copy(url1.openStream(), brokerXML);
EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
embeddedActiveMQ.start();
final ReusableLatch latch = new ReusableLatch(1);
Runnable tick = new Runnable() {
@Override
public void run() {
latch.countDown();
}
};
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
try (JMSContext jmsContext = connectionFactory.createContext()) {
jmsContext.createSharedDurableConsumer(jmsContext.createTopic("config_test_consumer_created_queues"),"mySub").receive(100);
}
try {
latch.await(10, TimeUnit.SECONDS);
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_consumer_created_queues").contains("mySub"));
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal_no_queue"));
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal"));
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_removal"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_removal").contains("config_test_queue_removal_queue_1"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_removal").contains("config_test_queue_removal_queue_2"));
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "permanent_test_address_removal"));
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "permanent_test_queue_removal"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_1"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_2"));
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_change"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_change").contains("config_test_queue_change_queue"));
Assert.assertEquals(10, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").getMaxConsumers());
Assert.assertEquals(false, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isPurgeOnNoConsumers());
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
latch.setCount(1);
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
latch.await(10, TimeUnit.SECONDS);
// Ensure queues created by clients (NOT by broker.xml) are not removed when we reload.
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_consumer_created_queues").contains("mySub"));
Assert.assertNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal_no_queue"));
Assert.assertNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal"));
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_removal"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_removal").contains("config_test_queue_removal_queue_1"));
Assert.assertFalse(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_removal").contains("config_test_queue_removal_queue_2"));
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "permanent_test_address_removal"));
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "permanent_test_queue_removal"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_1"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "permanent_test_queue_removal").contains("permanent_test_queue_removal_queue_2"));
Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_change"));
Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_change").contains("config_test_queue_change_queue"));
Assert.assertEquals(1, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").getMaxConsumers());
Assert.assertEquals(true, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isPurgeOnNoConsumers());
Assert.assertNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_change_queue"));
Assert.assertNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_removal_queue_1"));
} finally {
embeddedActiveMQ.stop();
}
}
/**
 * Verifies that a routing-type change made to a queue in {@code broker.xml} is picked up on reload and that a
 * message sent before the change is still consumable afterwards.
 * <p>
 * The broker is started from {@code reload-queue-routingtype.xml}, a text message is sent to {@code myAddress}
 * and {@code myQueue} is expected to be {@link RoutingType#ANYCAST}. {@code broker.xml} is then overwritten with
 * {@code reload-queue-routingtype-updated.xml}; after the next reload-manager tick {@code myQueue} is expected
 * to be {@link RoutingType#MULTICAST} and the message must be received from it.
 *
 * @throws Exception for anything unexpected, test will fail.
 */
@Test
public void testRedeployChangeQueueRoutingType() throws Exception {
   Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
   URL url1 = RedeployTest.class.getClassLoader().getResource("reload-queue-routingtype.xml");
   URL url2 = RedeployTest.class.getClassLoader().getResource("reload-queue-routingtype-updated.xml");
   // Files.copy(InputStream, ...) does not close the stream it reads from, so close it explicitly.
   try (java.io.InputStream initialConfig = url1.openStream()) {
      Files.copy(initialConfig, brokerXML);
   }

   EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
   embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
   embeddedActiveMQ.start();

   final ReusableLatch latch = new ReusableLatch(1);
   embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(latch::countDown);

   try {
      ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://0.0.0.0:61616");
      try (JMSContext context = connectionFactory.createContext()) {
         context.createProducer().send(context.createQueue("myAddress"), "hello");
      }

      Assert.assertTrue("reload manager did not tick within 10 seconds", latch.await(10, TimeUnit.SECONDS));
      Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress"));
      Assert.assertEquals(RoutingType.ANYCAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType());

      try (java.io.InputStream updatedConfig = url2.openStream()) {
         Files.copy(updatedConfig, brokerXML, StandardCopyOption.REPLACE_EXISTING);
      }
      // Push the modification time one second into the future; presumably this is what makes the reload
      // manager treat the file as changed.
      brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
      latch.setCount(1);
      // The tick callback is registered again before waiting for the next tick.
      embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(latch::countDown);
      Assert.assertTrue("reload manager did not tick within 10 seconds", latch.await(10, TimeUnit.SECONDS));

      Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "myAddress"));
      Assert.assertEquals(RoutingType.MULTICAST, getQueue(embeddedActiveMQ, "myQueue").getRoutingType());

      // Ensure the queue isn't destroyed by checking that the message sent before the change is consumable
      // after it (i.e. no message loss).
      try (JMSContext context = connectionFactory.createContext()) {
         // Bounded receive: a lost message must fail the test rather than block it forever.
         Message message = context.createSharedDurableConsumer(context.createTopic("myAddress"), "myQueue").receive(TimeUnit.SECONDS.toMillis(10));
         Assert.assertNotNull("message sent before the routing type change was not received", message);
         Assert.assertEquals("hello", ((TextMessage) message).getText());
      }
   } finally {
      embeddedActiveMQ.stop();
   }
}
/**
 * Simulates the stop and start that occurs when the network health checker stops the server because the network
 * is detected as unhealthy, and restarts the broker once it is detected as healthy again.
 * <p>
 * The broker is started from {@code reload-original.xml} and its initial state is asserted. {@code broker.xml}
 * is then overwritten with {@code reload-changed.xml} and the reloaded state is asserted. Finally the same
 * embedded broker instance is stopped and started again, and the same changed state is asserted once more.
 *
 * @throws Exception for anything unexpected, test will fail.
 */
@Test
public void testRedeployStopAndRestart() throws Exception {
   Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
   URL url1 = RedeployTest.class.getClassLoader().getResource("reload-original.xml");
   URL url2 = RedeployTest.class.getClassLoader().getResource("reload-changed.xml");
   // Files.copy(InputStream, ...) does not close the stream it reads from, so close it explicitly.
   try (java.io.InputStream originalConfig = url1.openStream()) {
      Files.copy(originalConfig, brokerXML);
   }

   EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
   embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
   embeddedActiveMQ.start();

   final ReusableLatch latch = new ReusableLatch(1);
   Runnable tick = latch::countDown;
   embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);

   try {
      Assert.assertTrue("reload manager did not tick within 10 seconds", latch.await(10, TimeUnit.SECONDS));

      // State defined by the original configuration.
      Assert.assertEquals(1, getSecurityRoles(embeddedActiveMQ, "security_address").size());
      Assert.assertEquals("b", getSecurityRoles(embeddedActiveMQ, "security_address").iterator().next().getName());

      Assert.assertEquals(SimpleString.toSimpleString("OriginalDLQ"), getAddressSettings(embeddedActiveMQ, "address_settings_address").getDeadLetterAddress());
      Assert.assertEquals(SimpleString.toSimpleString("OriginalExpiryQueue"), getAddressSettings(embeddedActiveMQ, "address_settings_address").getExpiryAddress());

      Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal_no_queue"));
      Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal"));
      Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_removal"));
      Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_removal").contains("config_test_queue_removal_queue_1"));
      Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_removal").contains("config_test_queue_removal_queue_2"));

      Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_change"));
      Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_change").contains("config_test_queue_change_queue"));
      Assert.assertEquals(10, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").getMaxConsumers());
      Assert.assertFalse(getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isPurgeOnNoConsumers());
      Assert.assertTrue(getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isEnabled());

      Assert.assertTrue(getQueue(embeddedActiveMQ, "config_test_queue_change_queue_defaults").isPurgeOnNoConsumers());
      Assert.assertFalse(getQueue(embeddedActiveMQ, "config_test_queue_change_queue_defaults").isEnabled());

      try (java.io.InputStream changedConfig = url2.openStream()) {
         Files.copy(changedConfig, brokerXML, StandardCopyOption.REPLACE_EXISTING);
      }
      // Push the modification time one second into the future; presumably this is what makes the reload
      // manager treat the file as changed.
      brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
      latch.setCount(1);
      // The tick callback is registered again before waiting for the next tick.
      embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
      Assert.assertTrue("reload manager did not tick within 10 seconds", latch.await(10, TimeUnit.SECONDS));

      // Assert that the security, address-settings, address and queue changes applied.
      assertStopAndRestartChangedState(embeddedActiveMQ);
   } finally {
      embeddedActiveMQ.stop();
   }

   try {
      embeddedActiveMQ.start();

      // Assert that the changes persist a stop and start of the server (e.g. like what occurs if the network
      // health check stops the node) while the JVM remains up.
      assertStopAndRestartChangedState(embeddedActiveMQ);
   } finally {
      embeddedActiveMQ.stop();
   }
}

/**
 * Asserts the broker state that {@link #testRedeployStopAndRestart()} expects once {@code reload-changed.xml}
 * is in effect: security role {@code c}, the new DLQ/expiry addresses, the removed addresses and queue, and the
 * updated queue attributes.
 *
 * @param embeddedActiveMQ the running embedded broker to inspect
 * @throws Exception if querying the broker fails
 */
private void assertStopAndRestartChangedState(EmbeddedActiveMQ embeddedActiveMQ) throws Exception {
   // Security settings.
   Assert.assertEquals(1, getSecurityRoles(embeddedActiveMQ, "security_address").size());
   Assert.assertEquals("c", getSecurityRoles(embeddedActiveMQ, "security_address").iterator().next().getName());

   // Address settings.
   Assert.assertEquals(SimpleString.toSimpleString("NewDLQ"), getAddressSettings(embeddedActiveMQ, "address_settings_address").getDeadLetterAddress());
   Assert.assertEquals(SimpleString.toSimpleString("NewExpiryQueue"), getAddressSettings(embeddedActiveMQ, "address_settings_address").getExpiryAddress());

   // Addresses and queues.
   Assert.assertNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal_no_queue"));
   Assert.assertNull(getAddressInfo(embeddedActiveMQ, "config_test_address_removal"));
   Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_removal"));
   Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_removal").contains("config_test_queue_removal_queue_1"));
   Assert.assertFalse(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_removal").contains("config_test_queue_removal_queue_2"));

   Assert.assertNotNull(getAddressInfo(embeddedActiveMQ, "config_test_queue_change"));
   Assert.assertTrue(listQueuesNamesForAddress(embeddedActiveMQ, "config_test_queue_change").contains("config_test_queue_change_queue"));
   Assert.assertEquals(1, getQueue(embeddedActiveMQ, "config_test_queue_change_queue").getMaxConsumers());
   Assert.assertTrue(getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isPurgeOnNoConsumers());
   Assert.assertFalse(getQueue(embeddedActiveMQ, "config_test_queue_change_queue").isEnabled());

   Assert.assertFalse(getQueue(embeddedActiveMQ, "config_test_queue_change_queue_defaults").isPurgeOnNoConsumers());
   Assert.assertTrue(getQueue(embeddedActiveMQ, "config_test_queue_change_queue_defaults").isEnabled());
}
/**
 * Looks up the address settings that the broker's address-settings repository matches to an address.
 *
 * @param embeddedActiveMQ the embedded broker to query
 * @param address          the address to match
 * @return whatever {@code getMatch(address)} yields on the server's address-settings repository
 */
private AddressSettings getAddressSettings(EmbeddedActiveMQ embeddedActiveMQ, String address) {
   final AddressSettings matchedSettings = embeddedActiveMQ
      .getActiveMQServer()
      .getAddressSettingsRepository()
      .getMatch(address);
   return matchedSettings;
}
/**
 * Looks up the security roles that the broker's security repository matches to an address.
 *
 * @param embeddedActiveMQ the embedded broker to query
 * @param address          the address to match
 * @return whatever {@code getMatch(address)} yields on the server's security repository
 */
private Set<Role> getSecurityRoles(EmbeddedActiveMQ embeddedActiveMQ, String address) {
   final Set<Role> matchedRoles = embeddedActiveMQ
      .getActiveMQServer()
      .getSecurityRepository()
      .getMatch(address);
   return matchedRoles;
}
/**
 * Fetches the address info the broker's post office holds for an address name.
 *
 * @param embeddedActiveMQ the embedded broker to query
 * @param address          the address name, converted to a {@link SimpleString} for the lookup
 * @return the post office's {@code getAddressInfo} result; callers in this class treat {@code null} as
 *         "address does not exist"
 */
private AddressInfo getAddressInfo(EmbeddedActiveMQ embeddedActiveMQ, String address) {
   final SimpleString addressName = SimpleString.toSimpleString(address);
   return embeddedActiveMQ.getActiveMQServer().getPostOffice().getAddressInfo(addressName);
}
/**
 * Resolves a queue by name through the broker's post office bindings.
 *
 * @param embeddedActiveMQ the embedded broker to query
 * @param queueName        the binding name to look up
 * @return the queue behind the binding, or {@code null} when the post office has no binding under that name
 * @throws Exception if the lookup fails
 */
private org.apache.activemq.artemis.core.server.Queue getQueue(EmbeddedActiveMQ embeddedActiveMQ, String queueName) throws Exception {
   final SimpleString bindingName = SimpleString.toSimpleString(queueName);
   final QueueBinding binding = (QueueBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(bindingName);
   if (binding == null) {
      return null;
   }
   return binding.getQueue();
}
/**
 * Lists, as plain strings, the names of the queues the broker's post office reports for an address.
 *
 * @param embeddedActiveMQ the embedded broker to query
 * @param address          the address whose queues are listed
 * @return the queue names in the order the post office returns the queues
 * @throws Exception if the lookup fails
 */
private List<String> listQueuesNamesForAddress(EmbeddedActiveMQ embeddedActiveMQ, String address) throws Exception {
   final SimpleString addressName = SimpleString.toSimpleString(address);
   return embeddedActiveMQ.getActiveMQServer()
      .getPostOffice()
      .listQueuesForAddress(addressName)
      .stream()
      .map(queue -> queue.getName().toString())
      .collect(Collectors.toList());
}
}