AMQCLI-5 - Add support for exporting Topics

https://issues.apache.org/jira/browse/AMQCLI-5

Added basic support for exporting topics.  This will still need a bit of
polishing
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
index c178a8c..9c12644 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
@@ -23,7 +23,6 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.TopicMessageStore;
@@ -72,20 +71,12 @@
             final ActiveMQTopic topic = (ActiveMQTopic) destination;
             final TopicMessageStore messageStore = adapter.createTopicMessageStore(topic);
 
-            //recover subscriptions
-            //TODO: This will most likely run into the same message more than once if there is
-            //more than one durable sub on a topic so we should look at optimizing this
-            //Ideally we'd just recover all the messages once and then ask KahaDB which subscriptions
-            //have not acked the message.  This will probably require a new hook into KahaDB
-//            for (final SubscriptionInfo subscriptionInfo : messageStore.getAllSubscriptions()) {
-//
-//                try {
-//                    messageStore.recoverSubscription(subscriptionInfo.getClientId(),
-//                            subscriptionInfo.getSubscriptionName(), recoveryListener);
-//                } catch (Exception e) {
-//                    IOExceptionSupport.create(e);
-//                }
-//            }
+            //recover topic
+            try {
+                messageStore.recover(recoveryListener);
+            } catch (Exception e) {
+                IOExceptionSupport.create(e);
+            }
         }
     }
 }
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java
index c2d04a2..93bd439 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java
@@ -21,6 +21,7 @@
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.kahadb.KahaDBStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,15 +32,16 @@
     static final Logger LOG = LoggerFactory.getLogger(ArtemisXmlMessageRecoveryListener.class);
 
     private final ArtemisJournalMarshaller xmlMarshaller;
-    private final OpenWireMessageTypeConverter converter = new OpenWireMessageTypeConverter();
-
+    private final OpenWireMessageTypeConverter converter;
 
     /**
      * @param file
      */
