AMQCLI-8: Adding support for multikahadb

MultiKahaDb can now be used as a source for export.
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
index fee79bf..5cb7992 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
@@ -20,18 +20,26 @@
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.OutputStream;
+import java.util.List;
+import java.util.stream.Collectors;
 import java.util.zip.GZIPOutputStream;
 
 import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
 import javax.xml.stream.XMLStreamWriter;
 
 import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller;
 import org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMessageRecoveryListener;
 import org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMetadataExporter;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 /**
  * KahaDB Exporter
  */
@@ -45,15 +53,25 @@
     }
 
     public static void exportKahaDbStore(final File kahaDbDir, final File artemisXml) throws Exception {
-        Exporter.exportKahaDbStore(kahaDbDir, artemisXml, false);
+        Exporter.exportStore(kahaDbDir, artemisXml, false, false);
     }
 
     public static void exportKahaDbStore(final File kahaDbDir, final File artemisXml,
             boolean compress) throws Exception {
+        Exporter.exportStore(kahaDbDir, artemisXml, false, compress);
+    }
 
-        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
-        adapter.setDirectory(kahaDbDir);
-        adapter.start();
+    public static void exportMultiKahaDbStore(final File kahaDbDir, final File artemisXml) throws Exception {
+        Exporter.exportStore(kahaDbDir, artemisXml, true, false);
+    }
+
+    public static void exportMultiKahaDbStore(final File kahaDbDir, final File artemisXml,
+            boolean compress) throws Exception {
+        Exporter.exportStore(kahaDbDir, artemisXml, true, compress);
+    }
+
+    private static void exportStore(final File kahaDbDir, final File artemisXml,
+            boolean multiKaha, boolean compress) throws Exception {
 
         if (artemisXml.exists()) {
             throw new IllegalStateException("File: " + artemisXml + " already exists");
@@ -65,11 +83,66 @@
 
             final XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
             final ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter);
+
+            xmlMarshaller.appendJournalOpen();
+
+            if (multiKaha) {
+                appendMultiKahaDbStore(xmlMarshaller, getMultiKahaDbAdapter(kahaDbDir));
+            } else {
+                appendKahaDbStore(xmlMarshaller, getKahaDbAdapter(kahaDbDir));
+            }
+
+            xmlMarshaller.appendJournalClose(true);
+        }
+
+        long end = System.currentTimeMillis();
+
+        LOG.info("Total export time: " + (end - start) + " ms");
+    }
+
+
+    private static void appendMultiKahaDbStore(final ArtemisJournalMarshaller xmlMarshaller,
+            MultiKahaDBPersistenceAdapter multiAdapter) throws Exception {
+
+        try {
+            multiAdapter.start();
+
+            List<KahaDBExporter> dbExporters = multiAdapter.getAdapters().stream()
+                    .filter(adapter -> adapter instanceof KahaDBPersistenceAdapter)
+                    .map(adapter -> {
+                        KahaDBPersistenceAdapter kahaAdapter = (KahaDBPersistenceAdapter) adapter;
+                        return new KahaDBExporter(kahaAdapter,
+                              new ArtemisXmlMetadataExporter(kahaAdapter.getStore(), xmlMarshaller),
+                              new ArtemisXmlMessageRecoveryListener(kahaAdapter.getStore(), xmlMarshaller));
+            }).collect(Collectors.toList());
+
+            xmlMarshaller.appendBindingsElement();
+            for (KahaDBExporter dbExporter : dbExporters) {
+                dbExporter.exportMetadata();
+            }
+            xmlMarshaller.appendEndElement();
+
+            xmlMarshaller.appendMessagesElement();
+            for (KahaDBExporter dbExporter : dbExporters) {
+                dbExporter.exportQueues();
+                dbExporter.exportTopics();
+            }
+            xmlMarshaller.appendEndElement();
+        } finally {
+            multiAdapter.stop();
+        }
+    }
+
+    private static void appendKahaDbStore(final ArtemisJournalMarshaller xmlMarshaller,
+            KahaDBPersistenceAdapter adapter) throws Exception {
+
+        try {
+            adapter.start();
+
             final KahaDBExporter dbExporter = new KahaDBExporter(adapter,
                     new ArtemisXmlMetadataExporter(adapter.getStore(), xmlMarshaller),
                     new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));
 
-            xmlMarshaller.appendJournalOpen();
             xmlMarshaller.appendBindingsElement();
             dbExporter.exportMetadata();
             xmlMarshaller.appendEndElement();
@@ -77,12 +150,28 @@
             dbExporter.exportQueues();
             dbExporter.exportTopics();
             xmlMarshaller.appendEndElement();
-            xmlMarshaller.appendJournalClose(true);
         } finally {
             adapter.stop();
         }
-        long end = System.currentTimeMillis();
+    }
 
