AMQCLI-8: Adding support for multikahadb
MultiKahaDb can now be used as a source for export.
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
index fee79bf..5cb7992 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
@@ -20,18 +20,26 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
+import java.util.List;
+import java.util.stream.Collectors;
import java.util.zip.GZIPOutputStream;
import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller;
import org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMessageRecoveryListener;
import org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMetadataExporter;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
+
/**
* KahaDB Exporter
*/
@@ -45,15 +53,25 @@
}
public static void exportKahaDbStore(final File kahaDbDir, final File artemisXml) throws Exception {
- Exporter.exportKahaDbStore(kahaDbDir, artemisXml, false);
+ Exporter.exportStore(kahaDbDir, artemisXml, false, false);
}
public static void exportKahaDbStore(final File kahaDbDir, final File artemisXml,
boolean compress) throws Exception {
+ Exporter.exportStore(kahaDbDir, artemisXml, false, compress);
+ }
- KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
- adapter.setDirectory(kahaDbDir);
- adapter.start();
+ public static void exportMultiKahaDbStore(final File kahaDbDir, final File artemisXml) throws Exception {
+ Exporter.exportStore(kahaDbDir, artemisXml, true, false);
+ }
+
+ public static void exportMultiKahaDbStore(final File kahaDbDir, final File artemisXml,
+ boolean compress) throws Exception {
+ Exporter.exportStore(kahaDbDir, artemisXml, true, compress);
+ }
+
+ private static void exportStore(final File kahaDbDir, final File artemisXml,
+ boolean multiKaha, boolean compress) throws Exception {
if (artemisXml.exists()) {
throw new IllegalStateException("File: " + artemisXml + " already exists");
@@ -65,11 +83,66 @@
final XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
final ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter);
+
+ xmlMarshaller.appendJournalOpen();
+
+ if (multiKaha) {
+ appendMultiKahaDbStore(xmlMarshaller, getMultiKahaDbAdapter(kahaDbDir));
+ } else {
+ appendKahaDbStore(xmlMarshaller, getKahaDbAdapter(kahaDbDir));
+ }
+
+ xmlMarshaller.appendJournalClose(true);
+ }
+
+ long end = System.currentTimeMillis();
+
+ LOG.info("Total export time: " + (end - start) + " ms");
+ }
+
+
+ private static void appendMultiKahaDbStore(final ArtemisJournalMarshaller xmlMarshaller,
+ MultiKahaDBPersistenceAdapter multiAdapter) throws Exception {
+
+ try {
+ multiAdapter.start();
+
+ List<KahaDBExporter> dbExporters = multiAdapter.getAdapters().stream()
+ .filter(adapter -> adapter instanceof KahaDBPersistenceAdapter)
+ .map(adapter -> {
+ KahaDBPersistenceAdapter kahaAdapter = (KahaDBPersistenceAdapter) adapter;
+ return new KahaDBExporter(kahaAdapter,
+ new ArtemisXmlMetadataExporter(kahaAdapter.getStore(), xmlMarshaller),
+ new ArtemisXmlMessageRecoveryListener(kahaAdapter.getStore(), xmlMarshaller));
+ }).collect(Collectors.toList());
+
+ xmlMarshaller.appendBindingsElement();
+ for (KahaDBExporter dbExporter : dbExporters) {
+ dbExporter.exportMetadata();
+ }
+ xmlMarshaller.appendEndElement();
+
+ xmlMarshaller.appendMessagesElement();
+ for (KahaDBExporter dbExporter : dbExporters) {
+ dbExporter.exportQueues();
+ dbExporter.exportTopics();
+ }
+ xmlMarshaller.appendEndElement();
+ } finally {
+ multiAdapter.stop();
+ }
+ }
+
+ private static void appendKahaDbStore(final ArtemisJournalMarshaller xmlMarshaller,
+ KahaDBPersistenceAdapter adapter) throws Exception {
+
+ try {
+ adapter.start();
+
final KahaDBExporter dbExporter = new KahaDBExporter(adapter,
new ArtemisXmlMetadataExporter(adapter.getStore(), xmlMarshaller),
new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));
- xmlMarshaller.appendJournalOpen();
xmlMarshaller.appendBindingsElement();
dbExporter.exportMetadata();
xmlMarshaller.appendEndElement();
@@ -77,12 +150,28 @@
dbExporter.exportQueues();
dbExporter.exportTopics();
xmlMarshaller.appendEndElement();
- xmlMarshaller.appendJournalClose(true);
} finally {
adapter.stop();
}
- long end = System.currentTimeMillis();
+ }
- LOG.info("Total export time: " + (end - start) + " ms");
+ private static KahaDBPersistenceAdapter getKahaDbAdapter(File dir) {
+ KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+ adapter.setDirectory(dir);
+ return adapter;
+ }
+
+ private static MultiKahaDBPersistenceAdapter getMultiKahaDbAdapter(File dir) {
+ MultiKahaDBPersistenceAdapter adapter = new MultiKahaDBPersistenceAdapter();
+ adapter.setDirectory(dir);
+
+ KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+ kahaStore.setDirectory(dir);
+ FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
+ filtered.setPersistenceAdapter(kahaStore);
+ filtered.setPerDestination(true);
+
+ adapter.setFilteredPersistenceAdapters(Lists.newArrayList(filtered));
+ return adapter;
}
}
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
index 8efdf42..b89e035 100644
--- a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
@@ -65,8 +65,8 @@
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
-import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IdGenerator;
import org.junit.Rule;
@@ -75,13 +75,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ExporterTest {
+public abstract class ExporterTest {
static final Logger LOG = LoggerFactory.getLogger(ExporterTest.class);
@Rule
public TemporaryFolder storeFolder = new TemporaryFolder();
+ public abstract PersistenceAdapter getPersistenceAdapter(File dir);
+
+ public abstract void exportStore(final File kahaDbDir, final File xmlFile) throws Exception;
+
/**
* TODO Improve test when real exporting is done, for now this just
* tests that the recovery listener iterates over all the queue messages
@@ -93,9 +97,7 @@
File kahaDbDir = storeFolder.newFolder();
ActiveMQQueue queue = new ActiveMQQueue("test.queue");
- KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
- adapter.setJournalMaxFileLength(1024 * 1024);
- adapter.setDirectory(kahaDbDir);
+ PersistenceAdapter adapter = getPersistenceAdapter(kahaDbDir);
adapter.start();
MessageStore messageStore = adapter.createQueueMessageStore(queue);
messageStore.start();
@@ -154,7 +156,7 @@
adapter.stop();
File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), "outputXml.xml");
- Exporter.exportKahaDbStore(kahaDbDir, xmlFile);
+ exportStore(kahaDbDir, xmlFile);
// printFile(xmlFile);
@@ -229,9 +231,7 @@
File kahaDbDir = storeFolder.newFolder();
ActiveMQTopic topic = new ActiveMQTopic("test.topic");
- KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
- adapter.setJournalMaxFileLength(1024 * 1024);
- adapter.setDirectory(kahaDbDir);
+ PersistenceAdapter adapter = getPersistenceAdapter(kahaDbDir);
adapter.start();
TopicMessageStore messageStore = adapter.createTopicMessageStore(topic);
messageStore.start();
@@ -264,9 +264,9 @@
adapter.stop();
File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), "outputXml.xml");
- Exporter.exportKahaDbStore(kahaDbDir, xmlFile);
+ exportStore(kahaDbDir, xmlFile);
- // printFile(xmlFile);
+ printFile(xmlFile);
validate(xmlFile, 5);
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/KahaDbExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/KahaDbExporterTest.java
new file mode 100644
index 0000000..e9411af
--- /dev/null
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/KahaDbExporterTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.cli.kahadb.exporter;
+
+import java.io.File;
+
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
+public class KahaDbExporterTest extends ExporterTest {
+
+ @Override
+ public PersistenceAdapter getPersistenceAdapter(File dir) {
+ KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+ adapter.setJournalMaxFileLength(1024 * 1024);
+ adapter.setDirectory(dir);
+ return adapter;
+ }
+
+ @Override
+ public void exportStore(File kahaDbDir, File xmlFile) throws Exception {
+ Exporter.exportKahaDbStore(kahaDbDir, xmlFile);
+ }
+}
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/MultiKahaDbExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/MultiKahaDbExporterTest.java
new file mode 100644
index 0000000..a006cef
--- /dev/null
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/MultiKahaDbExporterTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.cli.kahadb.exporter;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
+
+import com.google.common.collect.Lists;
+
+public class MultiKahaDbExporterTest extends ExporterTest {
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.cli.kahadb.exporter.ExporterTest#getPersistenceAdapter(java.io.File)
+ */
+ @Override
+ public PersistenceAdapter getPersistenceAdapter(File dir) {
+ MultiKahaDBPersistenceAdapter adapter = new MultiKahaDBPersistenceAdapter();
+ adapter.setJournalMaxFileLength(1024 * 1024);
+ adapter.setDirectory(dir);
+
+ KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
+ kahaStore.setDirectory(dir);
+ FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
+ filtered.setPersistenceAdapter(kahaStore);
+ filtered.setPerDestination(true);
+
+ adapter.setFilteredPersistenceAdapters(Lists.newArrayList(filtered));
+ return adapter;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.cli.kahadb.exporter.ExporterTest#exportStore(java.io.File, java.io.File)
+ */
+ @Override
+ public void exportStore(File kahaDbDir, File xmlFile) throws Exception {
+ Exporter.exportMultiKahaDbStore(kahaDbDir, xmlFile);
+
+ }
+
+}