AMQCLI-5 - Add support for exporting Topics
https://issues.apache.org/jira/browse/AMQCLI-5
Added basic support for exporting topics. This will still need a bit of
polishing
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
index c178a8c..9c12644 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
@@ -23,7 +23,6 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.TopicMessageStore;
@@ -72,20 +71,12 @@
final ActiveMQTopic topic = (ActiveMQTopic) destination;
final TopicMessageStore messageStore = adapter.createTopicMessageStore(topic);
- //recover subscriptions
- //TODO: This will most likely run into the same message more than once if there is
- //more than one durable sub on a topic so we should look at optimizing this
- //Ideally we'd just recover all the messages once and then ask KahaDB which subscriptions
- //have not acked the message. This will probably require a new hook into KahaDB
-// for (final SubscriptionInfo subscriptionInfo : messageStore.getAllSubscriptions()) {
-//
-// try {
-// messageStore.recoverSubscription(subscriptionInfo.getClientId(),
-// subscriptionInfo.getSubscriptionName(), recoveryListener);
-// } catch (Exception e) {
-// IOExceptionSupport.create(e);
-// }
-// }
+ //recover topic
+ try {
+ messageStore.recover(recoveryListener);
+ } catch (Exception e) {
+ IOExceptionSupport.create(e);
+ }
}
}
}
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java
index c2d04a2..93bd439 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMessageRecoveryListener.java
@@ -21,6 +21,7 @@
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.kahadb.KahaDBStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,15 +32,16 @@
static final Logger LOG = LoggerFactory.getLogger(ArtemisXmlMessageRecoveryListener.class);
private final ArtemisJournalMarshaller xmlMarshaller;
- private final OpenWireMessageTypeConverter converter = new OpenWireMessageTypeConverter();
-
+ private final OpenWireMessageTypeConverter converter;
/**
* @param file
*/
- public ArtemisXmlMessageRecoveryListener(final ArtemisJournalMarshaller xmlMarshaller) {
+ public ArtemisXmlMessageRecoveryListener(final KahaDBStore store,
+ final ArtemisJournalMarshaller xmlMarshaller) {
super();
this.xmlMarshaller = xmlMarshaller;
+ this.converter = new OpenWireMessageTypeConverter(store);
}
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java
index 259decc..c921b48 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireMessageTypeConverter.java
@@ -16,11 +16,10 @@
*/
package org.apache.activemq.cli.kahadb.exporter.artemis;
-import javax.jms.JMSException;
-
import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporterUtil;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.cli.kahadb.exporter.OpenWireExportConverter;
import org.apache.activemq.cli.schema.BodyType;
import org.apache.activemq.cli.schema.MessageType;
@@ -30,10 +29,22 @@
import org.apache.activemq.cli.schema.QueuesType;
import org.apache.activemq.command.Message;
import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.apache.activemq.store.kahadb.KahaDBUtil;
public class OpenWireMessageTypeConverter implements OpenWireExportConverter<MessageType> {
- final OpenWireMessageConverter converter = new OpenWireMessageConverter(new OpenWireFormat());
+ private final OpenWireMessageConverter converter = new OpenWireMessageConverter(new OpenWireFormat());
+ private final KahaDBStore store;
+
+
+ /**
+ * @param store
+ */
+ public OpenWireMessageTypeConverter(KahaDBStore store) {
+ super();
+ this.store = store;
+ }
/* (non-Javadoc)
* @see org.apache.activemq.cli.kahadb.exporter.MessageConverter#convert(org.apache.activemq.Message)
@@ -66,11 +77,23 @@
return messageType;
}
- private QueuesType convertQueue(final Message message) throws JMSException {
- return QueuesType.builder()
- .withQueue(QueueType.builder()
- .withName(message.getDestination().getPhysicalName()).build())
- .build();
+ private QueuesType convertQueue(final Message message) throws Exception {
+ if (message.getDestination().isQueue()) {
+ return QueuesType.builder()
+ .withQueue(QueueType.builder()
+ .withName(message.getDestination().getPhysicalName()).build())
+ .build();
+ } else {
+ final QueuesType.Builder<Void> queuesBuilder = QueuesType.builder();
+
+ KahaDBUtil.getUnackedSubscriptions(store, message).forEach(sub -> {
+ queuesBuilder.addQueue(QueueType.builder().withName(
+ ActiveMQDestination.createQueueNameForDurableSubscription(
+ true, sub.getClientId(), sub.getSubcriptionName())).build());
+ });
+
+ return queuesBuilder.build();
+ }
}
private BodyType convertBody(final ServerMessage serverMessage) throws Exception {
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/store/kahadb/KahaDBUtil.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/store/kahadb/KahaDBUtil.java
new file mode 100644
index 0000000..40ea71d
--- /dev/null
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/store/kahadb/KahaDBUtil.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.SubscriptionInfo;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.kahadb.MessageDatabase.LastAck;
+import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+
+public class KahaDBUtil {
+
+
+ /**
+ * Return subscriptions which have not acked this message
+ *
+ * @param store
+ * @param message
+ * @return
+ * @throws Exception
+ */
+ public static List<SubscriptionInfo> getUnackedSubscriptions(KahaDBStore store, Message message)
+ throws Exception {
+
+ final List<SubscriptionInfo> matching = new ArrayList<>();
+
+ if (!message.getDestination().isTopic()) {
+ return matching;
+ }
+
+ ActiveMQTopic topic = (ActiveMQTopic) message.getDestination();
+ String messageId = message.getMessageId().toString();
+ TopicMessageStore messageStore = store.createTopicMessageStore(topic);
+
+ store.indexLock.writeLock().lock();
+
+ final SubscriptionInfo[] infos = messageStore.getAllSubscriptions();
+
+ try {
+ store.pageFile.tx().execute(new Transaction.Closure<Exception>() {
+ @Override
+ public void execute(Transaction tx) throws Exception {
+ StoredDestination sd = store.getStoredDestination(store.convert(topic), tx);
+
+ if (sd != null) {
+ Long position = sd.messageIdIndex.get(tx, messageId);
+
+ for (SubscriptionInfo info : infos) {
+ LastAck cursorPos = store.getLastAck(tx, sd,
+ store.subscriptionKey(info.getClientId(), info.getSubcriptionName()));
+ if (cursorPos.lastAckedSequence < position) {
+ matching.add(info);
+ }
+ }
+ }
+ }
+ });
+ } finally {
+ store.indexLock.writeLock().unlock();
+ }
+
+ return matching;
+ }
+}
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
index 73bcecd..b23ab7b 100644
--- a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
@@ -19,6 +19,8 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
import java.io.BufferedReader;
import java.io.File;
@@ -28,6 +30,7 @@
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@@ -56,6 +59,7 @@
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller;
import org.apache.activemq.cli.kahadb.exporter.artemis.ArtemisXmlMessageRecoveryListener;
@@ -68,8 +72,12 @@
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IdGenerator;
@@ -165,17 +173,19 @@
xmlMarshaller.appendBindingsElement();
xmlMarshaller.appendBinding(QueueBindingType.builder()
.withName("test.queue")
+ .withRoutingType(RoutingType.ANYCAST.toString())
.withAddress("test.queue").build());
xmlMarshaller.appendEndElement();
xmlMarshaller.appendMessagesElement();
KahaDBExporter dbExporter = new KahaDBExporter(adapter,
- new ArtemisXmlMessageRecoveryListener(xmlMarshaller));
+ new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));
dbExporter.exportQueues();
xmlMarshaller.appendJournalClose(true);
}
+ adapter.stop();
try (BufferedReader br = new BufferedReader(new FileReader(file))) {
String line = null;
@@ -250,6 +260,139 @@
artemisServer.stop();
}
+ @Test
+ public void testExportTopics() throws Exception {
+
+ ActiveMQTopic topic = new ActiveMQTopic("test.topic");
+ KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+ adapter.setJournalMaxFileLength(1024 * 1024);
+ adapter.setDirectory(storeFolder.newFolder());
+ adapter.start();
+ TopicMessageStore messageStore = adapter.createTopicMessageStore(topic);
+ messageStore.start();
+
+ SubscriptionInfo sub1 = new SubscriptionInfo("clientId1", "sub1");
+ SubscriptionInfo sub2 = new SubscriptionInfo("clientId1", "sub2");
+ sub1.setDestination(topic);
+ messageStore.addSubscription(sub1, false);
+ messageStore.addSubscription(sub2, false);
+
+ IdGenerator id = new IdGenerator();
+ ConnectionContext context = new ConnectionContext();
+ MessageId first = null;
+ for (int i = 0; i < 5; i++) {
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setText("Test");
+ message.setProperty("MyStringProperty", "abc");
+ message.setProperty("MyIntegerProperty", 1);
+ message.setDestination(topic);
+ message.setMessageId(new MessageId(id.generateId() + ":1", i));
+ messageStore.addMessage(context, message);
+ if (i == 0) {
+ first = message.getMessageId();
+ }
+ }
+
+ //ack for sub1 only
+ messageStore.acknowledge(context, "clientId1", "sub1", first, new MessageAck());
+
+ messageStore.stop();
+
+ // String queueName = ActiveMQDestination.createQueueNameForDurableSubscription(true, "clientId1", "sub1");
+
+ File file = storeFolder.newFile();
+ try(FileOutputStream fos = new FileOutputStream(file)) {
+ XMLStreamWriter xmlWriter = XMLOutputFactory.newFactory().createXMLStreamWriter(fos);
+ ArtemisJournalMarshaller xmlMarshaller = new ArtemisJournalMarshaller(xmlWriter);
+
+ xmlMarshaller.appendJournalOpen();
+ xmlMarshaller.appendBindingsElement();
+
+ adapter.getStore().getDestinations().stream()
+ .filter(dest -> dest.isTopic()).forEach(dest -> {
+
+ try {
+ for (SubscriptionInfo info :
+ adapter.getStore().createTopicMessageStore((ActiveMQTopic) dest).getAllSubscriptions()) {
+ xmlMarshaller.appendBinding(QueueBindingType.builder()
+ .withName(ActiveMQDestination.createQueueNameForDurableSubscription(
+ true, info.getClientId(), info.getSubcriptionName()))
+ .withRoutingType(RoutingType.MULTICAST.toString())
+ .withAddress(dest.getPhysicalName()).build());
+ }
+
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ });
+
+ xmlMarshaller.appendEndElement();
+ xmlMarshaller.appendMessagesElement();
+
+ KahaDBExporter dbExporter = new KahaDBExporter(adapter,
+ new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));
+
+ dbExporter.exportTopics();
+ xmlMarshaller.appendJournalClose(true);
+ }
+
+ adapter.stop();
+
+ try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+ String line = null;
+ while ((line = br.readLine()) != null) {
+ System.out.println(line);
+ }
+ }
+
+
+ validate(file, 5);
+
+ final ActiveMQServer artemisServer = buildArtemisBroker();
+ artemisServer.start();
+
+ XmlDataImporter dataImporter = new XmlDataImporter();
+ dataImporter.process(file.getAbsolutePath(), "localhost", 61400, false);
+
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61400");
+
+ Connection connection = null;
+ try {
+
+ connection = cf.createConnection();
+ connection.setClientID("clientId1");
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer messageConsumer = session.createSharedDurableConsumer(
+ session.createTopic("test.topic"), "sub1");
+ MessageConsumer messageConsumer2 = session.createSharedDurableConsumer(
+ session.createTopic("test.topic"), "sub2");
+
+ for (int i = 0; i < 5; i++) {
+ TextMessage messageReceived1 = (TextMessage) messageConsumer.receive(1000);
+ if (i < 4) {
+ assertNotNull(messageReceived1);
+ } else {
+ assertNull(messageReceived1);
+ }
+ TextMessage messageReceived2 = (TextMessage) messageConsumer2.receive(1000);
+ assertNotNull(messageReceived2);
+
+ assertEquals("abc", messageReceived2.getStringProperty("MyStringProperty"));
+ assertEquals("Test", messageReceived2.getText());
+ }
+
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ cf.close();
+ }
+
+ artemisServer.stop();
+ }
+
public ActiveMQServer buildArtemisBroker() throws IOException {
Configuration configuration = new ConfigurationImpl();
@@ -270,14 +413,6 @@
configuration.addConnectorConfiguration("connector",
new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams));
- configuration.addAddressConfiguration(new CoreAddressConfiguration()
- .setName("test.queue")
- .addRoutingType(RoutingType.ANYCAST)
- .addQueueConfiguration(new CoreQueueConfiguration()
- .setAddress("test.queue")
- .setName("test.queue")
- .setRoutingType(RoutingType.ANYCAST))
- );
return new ActiveMQServerImpl(configuration);
}