-        LOG.info("Total export time: " + (end - start) + " ms");
+    private static KahaDBPersistenceAdapter getKahaDbAdapter(File dir) {
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setDirectory(dir);
+        return adapter;
+    }
+
+    private static MultiKahaDBPersistenceAdapter getMultiKahaDbAdapter(File dir) {
+        MultiKahaDBPersistenceAdapter adapter = new MultiKahaDBPersistenceAdapter();
+        adapter.setDirectory(dir);
+
+        KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+        kahaStore.setDirectory(dir);
+        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
+        filtered.setPersistenceAdapter(kahaStore);
+        filtered.setPerDestination(true);
+
+        adapter.setFilteredPersistenceAdapters(Lists.newArrayList(filtered));
+        return adapter;
     }
 }
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
index 8efdf42..b89e035 100644
--- a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
@@ -65,8 +65,8 @@
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IdGenerator;
 import org.junit.Rule;
@@ -75,13 +75,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ExporterTest {
+public abstract class ExporterTest {
 
     static final Logger LOG = LoggerFactory.getLogger(ExporterTest.class);
 
     @Rule
     public TemporaryFolder storeFolder = new TemporaryFolder();
 
+    public abstract PersistenceAdapter getPersistenceAdapter(File dir);
+
+    public abstract void exportStore(final File kahaDbDir, final File xmlFile) throws Exception;
+
     /**
      * TODO Improve test when real exporting is done, for now this just
      * tests that the recovery listener iterates over all the queue messages
@@ -93,9 +97,7 @@
 
         File kahaDbDir = storeFolder.newFolder();
         ActiveMQQueue queue = new ActiveMQQueue("test.queue");
-        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
-        adapter.setJournalMaxFileLength(1024 * 1024);
-        adapter.setDirectory(kahaDbDir);
+        PersistenceAdapter adapter = getPersistenceAdapter(kahaDbDir);
         adapter.start();
         MessageStore messageStore = adapter.createQueueMessageStore(queue);
         messageStore.start();
@@ -154,7 +156,7 @@
         adapter.stop();
 
         File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), "outputXml.xml");
-        Exporter.exportKahaDbStore(kahaDbDir, xmlFile);
+        exportStore(kahaDbDir, xmlFile);
 
       // printFile(xmlFile);
 
@@ -229,9 +231,7 @@
         File kahaDbDir = storeFolder.newFolder();
 
         ActiveMQTopic topic = new ActiveMQTopic("test.topic");
-        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
-        adapter.setJournalMaxFileLength(1024 * 1024);
-        adapter.setDirectory(kahaDbDir);
+        PersistenceAdapter adapter = getPersistenceAdapter(kahaDbDir);
         adapter.start();
         TopicMessageStore messageStore = adapter.createTopicMessageStore(topic);
         messageStore.start();
@@ -264,9 +264,9 @@
         adapter.stop();
 
         File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), "outputXml.xml");
-        Exporter.exportKahaDbStore(kahaDbDir, xmlFile);
+        exportStore(kahaDbDir, xmlFile);
 
-     //   printFile(xmlFile);
+        printFile(xmlFile);
 
         validate(xmlFile, 5);
 
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/KahaDbExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/KahaDbExporterTest.java
new file mode 100644
index 0000000..e9411af
--- /dev/null
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/KahaDbExporterTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.cli.kahadb.exporter;
+
+import java.io.File;
+
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
+public class KahaDbExporterTest extends ExporterTest {
+
+    @Override
+    public PersistenceAdapter getPersistenceAdapter(File dir) {
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setJournalMaxFileLength(1024 * 1024);
+        adapter.setDirectory(dir);
+        return adapter;
+    }
+
+    @Override
+    public void exportStore(File kahaDbDir, File xmlFile) throws Exception {
+        Exporter.exportKahaDbStore(kahaDbDir, xmlFile);
+    }
+}
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/MultiKahaDbExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/MultiKahaDbExporterTest.java
new file mode 100644
index 0000000..a006cef
--- /dev/null
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/MultiKahaDbExporterTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.cli.kahadb.exporter;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+
+import com.google.common.collect.Lists;
+
+public class MultiKahaDbExporterTest extends ExporterTest {
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.cli.kahadb.exporter.ExporterTest#getPersistenceAdapter(java.io.File)
+     */
+    @Override
+    public PersistenceAdapter getPersistenceAdapter(File dir) {
+        MultiKahaDBPersistenceAdapter adapter = new MultiKahaDBPersistenceAdapter();
+        adapter.setJournalMaxFileLength(1024 * 1024);
+        adapter.setDirectory(dir);
+
+        KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+        kahaStore.setDirectory(dir);
+        FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
+        filtered.setPersistenceAdapter(kahaStore);
+        filtered.setPerDestination(true);
+
+        adapter.setFilteredPersistenceAdapters(Lists.newArrayList(filtered));
+        return adapter;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.cli.kahadb.exporter.ExporterTest#exportStore(java.io.File, java.io.File)
+     */
+    @Override
+    public void exportStore(File kahaDbDir, File xmlFile) throws Exception {
+        Exporter.exportMultiKahaDbStore(kahaDbDir, xmlFile);
+
+    }
+
+}