-    public ArtemisXmlMessageRecoveryListener(final ArtemisJournalMarshaller xmlMarshaller) {
+    public ArtemisXmlMessageRecoveryListener(final KahaDBStore store,
+            final ArtemisJournalMarshaller xmlMarshaller) {
         super();
         this.xmlMarshaller = xmlMarshaller;
+        this.converter = new OpenWireMessageTypeConverter(store);
     }
 
 
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java
index 259decc..c921b48 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java
@@ -16,11 +16,10 @@
  */
 package org.apache.activemq.cli.kahadb.exporter.artemis;
 
-import javax.jms.JMSException;
-
 import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporterUtil;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
 import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.cli.kahadb.exporter.OpenWireExportConverter;
 import org.apache.activemq.cli.schema.BodyType;
 import org.apache.activemq.cli.schema.MessageType;
@@ -30,10 +29,22 @@
 import org.apache.activemq.cli.schema.QueuesType;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.store.kahadb.KahaDBUtil;
 
 public class OpenWireMessageTypeConverter implements OpenWireExportConverter<MessageType> {
 
-    final OpenWireMessageConverter converter = new OpenWireMessageConverter(new OpenWireFormat());
+    private final OpenWireMessageConverter converter = new OpenWireMessageConverter(new OpenWireFormat());
+    private final KahaDBStore store;
+
+
+    /**
+     * @param store
+     */
+    public OpenWireMessageTypeConverter(KahaDBStore store) {
+        super();
+        this.store = store;
+    }
 
     /* (non-Javadoc)
      * @see org.apache.activemq.cli.kahadb.exporter.MessageConverter#convert(org.apache.activemq.Message)
@@ -66,11 +77,23 @@
         return messageType;
     }
 
-    private QueuesType convertQueue(final Message message) throws JMSException {
-        return QueuesType.builder()
-                .withQueue(QueueType.builder()
-                        .withName(message.getDestination().getPhysicalName()).build())
-            .build();
+    private QueuesType convertQueue(final Message message) throws Exception {
+        if (message.getDestination().isQueue()) {
+            return QueuesType.builder()
+                    .withQueue(QueueType.builder()
+                            .withName(message.getDestination().getPhysicalName()).build())
+                .build();
+        } else {
+            final QueuesType.Builder<Void> queuesBuilder = QueuesType.builder();
+
+            KahaDBUtil.getUnackedSubscriptions(store, message).forEach(sub -> {
+                queuesBuilder.addQueue(QueueType.builder().withName(
+                        ActiveMQDestination.createQueueNameForDurableSubscription(
+                        true, sub.getClientId(), sub.getSubcriptionName())).build());
+            });
+
+            return queuesBuilder.build();
+        }
     }
 
     private BodyType convertBody(final ServerMessage serverMessage) throws Exception {
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/store/kahadb/KahaDBUtil.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/store/kahadb/KahaDBUtil.java
new file mode 100644
index 0000000..40ea71d
--- /dev/null
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/store/kahadb/KahaDBUtil.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.kahadb.MessageDatabase.LastAck;
+import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+
+public class KahaDBUtil {
+
+
+    /**
+     * Return subscriptions which have not acked this message
+     *
+     * @param store
+     * @param message
+     * @return
+     * @throws Exception
+     */
+    public static List<SubscriptionInfo> getUnackedSubscriptions(KahaDBStore store, Message message)
+            throws Exception {
+
+        final List<SubscriptionInfo> matching = new ArrayList<>();
+
+        if (!message.getDestination().isTopic()) {
+            return matching;
+        }
+
+        ActiveMQTopic topic = (ActiveMQTopic) message.getDestination();
+        String messageId = message.getMessageId().toString();
+        TopicMessageStore messageStore =  store.createTopicMessageStore(topic);
+
+        store.indexLock.writeLock().lock();
+
+        final SubscriptionInfo[] infos = messageStore.getAllSubscriptions();
+
+        try {
+            store.pageFile.tx().execute(new Transaction.Closure<Exception>() {
+                @Override
+                public void execute(Transaction tx) throws Exception {
+                    StoredDestination sd = store.getStoredDestination(store.convert(topic), tx);
+
+                    if (sd != null) {
+                        Long position = sd.messageIdIndex.get(tx, messageId);
+
+                        for (SubscriptionInfo info : infos) {
+                            LastAck cursorPos = store.getLastAck(tx, sd,
+                                    store.subscriptionKey(info.getClientId(), info.getSubcriptionName()));
+                            if (cursorPos.lastAckedSequence < position) {
+                                matching.add(info);
+                            }
+                        }
+                    }
+                }
+            });
+        } finally {
+            store.indexLock.writeLock().unlock();
+        }
+
+        return matching;
+    }
+}
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
index 73bcecd..b23ab7b 100644
--- a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
@@ -19,6 +19,8 @@
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -28,6 +30,7 @@
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -56,6 +59,7 @@
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller;
 import org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMessageRecoveryListener;
@@ -68,8 +72,12 @@
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQStreamMessage;
 import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IdGenerator;
@@ -165,17 +173,19 @@
             xmlMarshaller.appendBindingsElement();
             xmlMarshaller.appendBinding(QueueBindingType.builder()
                     .withName("test.queue")
+                    .withRoutingType(RoutingType.ANYCAST.toString())
                     .withAddress("test.queue").build());
             xmlMarshaller.appendEndElement();
             xmlMarshaller.appendMessagesElement();
 
             KahaDBExporter dbExporter = new KahaDBExporter(adapter,
-                    new ArtemisXmlMessageRecoveryListener(xmlMarshaller));
+                    new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));
 
             dbExporter.exportQueues();
             xmlMarshaller.appendJournalClose(true);
         }
 
+        adapter.stop();
 
         try (BufferedReader br = new BufferedReader(new FileReader(file))) {
             String line = null;
@@ -250,6 +260,139 @@
         artemisServer.stop();
     }
 
