| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.cli.kahadb.exporter; |
| |
| import static org.junit.Assert.assertArrayEquals; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileReader; |
| import java.io.IOException; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.Map; |
| |
| import javax.jms.BytesMessage; |
| import javax.jms.Connection; |
| import javax.jms.MapMessage; |
| import javax.jms.MessageConsumer; |
| import javax.jms.ObjectMessage; |
| import javax.jms.Session; |
| import javax.jms.StreamMessage; |
| import javax.jms.TextMessage; |
| import javax.xml.bind.JAXBContext; |
| import javax.xml.bind.JAXBElement; |
| import javax.xml.bind.JAXBException; |
| import javax.xml.bind.Unmarshaller; |
| |
| import org.apache.activemq.artemis.api.core.TransportConfiguration; |
| import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter; |
| import org.apache.activemq.artemis.core.config.Configuration; |
| import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; |
| import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory; |
| import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory; |
| import org.apache.activemq.artemis.core.server.ActiveMQServer; |
| import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; |
| import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; |
| import org.apache.activemq.broker.ConnectionContext; |
| import org.apache.activemq.cli.schema.ActivemqJournalType; |
| import org.apache.activemq.cli.schema.ObjectFactory; |
| import org.apache.activemq.command.ActiveMQBytesMessage; |
| import org.apache.activemq.command.ActiveMQMapMessage; |
| import org.apache.activemq.command.ActiveMQObjectMessage; |
| import org.apache.activemq.command.ActiveMQQueue; |
| import org.apache.activemq.command.ActiveMQStreamMessage; |
| import org.apache.activemq.command.ActiveMQTextMessage; |
| import org.apache.activemq.command.ActiveMQTopic; |
| import org.apache.activemq.command.MessageAck; |
| import org.apache.activemq.command.MessageId; |
| import org.apache.activemq.command.SubscriptionInfo; |
| import org.apache.activemq.store.MessageStore; |
| import org.apache.activemq.store.TopicMessageStore; |
| import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; |
| import org.apache.activemq.util.ByteSequence; |
| import org.apache.activemq.util.IdGenerator; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TemporaryFolder; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class ExporterTest { |
| |
| static final Logger LOG = LoggerFactory.getLogger(ExporterTest.class); |
| |
| @Rule |
| public TemporaryFolder storeFolder = new TemporaryFolder(); |
| |
| /** |
| * TODO Improve test when real exporting is done, for now this just |
| * tests that the recovery listener iterates over all the queue messages |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testExportQueues() throws Exception { |
| |
| File kahaDbDir = storeFolder.newFolder(); |
| ActiveMQQueue queue = new ActiveMQQueue("test.queue"); |
| KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); |
| adapter.setJournalMaxFileLength(1024 * 1024); |
| adapter.setDirectory(kahaDbDir); |
| adapter.start(); |
| MessageStore messageStore = adapter.createQueueMessageStore(queue); |
| messageStore.start(); |
| |
| IdGenerator id = new IdGenerator(); |
| ConnectionContext context = new ConnectionContext(); |
| for (int i = 0; i < 5; i++) { |
| ActiveMQTextMessage message = new ActiveMQTextMessage(); |
| message.setText("Test"); |
| message.setProperty("MyStringProperty", "abc"); |
| message.setProperty("MyIntegerProperty", 1); |
| message.setDestination(queue); |
| message.setMessageId(new MessageId(id.generateId() + ":1", i)); |
| messageStore.addMessage(context, message); |
| } |
| byte[] bytes = new byte[] {10, 11, 12}; |
| for (int i = 0; i < 3; i++) { |
| ActiveMQBytesMessage message = new ActiveMQBytesMessage(); |
| |
| message.setContent(new ByteSequence(bytes)); |
| message.setProperty("MyStringProperty", "abc"); |
| message.setProperty("MyByteProperty", (byte)10); |
| message.setDestination(queue); |
| message.setMessageId(new MessageId(id.generateId() + ":2", i)); |
| messageStore.addMessage(context, message); |
| } |
| |
| for (int i = 0; i < 3; i++) { |
| ActiveMQMapMessage message = new ActiveMQMapMessage(); |
| message.setObject("key", "value"); |
| message.setObject("key2", 10); |
| message.setProperty("MyStringProperty", "abc"); |
| message.setDestination(queue); |
| message.setMessageId(new MessageId(id.generateId() + ":3", i)); |
| messageStore.addMessage(context, message); |
| } |
| |
| Date date = new Date(); |
| for (int i = 0; i < 3; i++) { |
| ActiveMQObjectMessage message = new ActiveMQObjectMessage(); |
| message.setObject(date); |
| message.setDestination(queue); |
| message.setMessageId(new MessageId(id.generateId() + ":4", i)); |
| messageStore.addMessage(context, message); |
| } |
| |
| for (int i = 0; i < 3; i++) { |
| ActiveMQStreamMessage message = new ActiveMQStreamMessage(); |
| message.writeByte((byte)10); |
| message.storeContentAndClear(); |
| message.setDestination(queue); |
| message.setMessageId(new MessageId(id.generateId() + ":5", i)); |
| messageStore.addMessage(context, message); |
| } |
| |
| adapter.stop(); |
| |
| File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), "outputXml.xml"); |
| Exporter.exportKahaDbStore(kahaDbDir, xmlFile); |
| |
| // printFile(xmlFile); |
| |
| validate(xmlFile, 17); |
| |
| final ActiveMQServer artemisServer = buildArtemisBroker(); |
| artemisServer.start(); |
| |
| XmlDataImporter dataImporter = new XmlDataImporter(); |
| dataImporter.process(xmlFile.getAbsolutePath(), "localhost", 61400, false); |
| |
| ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61400"); |
| |
| Connection connection = null; |
| try { |
| |
| connection = cf.createConnection(); |
| connection.start(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| MessageConsumer messageConsumer = session.createConsumer(session.createQueue("test.queue")); |
| |
| for (int i = 0; i < 5; i++) { |
| TextMessage messageReceived = (TextMessage) messageConsumer.receive(1000); |
| assertNotNull(messageReceived); |
| assertEquals("abc", messageReceived.getStringProperty("MyStringProperty")); |
| assertEquals("Test", messageReceived.getText()); |
| } |
| |
| for (int i = 0; i < 3; i++) { |
| BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(1000); |
| assertNotNull(messageReceived); |
| assertEquals("abc", messageReceived.getStringProperty("MyStringProperty")); |
| assertEquals((byte)10, messageReceived.getByteProperty("MyByteProperty")); |
| byte[] result = new byte[3]; |
| messageReceived.readBytes(result); |
| assertArrayEquals(bytes, result); |
| } |
| |
| for (int i = 0; i < 3; i++) { |
| MapMessage messageReceived = (MapMessage) messageConsumer.receive(1000); |
| assertNotNull(messageReceived); |
| assertEquals("abc", messageReceived.getStringProperty("MyStringProperty")); |
| assertEquals("value", messageReceived.getObject("key")); |
| } |
| |
| for (int i = 0; i < 3; i++) { |
| ObjectMessage messageReceived = (ObjectMessage) messageConsumer.receive(1000); |
| assertNotNull(messageReceived); |
| assertEquals(date, messageReceived.getObject()); |
| } |
| |
| for (int i = 0; i < 3; i++) { |
| StreamMessage messageReceived = (StreamMessage) messageConsumer.receive(1000); |
| assertNotNull(messageReceived); |
| assertEquals((byte)10, messageReceived.readByte()); |
| } |
| |
| } finally { |
| if (connection != null) { |
| connection.close(); |
| } |
| cf.close(); |
| } |
| |
| artemisServer.stop(); |
| } |
| |
| @Test |
| public void testExportTopics() throws Exception { |
| |
| File kahaDbDir = storeFolder.newFolder(); |
| |
| ActiveMQTopic topic = new ActiveMQTopic("test.topic"); |
| KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); |
| adapter.setJournalMaxFileLength(1024 * 1024); |
| adapter.setDirectory(kahaDbDir); |
| adapter.start(); |
| TopicMessageStore messageStore = adapter.createTopicMessageStore(topic); |
| messageStore.start(); |
| |
| SubscriptionInfo sub1 = new SubscriptionInfo("clientId1", "sub1"); |
| SubscriptionInfo sub2 = new SubscriptionInfo("clientId1", "sub2"); |
| sub1.setDestination(topic); |
| messageStore.addSubscription(sub1, false); |
| messageStore.addSubscription(sub2, false); |
| |
| IdGenerator id = new IdGenerator(); |
| ConnectionContext context = new ConnectionContext(); |
| MessageId first = null; |
| for (int i = 0; i < 5; i++) { |
| ActiveMQTextMessage message = new ActiveMQTextMessage(); |
| message.setText("Test"); |
| message.setProperty("MyStringProperty", "abc"); |
| message.setProperty("MyIntegerProperty", 1); |
| message.setDestination(topic); |
| message.setMessageId(new MessageId(id.generateId() + ":1", i)); |
| messageStore.addMessage(context, message); |
| if (i == 0) { |
| first = message.getMessageId(); |
| } |
| } |
| |
| //ack for sub1 only |
| messageStore.acknowledge(context, "clientId1", "sub1", first, new MessageAck()); |
| |
| adapter.stop(); |
| |
| File xmlFile = new File(storeFolder.getRoot().getAbsoluteFile(), "outputXml.xml"); |
| Exporter.exportKahaDbStore(kahaDbDir, xmlFile); |
| |
| // printFile(xmlFile); |
| |
| validate(xmlFile, 5); |
| |
| final ActiveMQServer artemisServer = buildArtemisBroker(); |
| artemisServer.start(); |
| |
| XmlDataImporter dataImporter = new XmlDataImporter(); |
| dataImporter.process(xmlFile.getAbsolutePath(), "localhost", 61400, false); |
| |
| ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61400"); |
| |
| Connection connection = null; |
| try { |
| |
| connection = cf.createConnection(); |
| connection.setClientID("clientId1"); |
| connection.start(); |
| |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| MessageConsumer messageConsumer = session.createSharedDurableConsumer( |
| session.createTopic("test.topic"), "sub1"); |
| MessageConsumer messageConsumer2 = session.createSharedDurableConsumer( |
| session.createTopic("test.topic"), "sub2"); |
| |
| for (int i = 0; i < 5; i++) { |
| TextMessage messageReceived1 = (TextMessage) messageConsumer.receive(1000); |
| if (i < 4) { |
| assertNotNull(messageReceived1); |
| } else { |
| assertNull(messageReceived1); |
| } |
| TextMessage messageReceived2 = (TextMessage) messageConsumer2.receive(1000); |
| assertNotNull(messageReceived2); |
| |
| assertEquals("abc", messageReceived2.getStringProperty("MyStringProperty")); |
| assertEquals("Test", messageReceived2.getText()); |
| } |
| |
| } finally { |
| if (connection != null) { |
| connection.close(); |
| } |
| cf.close(); |
| } |
| |
| artemisServer.stop(); |
| } |
| |
| public ActiveMQServer buildArtemisBroker() throws IOException { |
| Configuration configuration = new ConfigurationImpl(); |
| |
| configuration.setPersistenceEnabled(true); |
| configuration.setSecurityEnabled(false); |
| |
| Map<String, Object> connectionParams = new HashMap<String, Object>(); |
| connectionParams.put( |
| org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, 61400); |
| |
| configuration.setBindingsDirectory(storeFolder.newFolder().getAbsolutePath()); |
| configuration.setJournalDirectory(storeFolder.newFolder().getAbsolutePath()); |
| configuration.setLargeMessagesDirectory(storeFolder.newFolder().getAbsolutePath()); |
| configuration.setPagingDirectory(storeFolder.newFolder().getAbsolutePath()); |
| |
| configuration.addAcceptorConfiguration( |
| new TransportConfiguration(NettyAcceptorFactory.class.getName(), connectionParams)); |
| configuration.addConnectorConfiguration("connector", |
| new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams)); |
| |
| |
| return new ActiveMQServerImpl(configuration); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private void validate(File file, int count) throws JAXBException { |
| JAXBContext jaxbContext = JAXBContext.newInstance(ObjectFactory.class); |
| Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller(); |
| JAXBElement<ActivemqJournalType> read = (JAXBElement<ActivemqJournalType>) jaxbUnmarshaller.unmarshal(file); |
| assertEquals(count, read.getValue().getMessages().getMessage().size()); |
| } |
| |
| private void printFile(File file) throws IOException { |
| try (BufferedReader br = new BufferedReader(new FileReader(file))) { |
| String line = null; |
| while ((line = br.readLine()) != null) { |
| System.out.println(line); |
| } |
| } |
| } |
| |
| } |