AMQCLI-14 add live migration verification test with classic failover clients
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/LiveMigrationTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/LiveMigrationTest.java
new file mode 100644
index 0000000..c57f6a1
--- /dev/null
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/LiveMigrationTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.cli.kahadb.exporter;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataImporter;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.cli.kahadb.exporter.ExportConfiguration.ExportConfigurationBuilder;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertTrue;
+
+public class LiveMigrationTest {
+
+   static final Logger LOG = LoggerFactory.getLogger(LiveMigrationTest.class);
+
+   final int numMessagesToSend = 10;
+   final CountDownLatch gotAllTest = new CountDownLatch(2*numMessagesToSend);
+   final CountDownLatch gotAllPostTest = new CountDownLatch(2*numMessagesToSend);
+
+   @Rule
+   public TemporaryFolder storeFolder = new TemporaryFolder();
+
+   public PersistenceAdapter getPersistenceAdapter(File dir) {
+      KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+      adapter.setJournalMaxFileLength(1024 * 1024);
+      adapter.setDirectory(dir);
+      return adapter;
+   }
+
+   public void exportStore(final ExportConfigurationBuilder builder) throws Exception {
+      Exporter.exportStore(builder.build());
+   }
+
+   @Test
+   public void testMigrateVT() throws Exception {
+      File sourceDir = storeFolder.newFolder();
+      ActiveMQQueue queueA = new ActiveMQQueue("Consumer.A.VirtualTopic.T");
+      ActiveMQQueue queueB = new ActiveMQQueue("Consumer.B.VirtualTopic.T");
+
+      BrokerService classic = startActiveMQClassic(sourceDir);
+
+      int port = classic.getTransportConnectorByScheme("tcp").getConnectUri().getPort();
+
+      // note checkForDuplicates=false is necessary because durable subs messages retain the topic destination, and
+      // the classic audit is per destination. Thus two consumers on the same connection (which is unusual) can clash when the audit is enabled
+      ConnectionFactory cf = new org.apache.activemq.ActiveMQConnectionFactory("failover:(tcp://localhost:" + port+")?jms.checkForDuplicates=false");
+      Connection consumerConnection = cf.createConnection();
+      consumerConnection.start();
+
+      final Session sessionA = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+      MessageConsumer messageConsumerA = sessionA.createConsumer(queueA);
+      messageConsumerA.setMessageListener(new MessageListener() {
+         @Override
+         public void onMessage(Message message) {
+            try {
+               sessionA.commit();
+               tally("A", message);
+            } catch (Exception ok) {
+               ok.printStackTrace();
+            }
+         }
+      });
+
+      final Session sessionB = consumerConnection.createSession(true, Session.SESSION_TRANSACTED);
+      MessageConsumer messageConsumerB = sessionB.createConsumer(queueB);
+      messageConsumerB.setMessageListener(new MessageListener() {
+         @Override
+         public void onMessage(Message message) {
+            try {
+               sessionB.commit();
+               tally("B", message);
+            } catch (Exception ok) {
+               ok.printStackTrace();
+            }
+         }
+      });
+
+      Connection producerConnection = cf.createConnection();
+      try {
+         Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer messageProducer = producerSession.createProducer(producerSession.createTopic("VirtualTopic.T"));
+         for (int i = 0; i < numMessagesToSend; i++) {
+            ActiveMQTextMessage message = new ActiveMQTextMessage();
+            message.setText("Test: " + i);
+            messageProducer.send(message);
+         }
+      } finally {
+         if (producerConnection != null) {
+            producerConnection.close();
+         }
+      }
+
+      TimeUnit.SECONDS.sleep(5);
+
+      assertTrue(gotAllTest.getCount() < 2*numMessagesToSend);
+
+      // stop classic
+      classic.stop();
+
+      // migrate and start artemis
+
+      File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), "outputXml.xml");
+      exportStore(ExportConfigurationBuilder.newBuilder().setSource(sourceDir).setTarget(xmlFile).setVirtualTopicConsumerWildcards("Consumer.*.>;2"));
+
+      printFile(xmlFile);
+
+      final ActiveMQServer artemisServer = buildArtemisBroker(port);
+      artemisServer.start();
+      artemisServer.getManagementService().enableNotifications(false);
+
+      XmlDataImporter dataImporter = new XmlDataImporter();
+      dataImporter.process(xmlFile.getAbsolutePath(), "localhost", port, false);
+
+      try {
+         // wait for all messages to be consumed from classic clients
+         assertTrue("got all", gotAllTest.await(numMessagesToSend, TimeUnit.SECONDS));
+
+         // lets send some more to be sure to be sure
+         producerConnection = cf.createConnection();
+         try {
+            Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer messageProducer = producerSession.createProducer(producerSession.createTopic("VirtualTopic.T"));
+            for (int i = 0; i < numMessagesToSend; i++) {
+               ActiveMQTextMessage message = new ActiveMQTextMessage();
+               message.setText("PostTest: " + i);
+               messageProducer.send(message);
+            }
+         } finally {
+            if (producerConnection != null) {
+               producerConnection.close();
+            }
+         }
+
+         assertTrue("got all post test", gotAllPostTest.await(numMessagesToSend, TimeUnit.SECONDS));
+
+      } finally {
+
+         consumerConnection.close();
+         artemisServer.stop();
+      }
+   }
+
+   private void tally(String id, Message message) throws Exception {
+      final String text = ((TextMessage) message).getText();
+      LOG.info("{} got: {} text val {}", id, message.getJMSMessageID(), text);
+      if (text.startsWith("PostTest")) {
+         gotAllPostTest.countDown();
+      } else {
+         gotAllTest.countDown();
+         TimeUnit.SECONDS.sleep(1);
+      }
+   }
+
+   private BrokerService startActiveMQClassic(File sourceDir) throws Exception {
+      BrokerService brokerService = new BrokerService();
+      brokerService.setPersistenceAdapter(getPersistenceAdapter(sourceDir));
+      brokerService.addConnector("tcp://localhost:0");
+      brokerService.start();
+      return brokerService;
+   }
+
+   public ActiveMQServer buildArtemisBroker(int port) throws IOException {
+      Configuration configuration = new ConfigurationImpl();
+
+      configuration.setPersistenceEnabled(true);
+      configuration.setSecurityEnabled(false);
+
+      Map<String, Object> connectionParams = new HashMap<String, Object>();
+      connectionParams.put(
+         org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, port);
+      connectionParams.put("virtualTopicConsumerWildcards","Consumer.*.>;2");
+
+
+      configuration.setBindingsDirectory(storeFolder.newFolder().getAbsolutePath());
+      configuration.setJournalDirectory(storeFolder.newFolder().getAbsolutePath());
+      configuration.setLargeMessagesDirectory(storeFolder.newFolder().getAbsolutePath());
+      configuration.setPagingDirectory(storeFolder.newFolder().getAbsolutePath());
+
+      configuration.addAcceptorConfiguration(
+         new TransportConfiguration(NettyAcceptorFactory.class.getName(), connectionParams));
+      configuration.addConnectorConfiguration("connector",
+                                              new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
+
+
+      return new ActiveMQServerImpl(configuration);
+   }
+
+
+   protected void printFile(File file) throws IOException {
+      try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+         String line = null;
+         while ((line = br.readLine()) != null) {
+            System.out.println(line);
+         }
+      }
+   }
+
+}