blob: 7ba1746d5e10a773215f9b631d8f5d3decdc7e83 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.jms;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.embedded.EmbeddedActiveMQ;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.junit.Test;
public class RedeployTempTest extends ActiveMQTestBase {
@Test
public void testRedeployAddressQueueOpenWire() throws Exception {
Path brokerXML = getTestDirfile().toPath().resolve("broker.xml");
URL url1 = RedeployTest.class.getClassLoader().getResource("RedeployTempTest-reload-temp.xml");
URL url2 = RedeployTest.class.getClassLoader().getResource("RedeployTempTest-reload-temp-updated.xml");
Files.copy(url1.openStream(), brokerXML);
EmbeddedActiveMQ embeddedActiveMQ = new EmbeddedActiveMQ();
embeddedActiveMQ.setConfigResourcePath(brokerXML.toUri().toString());
embeddedActiveMQ.start();
final ReusableLatch latch = new ReusableLatch(1);
Runnable tick = latch::countDown;
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
ConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory();
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("queue");
MessageProducer messageProducer = session.createProducer(destination);
Destination replyTo = session.createTemporaryQueue();
Message message = session.createTextMessage("hello");
message.setJMSReplyTo(replyTo);
messageProducer.send(message);
try {
latch.await(10, TimeUnit.SECONDS);
Files.copy(url2.openStream(), brokerXML, StandardCopyOption.REPLACE_EXISTING);
brokerXML.toFile().setLastModified(System.currentTimeMillis() + 1000);
latch.setCount(1);
embeddedActiveMQ.getActiveMQServer().getReloadManager().setTick(tick);
latch.await(10, TimeUnit.SECONDS);
try (Connection connectionConsumer = connectionFactory.createConnection()) {
connectionConsumer.start();
try (Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
Destination destinationConsumer = session.createQueue("queue");
MessageConsumer messageConsumer = sessionConsumer.createConsumer(destinationConsumer);
Message receivedMessage = messageConsumer.receive(1000);
assertEquals("hello", ((TextMessage) receivedMessage).getText());
Destination replyToDest = receivedMessage.getJMSReplyTo();
Message message1 = sessionConsumer.createTextMessage("hi there");
session.createProducer(replyToDest).send(message1);
}
}
MessageConsumer messageConsumerProducer = session.createConsumer(replyTo);
Message message2 = messageConsumerProducer.receive(1000);
assertEquals("hi there", ((TextMessage) message2).getText());
} finally {
connection.close();
embeddedActiveMQ.stop();
}
}
private AddressSettings getAddressSettings(EmbeddedActiveMQ embeddedActiveMQ, String address) {
return embeddedActiveMQ.getActiveMQServer().getAddressSettingsRepository().getMatch(address);
}
private Set<Role> getSecurityRoles(EmbeddedActiveMQ embeddedActiveMQ, String address) {
return embeddedActiveMQ.getActiveMQServer().getSecurityRepository().getMatch(address);
}
private AddressInfo getAddressInfo(EmbeddedActiveMQ embeddedActiveMQ, String address) {
return embeddedActiveMQ.getActiveMQServer().getPostOffice().getAddressInfo(SimpleString.toSimpleString(address));
}
private org.apache.activemq.artemis.core.server.Queue getQueue(EmbeddedActiveMQ embeddedActiveMQ,
String queueName) throws Exception {
QueueBinding queueBinding = (QueueBinding) embeddedActiveMQ.getActiveMQServer().getPostOffice().getBinding(SimpleString.toSimpleString(queueName));
return queueBinding == null ? null : queueBinding.getQueue();
}
private List<String> listQueuesNamesForAddress(EmbeddedActiveMQ embeddedActiveMQ, String address) throws Exception {
return embeddedActiveMQ.getActiveMQServer().getPostOffice().listQueuesForAddress(SimpleString.toSimpleString(address)).stream().map(org.apache.activemq.artemis.core.server.Queue::getName).map(SimpleString::toString).collect(Collectors.toList());
}
}