AMQCLI-7 Add support for filtering by dest
Support only exporting a subset of destinations by destination pattern
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
index dbe0114..8bd1b90 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/KahaDBExporter.java
@@ -17,12 +17,16 @@
package org.apache.activemq.cli.kahadb.exporter;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
import java.util.Set;
+import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
@@ -46,7 +50,6 @@
this.recoveryListener = recoveryListener;
}
-
@Override
public void exportMetadata() throws IOException {
metadataExporter.export();
@@ -54,17 +57,35 @@
@Override
public void exportQueues() throws IOException {
- exportDestinations(ActiveMQDestination.QUEUE_TYPE);
+ exportQueues(DestinationFilter.ANY_DESCENDENT);
}
@Override
public void exportTopics() throws IOException {
- exportDestinations(ActiveMQDestination.TOPIC_TYPE);
+ exportTopics(DestinationFilter.ANY_DESCENDENT);
}
- private void exportDestinations(byte destType) throws IOException {
- final Set<ActiveMQDestination> destinations = adapter.getDestinations().stream().filter(
- dest -> dest.getDestinationType() == destType).collect(Collectors.toSet());
+ @Override
+ public void exportQueues(String pattern) throws IOException {
+ exportDestinations(new ActiveMQQueue(pattern));
+ }
+
+ @Override
+ public void exportTopics(String pattern) throws IOException {
+ exportDestinations(new ActiveMQTopic(pattern));
+ }
+
+ private void exportDestinations(ActiveMQDestination destPattern) throws IOException {
+
+ //Main destination filter
+ final DestinationFilter destFilter = DestinationFilter.parseFilter(destPattern);
+ //Secondary filter to catch a couple of extra edge cases
+ final Predicate<ActiveMQDestination> f = getExportDestinationFilter(destPattern);
+
+ final Set<ActiveMQDestination> destinations = adapter.getDestinations().stream()
+ .filter(dest -> destFilter.matches(dest))
+ .filter(f)
+ .collect(Collectors.toSet());
// loop through all queues and export them
for (final ActiveMQDestination destination : destinations) {
@@ -83,4 +104,40 @@
}
}
+ /**
+ * Do extra processing to filter out destinations. This is because the destination filter
+ * will match some destianations that we don't want. For example, "test.queue" will match
+ * as true for the pattern "test.queue.>" which is not correct.
+ * @param destPattern
+ * @return
+ */
+ private Predicate<ActiveMQDestination> getExportDestinationFilter(final ActiveMQDestination destPattern) {
+ //We need to check each composite destination individually
+ final List<ActiveMQDestination> nonComposite = destPattern.isComposite()
+ ? Arrays.asList(destPattern.getCompositeDestinations()) : Arrays.asList(destPattern);
+
+ return (e) -> {
+ boolean match = false;
+ for (ActiveMQDestination d : nonComposite) {
+ String destString = d.getPhysicalName();
+ //don't match a.b when using a.b.>
+ if (destPattern.isPattern() && destString.length() > 1 && destString.endsWith(DestinationFilter.ANY_DESCENDENT)) {
+ final String startsWithString = destString.substring(0, destString.length() - 2);
+ match = e.getPhysicalName().startsWith(startsWithString) && !e.getPhysicalName().equals(startsWithString);
+ //non wildcard should be an exact match
+ } else if (!destPattern.isPattern()) {
+ match = e.getPhysicalName().equals(destString);
+ } else {
+ match = true;
+ }
+ if (match) {
+ break;
+ }
+ }
+
+ return match;
+ };
+
+ }
+
}
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java
index b1217b4..efef3cc 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/MessageStoreExporter.java
@@ -24,5 +24,9 @@
public void exportQueues() throws IOException;
+ public void exportQueues(String pattern) throws IOException;
+
public void exportTopics() throws IOException;
+
+ public void exportTopics(String pattern) throws IOException;
}
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
index 6ed9681..8efdf42 100644
--- a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
@@ -156,13 +156,7 @@
File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), "outputXml.xml");
Exporter.exportKahaDbStore(kahaDbDir, xmlFile);
- try (BufferedReader br = new BufferedReader(new FileReader(xmlFile))) {
- String line = null;
- while ((line = br.readLine()) != null) {
- System.out.println(line);
- }
- }
-
+ // printFile(xmlFile);
validate(xmlFile, 17);
@@ -272,14 +266,7 @@
File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), "outputXml.xml");
Exporter.exportKahaDbStore(kahaDbDir, xmlFile);
-
- try (BufferedReader br = new BufferedReader(new FileReader(xmlFile))) {
- String line = null;
- while ((line = br.readLine()) != null) {
- System.out.println(line);
- }
- }
-
+ // printFile(xmlFile);
validate(xmlFile, 5);
@@ -360,4 +347,13 @@
assertEquals(count, read.getValue().getMessages().getMessage().size());
}
+ private void printFile(File file) throws IOException {
+ try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+ String line = null;
+ while ((line = br.readLine()) != null) {
+ System.out.println(line);
+ }
+ }
+ }
+
}