AMQCLI-14 add live migration verification test with classic failover clients
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/LiveMigrationTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/LiveMigrationTest.java
new file mode 100644
index 0000000..c57f6a1
--- /dev/null
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/LiveMigrationTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.cli.kahadb.exporter;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataImporter;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.cli.kahadb.exporter.ExportConfiguration.ExportConfigurationBuilder;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertTrue;
+
+public class LiveMigrationTest {
+
+ static final Logger LOG = LoggerFactory.getLogger(LiveMigrationTest.class);
+
+ final int numMessagesToSend = 10;
+ final CountDownLatch gotAllTest = new CountDownLatch(2*numMessagesToSend);
+ final CountDownLatch gotAllPostTest = new CountDownLatch(2*numMessagesToSend);
+
+ @Rule
+ public TemporaryFolder storeFolder = new TemporaryFolder();
+
+ public PersistenceAdapter getPersistenceAdapter(File dir) {
+ KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+ adapter.setJournalMaxFileLength(1024 * 1024);
+ adapter.setDirectory(dir);
+ return adapter;
+ }
+
+ public void exportStore(final ExportConfigurationBuilder builder) throws Exception {
+ Exporter.exportStore(builder.build());
+ }
+
+ @Test
+ public void testMigrateVT() throws Exception {
+ File sourceDir = storeFolder.newFolder();
+ ActiveMQQueue queueA = new ActiveMQQueue("Consumer.A.VirtualTopic.T");
+ ActiveMQQueue queueB = new ActiveMQQueue("Consumer.B.VirtualTopic.T");
+
+ BrokerService classic = startActiveMQClassic(sourceDir);
+
+ int port = classic.getTransportConnectorByScheme("tcp").getConnectUri().getPort();
+
+ // note checkForDuplicates=false is necessary because durable subs messages retain the topic destination, and
+ // the classic audit is per destination. Thus two consumers on the same connection (which is unusual) can clash when the audit is enabled
+ ConnectionFactory cf = new org.apache.activemq.ActiveMQConnectionFactory("failover:(tcp://localhost:" + port+")?jms.checkForDuplicates=false");
+ Connection consumerConnection = cf.createConnection();
+ consumerConnection.start();
+
+ final Session sessionA = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer messageConsumerA = sessionA.createConsumer(queueA);
+ messageConsumerA.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ try {
+ sessionA.commit();
+ tally("A", message);
+ } catch (Exception ok) {
+ ok.printStackTrace();
+ }
+ }
+ });
+
+ final Session sessionB = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+ MessageConsumer messageConsumerB = sessionB.createConsumer(queueB);
+ messageConsumerB.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ try {
+ sessionB.commit();
+ tally("B", message);
+ } catch (Exception ok) {
+ ok.printStackTrace();
+ }
+ }
+ });
+
+ Connection producerConnection = cf.createConnection();
+ try {
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer messageProducer = producerSession.createProducer(producerSession.createTopic("VirtualTopic.T"));
+ for (int i = 0; i < numMessagesToSend; i++) {
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setText("Test: " + i);
+ messageProducer.send(message);
+ }
+ } finally {
+ if (producerConnection != null) {
+ producerConnection.close();
+ }
+ }
+
+ TimeUnit.SECONDS.sleep(5);
+
+ assertTrue(gotAllTest.getCount() < 2*numMessagesToSend);
+
+ // stop classic
+ classic.stop();
+
+ // migrate and start artemis
+
+ File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), "outputXml.xml");
+ exportStore(ExportConfigurationBuilder.newBuilder().setSource(sourceDir).setTarget(xmlFile).setVirtualTopicConsumerWildcards("Consumer.*.>;2"));
+
+ printFile(xmlFile);
+
+ final ActiveMQServer artemisServer = buildArtemisBroker(port);
+ artemisServer.start();
+ artemisServer.getManagementService().enableNotifications(false);
+
+ XmlDataImporter dataImporter = new XmlDataImporter();
+ dataImporter.process(xmlFile.getAbsolutePath(), "localhost", port, false);
+
+ try {
+ // wait for all messages to be consumed from classic clients
+ assertTrue("got all", gotAllTest.await(numMessagesToSend, TimeUnit.SECONDS));
+
+ // lets send some more to be sure to be sure
+ producerConnection = cf.createConnection();
+ try {
+ Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer messageProducer = producerSession.createProducer(producerSession.createTopic("VirtualTopic.T"));
+ for (int i = 0; i < numMessagesToSend; i++) {
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setText("PostTest: " + i);
+ messageProducer.send(message);
+ }
+ } finally {
+ if (producerConnection != null) {
+ producerConnection.close();
+ }
+ }
+
+ assertTrue("got all post test", gotAllPostTest.await(numMessagesToSend, TimeUnit.SECONDS));
+
+ } finally {
+
+ consumerConnection.close();
+ artemisServer.stop();
+ }
+ }
+
+ private void tally(String id, Message message) throws Exception {
+ final String text = ((TextMessage) message).getText();
+ LOG.info("{} got: {} text val {}", id, message.getJMSMessageID(), text);
+ if (text.startsWith("PostTest")) {
+ gotAllPostTest.countDown();
+ } else {
+ gotAllTest.countDown();
+ TimeUnit.SECONDS.sleep(1);
+ }
+ }
+
+ private BrokerService startActiveMQClassic(File sourceDir) throws Exception {
+ BrokerService brokerService = new BrokerService();
+ brokerService.setPersistenceAdapter(getPersistenceAdapter(sourceDir));
+ brokerService.addConnector("tcp://localhost:0");
+ brokerService.start();
+ return brokerService;
+ }
+
+ public ActiveMQServer buildArtemisBroker(int port) throws IOException {
+ Configuration configuration = new ConfigurationImpl();
+
+ configuration.setPersistenceEnabled(true);
+ configuration.setSecurityEnabled(false);
+
+ Map<String, Object> connectionParams = new HashMap<String, Object>();
+ connectionParams.put(
+ org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port);
+ connectionParams.put("virtualTopicConsumerWildcards","Consumer.*.>;2");
+
+
+ configuration.setBindingsDirectory(storeFolder.newFolder().getAbsolutePath());
+ configuration.setJournalDirectory(storeFolder.newFolder().getAbsolutePath());
+ configuration.setLargeMessagesDirectory(storeFolder.newFolder().getAbsolutePath());
+ configuration.setPagingDirectory(storeFolder.newFolder().getAbsolutePath());
+
+ configuration.addAcceptorConfiguration(
+ new TransportConfiguration(NettyAcceptorFactory.class.getName(), connectionParams));
+ configuration.addConnectorConfiguration("connector",
+ new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
+
+
+ return new ActiveMQServerImpl(configuration);
+ }
+
+
+ protected void printFile(File file) throws IOException {
+ try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+ String line = null;
+ while ((line = br.readLine()) != null) {
+ System.out.println(line);
+ }
+ }
+ }
+
+}