blob: c57f6a1e51fdd49b0db9eed91d63adb810c0a680 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.cli.kahadb.exporter;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataImporter;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.cli.kahadb.exporter.ExportConfiguration.ExportConfigurationBuilder;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertTrue;
public class LiveMigrationTest {
static final Logger LOG = LoggerFactory.getLogger(LiveMigrationTest.class);
final int numMessagesToSend = 10;
final CountDownLatch gotAllTest = new CountDownLatch(2*numMessagesToSend);
final CountDownLatch gotAllPostTest = new CountDownLatch(2*numMessagesToSend);
@Rule
public TemporaryFolder storeFolder = new TemporaryFolder();
public PersistenceAdapter getPersistenceAdapter(File dir) {
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setJournalMaxFileLength(1024 * 1024);
adapter.setDirectory(dir);
return adapter;
}
public void exportStore(final ExportConfigurationBuilder builder) throws Exception {
Exporter.exportStore(builder.build());
}
@Test
public void testMigrateVT() throws Exception {
File sourceDir = storeFolder.newFolder();
ActiveMQQueue queueA = new ActiveMQQueue("Consumer.A.VirtualTopic.T");
ActiveMQQueue queueB = new ActiveMQQueue("Consumer.B.VirtualTopic.T");
BrokerService classic = startActiveMQClassic(sourceDir);
int port = classic.getTransportConnectorByScheme("tcp").getConnectUri().getPort();
// note checkForDuplicates=false is necessary because durable subs messages retain the topic destination, and
// the classic audit is per destination. Thus two consumers on the same connection (which is unusual) can clash when the audit is enabled
ConnectionFactory cf = new org.apache.activemq.ActiveMQConnectionFactory("failover:(tcp://localhost:" + port+")?jms.checkForDuplicates=false");
Connection consumerConnection = cf.createConnection();
consumerConnection.start();
final Session sessionA = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumerA = sessionA.createConsumer(queueA);
messageConsumerA.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
sessionA.commit();
tally("A", message);
} catch (Exception ok) {
ok.printStackTrace();
}
}
});
final Session sessionB = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer messageConsumerB = sessionB.createConsumer(queueB);
messageConsumerB.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
sessionB.commit();
tally("B", message);
} catch (Exception ok) {
ok.printStackTrace();
}
}
});
Connection producerConnection = cf.createConnection();
try {
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = producerSession.createProducer(producerSession.createTopic("VirtualTopic.T"));
for (int i = 0; i < numMessagesToSend; i++) {
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText("Test: " + i);
messageProducer.send(message);
}
} finally {
if (producerConnection != null) {
producerConnection.close();
}
}
TimeUnit.SECONDS.sleep(5);
assertTrue(gotAllTest.getCount() < 2*numMessagesToSend);
// stop classic
classic.stop();
// migrate and start artemis
File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), "outputXml.xml");
exportStore(ExportConfigurationBuilder.newBuilder().setSource(sourceDir).setTarget(xmlFile).setVirtualTopicConsumerWildcards("Consumer.*.>;2"));
printFile(xmlFile);
final ActiveMQServer artemisServer = buildArtemisBroker(port);
artemisServer.start();
artemisServer.getManagementService().enableNotifications(false);
XmlDataImporter dataImporter = new XmlDataImporter();
dataImporter.process(xmlFile.getAbsolutePath(), "localhost", port, false);
try {
// wait for all messages to be consumed from classic clients
assertTrue("got all", gotAllTest.await(numMessagesToSend, TimeUnit.SECONDS));
// lets send some more to be sure to be sure
producerConnection = cf.createConnection();
try {
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = producerSession.createProducer(producerSession.createTopic("VirtualTopic.T"));
for (int i = 0; i < numMessagesToSend; i++) {
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText("PostTest: " + i);
messageProducer.send(message);
}
} finally {
if (producerConnection != null) {
producerConnection.close();
}
}
assertTrue("got all post test", gotAllPostTest.await(numMessagesToSend, TimeUnit.SECONDS));
} finally {
consumerConnection.close();
artemisServer.stop();
}
}
private void tally(String id, Message message) throws Exception {
final String text = ((TextMessage) message).getText();
LOG.info("{} got: {} text val {}", id, message.getJMSMessageID(), text);
if (text.startsWith("PostTest")) {
gotAllPostTest.countDown();
} else {
gotAllTest.countDown();
TimeUnit.SECONDS.sleep(1);
}
}
private BrokerService startActiveMQClassic(File sourceDir) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setPersistenceAdapter(getPersistenceAdapter(sourceDir));
brokerService.addConnector("tcp://localhost:0");
brokerService.start();
return brokerService;
}
public ActiveMQServer buildArtemisBroker(int port) throws IOException {
Configuration configuration = new ConfigurationImpl();
configuration.setPersistenceEnabled(true);
configuration.setSecurityEnabled(false);
Map<String, Object> connectionParams = new HashMap<String, Object>();
connectionParams.put(
org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port);
connectionParams.put("virtualTopicConsumerWildcards","Consumer.*.>;2");
configuration.setBindingsDirectory(storeFolder.newFolder().getAbsolutePath());
configuration.setJournalDirectory(storeFolder.newFolder().getAbsolutePath());
configuration.setLargeMessagesDirectory(storeFolder.newFolder().getAbsolutePath());
configuration.setPagingDirectory(storeFolder.newFolder().getAbsolutePath());
configuration.addAcceptorConfiguration(
new TransportConfiguration(NettyAcceptorFactory.class.getName(), connectionParams));
configuration.addConnectorConfiguration("connector",
new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
return new ActiveMQServerImpl(configuration);
}
protected void printFile(File file) throws IOException {
try (BufferedReader br = new BufferedReader(new FileReader(file))) {
String line = null;
while ((line = br.readLine()) != null) {
System.out.println(line);
}
}
}
}