+    @Test
+    public void testExportTopics() throws Exception {
+
+        ActiveMQTopic topic = new ActiveMQTopic("test.topic");
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setJournalMaxFileLength(1024 * 1024);
+        adapter.setDirectory(storeFolder.newFolder());
+        adapter.start();
+        TopicMessageStore messageStore = adapter.createTopicMessageStore(topic);
+        messageStore.start();
+
+        SubscriptionInfo sub1 = new SubscriptionInfo("clientId1", "sub1");
+        SubscriptionInfo sub2 = new SubscriptionInfo("clientId1", "sub2");
+        sub1.setDestination(topic);
+        messageStore.addSubscription(sub1, false);
+        messageStore.addSubscription(sub2, false);
+
+        IdGenerator id = new IdGenerator();
+        ConnectionContext context = new ConnectionContext();
+        MessageId first = null;
+        for (int i = 0; i < 5; i++) {
+            ActiveMQTextMessage message = new ActiveMQTextMessage();
+            message.setText("Test");
+            message.setProperty("MyStringProperty", "abc");
+            message.setProperty("MyIntegerProperty", 1);
+            message.setDestination(topic);
+            message.setMessageId(new MessageId(id.generateId() + ":1", i));
+            messageStore.addMessage(context, message);
+            if (i == 0) {
+                first = message.getMessageId();
+            }
+        }
+
+        //ack for sub1 only
+        messageStore.acknowledge(context, "clientId1", "sub1", first, new MessageAck());
+
+        messageStore.stop();
+
+      //  String queueName = ActiveMQDestination.createQueueNameForDurableSubscription(true, "clientId1", "sub1");
+
+        File file = storeFolder.newFile();
+        try(FileOutputStream fos = new FileOutputStream(file)) {
+            XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
+            ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter);
+
+            xmlMarshaller.appendJournalOpen();
+            xmlMarshaller.appendBindingsElement();
+
+            adapter.getStore().getDestinations().stream()
+                    .filter(dest -> dest.isTopic()).forEach(dest -> {
+
+                        try {
+                            for (SubscriptionInfo info :
+                                adapter.getStore().createTopicMessageStore((ActiveMQTopic) dest).getAllSubscriptions()) {
+                                xmlMarshaller.appendBinding(QueueBindingType.builder()
+                                        .withName(ActiveMQDestination.createQueueNameForDurableSubscription(
+                                                true, info.getClientId(), info.getSubcriptionName()))
+                                        .withRoutingType(RoutingType.MULTICAST.toString())
+                                        .withAddress(dest.getPhysicalName()).build());
+                            }
+
+                        } catch (Exception e) {
+                            fail(e.getMessage());
+                        }
+                    });
+
+            xmlMarshaller.appendEndElement();
+            xmlMarshaller.appendMessagesElement();
+
+            KahaDBExporter dbExporter = new KahaDBExporter(adapter,
+                    new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));
+
+            dbExporter.exportTopics();
+            xmlMarshaller.appendJournalClose(true);
+        }
+
+        adapter.stop();
+
+        try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+            String line = null;
+            while ((line = br.readLine()) != null) {
+                System.out.println(line);
+            }
+         }
+
+
+        validate(file, 5);
+
+        final ActiveMQServer artemisServer = buildArtemisBroker();
+        artemisServer.start();
+
+        XmlDataImporter dataImporter = new XmlDataImporter();
+        dataImporter.process(file.getAbsolutePath(), "localhost", 61400, false);
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61400");
+
+        Connection connection = null;
+        try {
+
+            connection = cf.createConnection();
+            connection.setClientID("clientId1");
+            connection.start();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer messageConsumer = session.createSharedDurableConsumer(
+                    session.createTopic("test.topic"), "sub1");
+            MessageConsumer messageConsumer2 = session.createSharedDurableConsumer(
+                    session.createTopic("test.topic"), "sub2");
+
+            for (int i = 0; i < 5; i++) {
+                TextMessage messageReceived1 = (TextMessage) messageConsumer.receive(1000);
+                if (i < 4) {
+                    assertNotNull(messageReceived1);
+                } else {
+                    assertNull(messageReceived1);
+                }
+                TextMessage messageReceived2 = (TextMessage) messageConsumer2.receive(1000);
+                assertNotNull(messageReceived2);
+
+                assertEquals("abc", messageReceived2.getStringProperty("MyStringProperty"));
+                assertEquals("Test", messageReceived2.getText());
+            }
+
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+            cf.close();
+        }
+
+        artemisServer.stop();
+    }
+
     public ActiveMQServer buildArtemisBroker() throws IOException {
         Configuration configuration = new ConfigurationImpl();
 
@@ -270,14 +413,6 @@
         configuration.addConnectorConfiguration("connector",
                 new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
 
-        configuration.addAddressConfiguration(new CoreAddressConfiguration()
-                .setName("test.queue")
-                .addRoutingType(RoutingType.ANYCAST)
-                .addQueueConfiguration(new CoreQueueConfiguration()
-                        .setAddress("test.queue")
-                        .setName("test.queue")
-                        .setRoutingType(RoutingType.ANYCAST))
-                );
 
        return new ActiveMQServerImpl(configuration);
     }