AMQCLI-13 AMQCLI-14 - support --virtualTopicConsumerWildcards to export consumer queues to match openwire FQQN access
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/artemis/schema/ArtemisJournalMarshaller.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/artemis/schema/ArtemisJournalMarshaller.java
index 2fad01d..84b5364 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/artemis/schema/ArtemisJournalMarshaller.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/artemis/schema/ArtemisJournalMarshaller.java
@@ -29,7 +29,7 @@
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamWriter;
-import org.apache.activemq.artemis.cli.commands.tools.XmlDataConstants;
+import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataConstants;
import org.apache.activemq.cli.schema.ActivemqJournalType;
import org.apache.activemq.cli.schema.AddressBindingType;
import org.apache.activemq.cli.schema.MessageType;
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/ExportConfiguration.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/ExportConfiguration.java
index 3f7a7bd..004f073 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/ExportConfiguration.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/ExportConfiguration.java
@@ -17,6 +17,14 @@
package org.apache.activemq.cli.kahadb.exporter;
import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.artemis.utils.CompositeAddress;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.filter.DestinationFilter;
+import org.apache.activemq.filter.DestinationPath;
public class ExportConfiguration {
@@ -34,6 +42,8 @@
private boolean overwrite;
+ private final Map<DestinationFilter, Integer> vtConsumerDestinationMatchers = new HashMap<>();
+
public File getSource() {
return source;
}
@@ -90,6 +100,49 @@
this.overwrite = overwrite;
}
+ public void setVirtualTopicConsumerWildcards(String virtualTopicConsumerWildcards) {
+ if (virtualTopicConsumerWildcards != null) {
+ for (String filter : virtualTopicConsumerWildcards.split(",")) {
+ String[] wildcardLimitPair = filter.split(";");
+ vtConsumerDestinationMatchers.put(DestinationFilter.parseFilter(new ActiveMQQueue(wildcardLimitPair[0])), Integer.valueOf(wildcardLimitPair[1]));
+ }
+ }
+ }
+
+ public ActiveMQDestination mapToDurableSubFQQN(ActiveMQDestination destination) {
+
+ if (vtConsumerDestinationMatchers.isEmpty()) {
+ return destination;
+ }
+
+ for (Map.Entry<DestinationFilter, Integer> candidate : vtConsumerDestinationMatchers.entrySet()) {
+ if (candidate.getKey().matches(destination)) {
+ // convert to matching FQQN
+ String[] paths = DestinationPath.getDestinationPaths(destination);
+ StringBuilder fqqn = new StringBuilder();
+ int filterPathTerminus = candidate.getValue();
+ // address - ie: topic
+ for (int i = filterPathTerminus; i < paths.length; i++) {
+ if (i > filterPathTerminus) {
+ fqqn.append(ActiveMQDestination.PATH_SEPERATOR);
+ }
+ fqqn.append(paths[i]);
+ }
+ fqqn.append(CompositeAddress.SEPARATOR);
+ // consumer queue - the full vt queue
+ for (int i = 0; i < paths.length; i++) {
+ if (i > 0) {
+ fqqn.append(ActiveMQDestination.PATH_SEPERATOR);
+ }
+ fqqn.append(paths[i]);
+ }
+ // no need for a cache as this is called once per destination on metadata export
+ return new ActiveMQQueue(fqqn.toString());
+ }
+ }
+ return destination;
+ }
+
public static class ExportConfigurationBuilder {
private final ExportConfiguration config = new ExportConfiguration();
@@ -137,5 +190,9 @@
return config;
}
+ public ExportConfigurationBuilder setVirtualTopicConsumerWildcards(String virtualTopicConsumerWildcards) {
+ config.setVirtualTopicConsumerWildcards(virtualTopicConsumerWildcards);
+ return this;
+ }
}
}
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
index 9d7ba93..8ea7409 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/Exporter.java
@@ -88,6 +88,9 @@
@Option(name = "-f", type = OptionType.COMMAND, description = "Force XML output and overwrite existing file")
public boolean overwrite;
+ @Option(name = {"--vt", "--virtualTopicConsumerWildcards"}, type = OptionType.COMMAND, description = "Virtual Topic Consumer Pattern list")
+ public String virtualTopicConsumerWildcards;
+
/* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@@ -100,6 +103,7 @@
.setTarget(new File(target))
.setQueuePattern(queuePattern)
.setTopicPattern(topicPattern)
+ .setVirtualTopicConsumerWildcards(virtualTopicConsumerWildcards)
.setCompress(compress)
.setOverwrite(overwrite)
.build());
@@ -128,6 +132,7 @@
.setTarget(new File(target))
.setQueuePattern(queuePattern)
.setTopicPattern(topicPattern)
+ .setVirtualTopicConsumerWildcards(virtualTopicConsumerWildcards)
.setCompress(compress)
.setOverwrite(overwrite)
.build());
@@ -155,11 +160,9 @@
xmlMarshaller.appendJournalOpen();
if (config.isMultiKaha()) {
- appendMultiKahaDbStore(xmlMarshaller, getMultiKahaDbAdapter(config.getSource()),
- config.getQueuePattern(), config.getTopicPattern());
+ appendMultiKahaDbStore(xmlMarshaller, getMultiKahaDbAdapter(config.getSource()), config);
} else {
- appendKahaDbStore(xmlMarshaller, getKahaDbAdapter(config.getSource()),
- config.getQueuePattern(), config.getTopicPattern());
+ appendKahaDbStore(xmlMarshaller, getKahaDbAdapter(config.getSource()), config);
}
xmlMarshaller.appendJournalClose(true);
@@ -172,9 +175,7 @@
private static void appendMultiKahaDbStore(final ArtemisJournalMarshaller xmlMarshaller,
- final MultiKahaDBPersistenceAdapter multiAdapter, final String queuePattern,
- final String topicPattern) throws Exception {
-
+ final MultiKahaDBPersistenceAdapter multiAdapter, final ExportConfiguration config) throws Exception {
try {
multiAdapter.start();
@@ -183,7 +184,7 @@
.map(adapter -> {
KahaDBPersistenceAdapter kahaAdapter = (KahaDBPersistenceAdapter) adapter;
return new KahaDBExporter(kahaAdapter,
- new ArtemisXmlMetadataExporter(kahaAdapter.getStore(), xmlMarshaller),
+ new ArtemisXmlMetadataExporter(kahaAdapter.getStore(), xmlMarshaller, config),
new ArtemisXmlMessageRecoveryListener(kahaAdapter.getStore(), xmlMarshaller));
}).collect(Collectors.toList());
@@ -195,8 +196,8 @@
xmlMarshaller.appendMessagesElement();
for (KahaDBExporter dbExporter : dbExporters) {
- dbExporter.exportQueues(queuePattern);
- dbExporter.exportTopics(topicPattern);
+ dbExporter.exportQueues(config.getQueuePattern());
+ dbExporter.exportTopics(config.getTopicPattern());
}
xmlMarshaller.appendEndElement();
} finally {
@@ -205,21 +206,21 @@
}
private static void appendKahaDbStore(final ArtemisJournalMarshaller xmlMarshaller,
- final KahaDBPersistenceAdapter adapter, final String queuePattern, final String topicPattern) throws Exception {
+ final KahaDBPersistenceAdapter adapter, final ExportConfiguration config) throws Exception {
try {
adapter.start();
final KahaDBExporter dbExporter = new KahaDBExporter(adapter,
- new ArtemisXmlMetadataExporter(adapter.getStore(), xmlMarshaller),
+ new ArtemisXmlMetadataExporter(adapter.getStore(), xmlMarshaller, config),
new ArtemisXmlMessageRecoveryListener(adapter.getStore(), xmlMarshaller));
xmlMarshaller.appendBindingsElement();
dbExporter.exportMetadata();
xmlMarshaller.appendEndElement();
xmlMarshaller.appendMessagesElement();
- dbExporter.exportQueues(queuePattern);
- dbExporter.exportTopics(topicPattern);
+ dbExporter.exportQueues(config.getQueuePattern());
+ dbExporter.exportTopics(config.getTopicPattern());
xmlMarshaller.appendEndElement();
} finally {
adapter.stop();
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMetadataExporter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMetadataExporter.java
index d915ec9..f95603b 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMetadataExporter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/ArtemisXmlMetadataExporter.java
@@ -20,7 +20,9 @@
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.cli.artemis.schema.ArtemisJournalMarshaller;
+import org.apache.activemq.cli.kahadb.exporter.ExportConfiguration;
import org.apache.activemq.cli.kahadb.exporter.MessageStoreMetadataExporter;
import org.apache.activemq.cli.schema.QueueBindingType;
import org.apache.activemq.command.ActiveMQTopic;
@@ -31,16 +33,18 @@
private final KahaDBStore store;
private final ArtemisJournalMarshaller xmlMarshaller;
+ private final ExportConfiguration config;
/**
* @param xmlMarshaller
*/
public ArtemisXmlMetadataExporter(final KahaDBStore store,
- final ArtemisJournalMarshaller xmlMarshaller) {
+ final ArtemisJournalMarshaller xmlMarshaller, final ExportConfiguration config) {
super();
this.store = store;
this.xmlMarshaller = xmlMarshaller;
+ this.config = config;
}
@Override
@@ -49,16 +53,24 @@
.forEach(dest -> {
try {
if (dest.isQueue()) {
- xmlMarshaller.appendBinding(QueueBindingType.builder()
- .withName(dest.getPhysicalName())
- .withRoutingType(RoutingType.ANYCAST.toString())
- .withAddress(dest.getPhysicalName()).build());
+ org.apache.activemq.command.ActiveMQDestination mappedToFQQN = config.mapToDurableSubFQQN(dest);
+ if (dest != mappedToFQQN) {
+ xmlMarshaller.appendBinding(QueueBindingType.builder()
+ .withName(CompositeAddress.extractQueueName(mappedToFQQN.getPhysicalName()))
+ .withRoutingType(RoutingType.MULTICAST.toString())
+ .withAddress(CompositeAddress.extractAddressName(mappedToFQQN.getPhysicalName())).build());
+ } else {
+ xmlMarshaller.appendBinding(QueueBindingType.builder()
+ .withName(dest.getPhysicalName())
+ .withRoutingType(RoutingType.ANYCAST.toString())
+ .withAddress(dest.getPhysicalName()).build());
+ }
} else if (dest.isTopic()) {
for (SubscriptionInfo info :
store.createTopicMessageStore((ActiveMQTopic) dest).getAllSubscriptions()) {
xmlMarshaller.appendBinding(QueueBindingType.builder()
- .withName(ActiveMQDestination.createQueueNameForDurableSubscription(
- true, info.getClientId(), info.getSubcriptionName()))
+ .withName(ActiveMQDestination.createQueueNameForSubscription(
+ true, info.getClientId(), info.getSubcriptionName()).toString())
.withRoutingType(RoutingType.MULTICAST.toString())
.withAddress(dest.getPhysicalName()).build());
}
diff --git a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverter.java b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverter.java
index af4d363..bfd6233 100644
--- a/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverter.java
+++ b/activemq-kahadb-exporter/src/main/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverter.java
@@ -17,9 +17,11 @@
package org.apache.activemq.cli.kahadb.exporter.artemis;
import org.apache.activemq.artemis.api.core.ICoreMessage;
-import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporterUtil;
+import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataExporterUtil;
+import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
+import org.apache.activemq.cli.kahadb.exporter.ExportConfiguration;
import org.apache.activemq.cli.kahadb.exporter.OpenWireExportConverter;
import org.apache.activemq.cli.schema.BodyType;
import org.apache.activemq.cli.schema.MessageType;
@@ -38,7 +40,9 @@
*/
public class OpenWireCoreMessageTypeConverter implements OpenWireExportConverter<MessageType> {
- private final OpenWireMessageConverter converter = new OpenWireMessageConverter(new OpenWireFormat());
+ private final OpenWireMessageConverter converter = new OpenWireMessageConverter();
+ private final OpenWireFormat openWireFormat = new OpenWireFormat();
+ private final CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();
private final KahaDBStore store;
/**
@@ -58,7 +62,7 @@
*/
@Override
public MessageType convert(final Message message) throws Exception {
- final ICoreMessage serverMessage = (ICoreMessage) converter.inbound(message);
+ final ICoreMessage serverMessage = (ICoreMessage) converter.inbound(message, openWireFormat, coreMessageObjectPools);
final MessageType messageType = convertAttributes(serverMessage);
try {
@@ -103,8 +107,8 @@
KahaDBUtil.getUnackedSubscriptions(store, message).forEach(sub -> {
queuesBuilder.addQueue(QueueType.builder().withName(
- ActiveMQDestination.createQueueNameForDurableSubscription(
- true, sub.getClientId(), sub.getSubcriptionName())).build());
+ ActiveMQDestination.createQueueNameForSubscription(
+ true, sub.getClientId(), sub.getSubcriptionName()).toString()).build());
});
return queuesBuilder.build();
@@ -112,7 +116,7 @@
}
private BodyType convertBody(final ICoreMessage serverMessage) throws Exception {
- String value = XmlDataExporterUtil.encodeMessageBody(serverMessage);
+ String value = XmlDataExporterUtil.encodeMessageBodyBase64(serverMessage);
//requires CDATA
return BodyType.builder()
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
index 70a14e2..c76d962 100644
--- a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/ExporterTest.java
@@ -33,7 +33,9 @@
import javax.jms.Connection;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
@@ -42,8 +44,10 @@
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter;
+import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataImporter;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
@@ -51,6 +55,7 @@
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.cli.kahadb.exporter.ExportConfiguration.ExportConfigurationBuilder;
import org.apache.activemq.cli.schema.ActivemqJournalType;
@@ -92,6 +97,129 @@
testExportQueues("test.>");
}
+ @Test
+ public void testExportVTQueueAsDurableSub() throws Exception {
+ File sourceDir = storeFolder.newFolder();
+ ActiveMQQueue queueA = new ActiveMQQueue("Consumer.A.VirtualTopic.T");
+ ActiveMQQueue queueB = new ActiveMQQueue("Consumer.B.VirtualTopic.T");
+
+ PersistenceAdapter adapter = getPersistenceAdapter(sourceDir);
+ adapter.start();
+ MessageStore messageStoreA = adapter.createQueueMessageStore(queueA);
+ MessageStore messageStoreB = adapter.createQueueMessageStore(queueB);
+ messageStoreA.start();
+ messageStoreB.start();
+
+ // publish messages
+ MessageId first = null;
+ IdGenerator id = new IdGenerator();
+ ConnectionContext context = new ConnectionContext();
+ for (int i = 0; i < 5; i++) {
+ ActiveMQTextMessage message = new ActiveMQTextMessage();
+ message.setText("Test");
+ message.setProperty("MyStringProperty", "abc");
+ message.setProperty("MyIntegerProperty", 1);
+ message.setMessageId(new MessageId(id.generateId() + ":1", i));
+
+ message.setDestination(queueA);
+ messageStoreA.addMessage(context, message);
+
+ message.setDestination(queueB);
+ messageStoreB.addMessage(context, message);
+
+ if (i == 0) {
+ first = message.getMessageId();
+ }
+ }
+
+ //ack for subA only
+ MessageAck ack = new MessageAck();
+ ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+ ack.setLastMessageId(first);
+ messageStoreA.removeMessage(context,ack);
+
+ adapter.stop();
+
+ File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), "outputXml.xml");
+ exportStore(ExportConfigurationBuilder.newBuilder().setSource(sourceDir).setTarget(xmlFile).setVirtualTopicConsumerWildcards("Consumer.*.>;2"));
+
+ printFile(xmlFile);
+
+ validate(xmlFile, 9);
+
+ final ActiveMQServer artemisServer = buildArtemisBroker();
+ artemisServer.start();
+ artemisServer.getManagementService().enableNotifications(false);
+
+ XmlDataImporter dataImporter = new XmlDataImporter();
+ dataImporter.process(xmlFile.getAbsolutePath(), "localhost", 61400, false);
+
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61400");
+
+ Connection connection = null;
+ try {
+
+ connection = cf.createConnection();
+ connection.start();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Queue fqqnA = session.createQueue(CompositeAddress.toFullyQualified("VirtualTopic.T", "Consumer.A.VirtualTopic.T"));
+ MessageConsumer messageConsumerA = session.createConsumer(fqqnA);
+
+ Queue fqqnB = session.createQueue(CompositeAddress.toFullyQualified("VirtualTopic.T", "Consumer.B.VirtualTopic.T"));
+ MessageConsumer messageConsumerB = session.createConsumer(fqqnB);
+
+ for (int i = 0; i < 5; i++) {
+ if (i < 4) {
+ TextMessage messageReceived = (TextMessage) messageConsumerA.receive(1000);
+ assertNotNull(messageReceived);
+ assertEquals("abc", messageReceived.getStringProperty("MyStringProperty"));
+ assertEquals("Test", messageReceived.getText());
+
+ messageReceived = (TextMessage) messageConsumerB.receive(1000);
+ assertNotNull(messageReceived);
+ assertEquals("abc", messageReceived.getStringProperty("MyStringProperty"));
+ assertEquals("Test", messageReceived.getText());
+
+ } else {
+ // just subB gets this
+ TextMessage messageReceived = (TextMessage) messageConsumerA.receive(100);
+ assertNull(messageReceived);
+
+ messageReceived = (TextMessage) messageConsumerB.receive(1000);
+ assertNotNull(messageReceived);
+ assertEquals("abc", messageReceived.getStringProperty("MyStringProperty"));
+ assertEquals("Test", messageReceived.getText());
+ }
+ }
+
+ // verify durable topic sub semantics
+ // there is no auto create on core for a FQQN consumer so we need to configure before consumer creation!!
+ artemisServer.createQueue(new QueueConfiguration("Consumer.C.VirtualTopic.T").setAddress("VirtualTopic.T").setRoutingType(RoutingType.MULTICAST));
+ Queue fqqnC = session.createQueue(CompositeAddress.toFullyQualified("VirtualTopic.T", "Consumer.C.VirtualTopic.T"));
+ MessageConsumer messageConsumerC = session.createConsumer(fqqnC);
+
+ MessageProducer messageProducer = session.createProducer(session.createTopic("VirtualTopic.T"));
+ messageProducer.send(session.createTextMessage());
+
+ // consume from both subs
+ TextMessage messageReceived = (TextMessage) messageConsumerA.receive(1000);
+ assertNotNull(messageReceived);
+ messageReceived = (TextMessage) messageConsumerB.receive(1000);
+ assertNotNull(messageReceived);
+ messageReceived = (TextMessage) messageConsumerC.receive(1000);
+ assertNotNull(messageReceived);
+
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ cf.close();
+ }
+ artemisServer.stop();
+ }
+
/**
*
* @throws Exception
@@ -242,6 +370,7 @@
.setTopicPattern("empty.>")
.setSource(kahaDbDir)
.setTarget(xmlFile));
+ printFile(xmlFile);
validate(xmlFile, 0);
}
@@ -279,6 +408,7 @@
message.setText("Test");
message.setProperty("MyStringProperty", "abc");
message.setProperty("MyIntegerProperty", 1);
+ message.setProperty("MyIntegerPropertyId", i+1);
message.setDestination(topic);
message.setMessageId(new MessageId(id.generateId() + ":1", i));
messageStore.addMessage(context, message);
@@ -326,7 +456,7 @@
for (int i = 0; i < 5; i++) {
TextMessage messageReceived1 = (TextMessage) messageConsumer.receive(1000);
if (i < 4) {
- assertNotNull(messageReceived1);
+ assertNotNull("@" + i, messageReceived1);
} else {
assertNull(messageReceived1);
}
diff --git a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverterTest.java b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverterTest.java
index 8b327a6..03f4943 100644
--- a/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverterTest.java
+++ b/activemq-kahadb-exporter/src/test/java/org/apache/activemq/cli/kahadb/exporter/artemis/OpenWireCoreMessageTypeConverterTest.java
@@ -19,7 +19,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import org.apache.activemq.artemis.cli.commands.tools.XmlDataConstants;
+import org.apache.activemq.artemis.cli.commands.tools.xml.XmlDataConstants;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.cli.schema.MessageType;
@@ -131,7 +131,7 @@
MessageType messageType = c.convert(message);
assertEquals(XmlDataConstants.TEXT_TYPE_PRETTY, messageType.getType());
- assertEquals(ActiveMQDestination.createQueueNameForDurableSubscription(true, "clientId", "subName"),
+ assertEquals(ActiveMQDestination.createQueueNameForSubscription(true, "clientId", "subName").toString(),
messageType.getQueues().getQueue().get(0).getName());
}
}
diff --git a/pom.xml b/pom.xml
index 552e758..633c5f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -37,7 +37,7 @@
<maven.compiler.target>1.8</maven.compiler.target>
<activemq-version>5.14.4</activemq-version>
- <artemis-version>2.0.0</artemis-version>
+ <artemis-version>2.14.0-SNAPSHOT</artemis-version>
<slf4j-version>1.7.13</slf4j-version>
<log4j-version>1.2.17</log4j-version>