AMQCLI-3 Add a utility method for exporting a store
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
index 7b8f17d..49d9ae7 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
@@ -16,6 +16,23 @@
*/
package org.apache.activemq.cli.kahadb.exporter;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.FileOutputStream;
+
+import javax.xml.stream.XMLOutputFactory;
+import javax.xml.stream.XMLStreamWriter;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller;
+import org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMessageRecoveryListener;
+import org.apache.activemq.cli.schema.QueueBindingType;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+
/**
* KahaDB Exporter
*/
@@ -24,4 +41,54 @@
public static void main(String[] args) {
}
+
+ public static void exportKahaDbStore(final File kahaDbDir, final File artemisXml) throws Exception {
+
+ KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+ adapter.setDirectory(kahaDbDir);
+ adapter.start();
+
+ try(FileOutputStream fos = new FileOutputStream(artemisXml)) {
+ XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
+ ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter);
+
+ xmlMarshaller.appendJournalOpen();
+ xmlMarshaller.appendBindingsElement();
+
+ adapter.getStore().getDestinations().stream()
+ .forEach(dest -> {
+ try {
+ if (dest.isQueue()) {
+ xmlMarshaller.appendBinding(QueueBindingType.builder()
+ .withName(dest.getPhysicalName())
+ .withRoutingType(RoutingType.ANYCAST.toString())
+ .withAddress(dest.getPhysicalName()).build());
+ } else if (dest.isTopic()) {
+ for (SubscriptionInfo info :
+ adapter.getStore().createTopicMessageStore((ActiveMQTopic) dest).getAllSubscriptions()) {
+ xmlMarshaller.appendBinding(QueueBindingType.builder()
+ .withName(ActiveMQDestination.createQueueNameForDurableSubscription(
+ true, info.getClientId(), info.getSubcriptionName()))
+ .withRoutingType(RoutingType.MULTICAST.toString())
+ .withAddress(dest.getPhysicalName()).build());
+ }
+ }
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ });
+
+ xmlMarshaller.appendEndElement();
+ xmlMarshaller.appendMessagesElement();
+
+ KahaDBExporter dbExporter = new KahaDBExporter(adapter,
+ new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));
+
+ dbExporter.exportQueues();
+ dbExporter.exportTopics();
+ xmlMarshaller.appendJournalClose(true);
+ } finally {
+ adapter.stop();
+ }
+ }
}
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
index b23ab7b..4a4d47a 100644
--- a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
@@ -103,10 +103,11 @@
@Test
public void testExportQueues() throws Exception {
+ File kahaDbDir = storeFolder.newFolder();
ActiveMQQueue queue = new ActiveMQQueue("test.queue");
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setJournalMaxFileLength(1024 * 1024);
- adapter.setDirectory(storeFolder.newFolder());
+ adapter.setDirectory(kahaDbDir);
adapter.start();
MessageStore messageStore = adapter.createQueueMessageStore(queue);
messageStore.start();
@@ -162,32 +163,12 @@
messageStore.addMessage(context, message);
}
- messageStore.stop();
-
- File file = storeFolder.newFile();
- try(FileOutputStream fos = new FileOutputStream(file)) {
- XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
- ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter);
-
- xmlMarshaller.appendJournalOpen();
- xmlMarshaller.appendBindingsElement();
- xmlMarshaller.appendBinding(QueueBindingType.builder()
- .withName("test.queue")
- .withRoutingType(RoutingType.ANYCAST.toString())
- .withAddress("test.queue").build());
- xmlMarshaller.appendEndElement();
- xmlMarshaller.appendMessagesElement();
-
- KahaDBExporter dbExporter = new KahaDBExporter(adapter,
- new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));
-
- dbExporter.exportQueues();
- xmlMarshaller.appendJournalClose(true);
- }
-
adapter.stop();
- try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+ File xmlFile = storeFolder.newFile();
+ Exporter.exportKahaDbStore(kahaDbDir, xmlFile);
+
+ try (BufferedReader br = new BufferedReader(new FileReader(xmlFile))) {
String line = null;
while ((line = br.readLine()) != null) {
System.out.println(line);
@@ -195,13 +176,13 @@
}
- validate(file, 17);
+ validate(xmlFile, 17);
final ActiveMQServer artemisServer = buildArtemisBroker();
artemisServer.start();
XmlDataImporter dataImporter = new XmlDataImporter();
- dataImporter.process(file.getAbsolutePath(), "localhost", 61400, false);
+ dataImporter.process(xmlFile.getAbsolutePath(), "localhost", 61400, false);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61400");
@@ -263,10 +244,12 @@
@Test
public void testExportTopics() throws Exception {
+ File kahaDbDir = storeFolder.newFolder();
+
ActiveMQTopic topic = new ActiveMQTopic("test.topic");
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
adapter.setJournalMaxFileLength(1024 * 1024);
- adapter.setDirectory(storeFolder.newFolder());
+ adapter.setDirectory(kahaDbDir);
adapter.start();
TopicMessageStore messageStore = adapter.createTopicMessageStore(topic);
messageStore.start();
@@ -296,49 +279,13 @@
//ack for sub1 only
messageStore.acknowledge(context, "clientId1", "sub1", first, new MessageAck());
- messageStore.stop();
-
- // String queueName = ActiveMQDestination.createQueueNameForDurableSubscription(true, "clientId1", "sub1");
-
- File file = storeFolder.newFile();
- try(FileOutputStream fos = new FileOutputStream(file)) {
- XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
- ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter);
-
- xmlMarshaller.appendJournalOpen();
- xmlMarshaller.appendBindingsElement();
-
- adapter.getStore().getDestinations().stream()
- .filter(dest -> dest.isTopic()).forEach(dest -> {
-
- try {
- for (SubscriptionInfo info :
- adapter.getStore().createTopicMessageStore((ActiveMQTopic) dest).getAllSubscriptions()) {
- xmlMarshaller.appendBinding(QueueBindingType.builder()
- .withName(ActiveMQDestination.createQueueNameForDurableSubscription(
- true, info.getClientId(), info.getSubcriptionName()))
- .withRoutingType(RoutingType.MULTICAST.toString())
- .withAddress(dest.getPhysicalName()).build());
- }
-
- } catch (Exception e) {
- fail(e.getMessage());
- }
- });
-
- xmlMarshaller.appendEndElement();
- xmlMarshaller.appendMessagesElement();
-
- KahaDBExporter dbExporter = new KahaDBExporter(adapter,
- new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));
-
- dbExporter.exportTopics();
- xmlMarshaller.appendJournalClose(true);
- }
-
adapter.stop();
- try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+ File xmlFile = storeFolder.newFile();
+ Exporter.exportKahaDbStore(kahaDbDir, xmlFile);
+
+
+ try (BufferedReader br = new BufferedReader(new FileReader(xmlFile))) {
String line = null;
while ((line = br.readLine()) != null) {
System.out.println(line);
@@ -346,13 +293,13 @@
}
- validate(file, 5);
+ validate(xmlFile, 5);
final ActiveMQServer artemisServer = buildArtemisBroker();
artemisServer.start();
XmlDataImporter dataImporter = new XmlDataImporter();
- dataImporter.process(file.getAbsolutePath(), "localhost", 61400, false);
+ dataImporter.process(xmlFile.getAbsolutePath(), "localhost", 61400, false);
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61400");