| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.artemis.tests.integration.amqp; |
| |
| import javax.jms.BytesMessage; |
| import javax.jms.Connection; |
| import javax.jms.ConnectionFactory; |
| import javax.jms.DeliveryMode; |
| import javax.jms.MessageProducer; |
| import javax.jms.Session; |
| import javax.management.MBeanServer; |
| import javax.management.MBeanServerFactory; |
| import java.net.URI; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.activemq.ActiveMQConnectionFactory; |
| import org.apache.activemq.artemis.api.core.QueueConfiguration; |
| import org.apache.activemq.artemis.api.core.RoutingType; |
| import org.apache.activemq.artemis.api.core.SimpleString; |
| import org.apache.activemq.artemis.api.core.TransportConfiguration; |
| import org.apache.activemq.artemis.api.core.client.ActiveMQClient; |
| import org.apache.activemq.artemis.api.core.client.ClientMessage; |
| import org.apache.activemq.artemis.api.core.client.ClientProducer; |
| import org.apache.activemq.artemis.api.core.client.ClientSession; |
| import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; |
| import org.apache.activemq.artemis.api.core.client.ServerLocator; |
| import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; |
| import org.apache.activemq.artemis.core.security.Role; |
| import org.apache.activemq.artemis.core.server.ActiveMQServer; |
| import org.apache.activemq.artemis.core.server.Queue; |
| import org.apache.activemq.artemis.core.server.impl.AddressInfo; |
| import org.apache.activemq.artemis.core.settings.HierarchicalRepository; |
| import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; |
| import org.apache.activemq.artemis.core.settings.impl.AddressSettings; |
| import org.apache.activemq.artemis.protocol.amqp.converter.AMQPMessageSupport; |
| import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; |
| import org.apache.activemq.transport.amqp.client.AmqpClient; |
| import org.apache.activemq.transport.amqp.client.AmqpConnection; |
| import org.apache.activemq.transport.amqp.client.AmqpMessage; |
| import org.apache.activemq.transport.amqp.client.AmqpSender; |
| import org.apache.activemq.transport.amqp.client.AmqpSession; |
| import org.apache.qpid.proton.amqp.Symbol; |
| import org.apache.qpid.proton.amqp.messaging.DeleteOnClose; |
| import org.apache.qpid.proton.amqp.messaging.Source; |
| import org.apache.qpid.proton.amqp.messaging.Target; |
| import org.apache.qpid.proton.amqp.messaging.TerminusDurability; |
| import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; |
| import org.junit.After; |
| import org.junit.Before; |
| |
| import static org.apache.activemq.transport.amqp.AmqpSupport.LIFETIME_POLICY; |
| import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_QUEUE_CAPABILITY; |
| import static org.apache.activemq.transport.amqp.AmqpSupport.TEMP_TOPIC_CAPABILITY; |
| |
| /** |
| * Test support class for tests that will be using the AMQP Proton wrapper client. This is to |
| * make it easier to migrate tests from ActiveMQ5 |
| */ |
| public class AmqpClientTestSupport extends AmqpTestSupport { |
| |
| protected static final Symbol SHARED = Symbol.getSymbol("shared"); |
| protected static final Symbol GLOBAL = Symbol.getSymbol("global"); |
| |
| protected static final String BROKER_NAME = "localhost"; |
| protected static final String NETTY_ACCEPTOR = "netty-acceptor"; |
| |
| protected String noprivUser = "noprivs"; |
| protected String noprivPass = "noprivs"; |
| |
| protected String browseUser = "browser"; |
| protected String browsePass = "browser"; |
| |
| protected String guestUser = "guest"; |
| protected String guestPass = "guest"; |
| |
| protected String fullUser = "user"; |
| protected String fullPass = "pass"; |
| |
| protected ActiveMQServer server; |
| |
| protected MBeanServer mBeanServer = MBeanServerFactory.createMBeanServer(); |
| |
| @Before |
| @Override |
| public void setUp() throws Exception { |
| super.setUp(); |
| |
| server = createServer(); |
| } |
| |
| @After |
| @Override |
| public void tearDown() throws Exception { |
| for (AmqpConnection conn : connections) { |
| try { |
| conn.close(); |
| } catch (Throwable ignored) { |
| ignored.printStackTrace(); |
| } |
| } |
| connections.clear(); |
| |
| try { |
| if (server != null) { |
| server.stop(); |
| } |
| } finally { |
| super.tearDown(); |
| } |
| } |
| |
| protected boolean isAutoCreateQueues() { |
| return true; |
| } |
| |
| protected boolean isAutoCreateAddresses() { |
| return true; |
| } |
| |
| protected boolean isSecurityEnabled() { |
| return false; |
| } |
| |
| protected String getDeadLetterAddress() { |
| return "ActiveMQ.DLQ"; |
| } |
| |
| protected int getPrecreatedQueueSize() { |
| return 10; |
| } |
| |
| public URI getBrokerOpenWireConnectionURI() { |
| try { |
| String uri = null; |
| |
| if (isUseSSL()) { |
| uri = "ssl://127.0.0.1:" + AMQP_PORT; |
| } else { |
| uri = "tcp://127.0.0.1:" + AMQP_PORT; |
| } |
| |
| return new URI(uri); |
| } catch (Exception e) { |
| throw new RuntimeException(); |
| } |
| } |
| |
| protected ActiveMQServer createServer() throws Exception { |
| return createServer(AMQP_PORT); |
| } |
| |
| protected ActiveMQServer createServer(int port) throws Exception { |
| return createServer(port, true); |
| } |
| |
| protected ActiveMQServer createServer(int port, boolean start) throws Exception { |
| |
| final ActiveMQServer server = this.createServer(true, true); |
| |
| server.getConfiguration().getAcceptorConfigurations().clear(); |
| server.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(server, port)); |
| server.getConfiguration().setName(BROKER_NAME); |
| server.getConfiguration().setJournalDirectory(server.getConfiguration().getJournalDirectory() + port); |
| server.getConfiguration().setBindingsDirectory(server.getConfiguration().getBindingsDirectory() + port); |
| server.getConfiguration().setPagingDirectory(server.getConfiguration().getPagingDirectory() + port); |
| if (port == AMQP_PORT) { |
| // we use the default large directory if the default port |
| // as some tests will assert number of files |
| server.getConfiguration().setLargeMessagesDirectory(server.getConfiguration().getLargeMessagesDirectory()); |
| } else { |
| server.getConfiguration().setLargeMessagesDirectory(server.getConfiguration().getLargeMessagesDirectory() + port); |
| } |
| server.getConfiguration().setJMXManagementEnabled(true); |
| server.getConfiguration().setMessageExpiryScanPeriod(100); |
| server.setMBeanServer(mBeanServer); |
| |
| // Add any additional Acceptors needed for tests |
| addAdditionalAcceptors(server); |
| |
| // Address configuration |
| configureAddressPolicy(server); |
| |
| // Add optional security for tests that need it |
| configureBrokerSecurity(server); |
| |
| // Add extra configuration |
| addConfiguration(server); |
| |
| if (start) { |
| server.start(); |
| |
| // Prepare all addresses and queues for client tests. |
| createAddressAndQueues(server); |
| } |
| |
| return server; |
| } |
| |
| protected void addConfiguration(ActiveMQServer server) { |
| |
| } |
| |
| protected TransportConfiguration addAcceptorConfiguration(ActiveMQServer server, int port) { |
| HashMap<String, Object> params = new HashMap<>(); |
| params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port)); |
| params.put(TransportConstants.PROTOCOLS_PROP_NAME, getConfiguredProtocols()); |
| HashMap<String, Object> amqpParams = new HashMap<>(); |
| configureAMQPAcceptorParameters(amqpParams); |
| TransportConfiguration tc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params, NETTY_ACCEPTOR, amqpParams); |
| configureAMQPAcceptorParameters(tc); |
| return tc; |
| } |
| |
| protected String getConfiguredProtocols() { |
| return "AMQP,OPENWIRE"; |
| } |
| |
| protected void configureAddressPolicy(ActiveMQServer server) { |
| // Address configuration |
| AddressSettings addressSettings = new AddressSettings(); |
| |
| addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE); |
| addressSettings.setAutoCreateQueues(isAutoCreateQueues()); |
| addressSettings.setAutoCreateAddresses(isAutoCreateAddresses()); |
| addressSettings.setDeadLetterAddress(SimpleString.toSimpleString(getDeadLetterAddress())); |
| addressSettings.setExpiryAddress(SimpleString.toSimpleString(getDeadLetterAddress())); |
| |
| server.getConfiguration().getAddressesSettings().put("#", addressSettings); |
| Set<TransportConfiguration> acceptors = server.getConfiguration().getAcceptorConfigurations(); |
| for (TransportConfiguration tc : acceptors) { |
| if (tc.getName().equals(NETTY_ACCEPTOR)) { |
| tc.getExtraParams().put("anycastPrefix", "anycast://"); |
| tc.getExtraParams().put("multicastPrefix", "multicast://"); |
| } |
| } |
| } |
| |
| protected void createAddressAndQueues(ActiveMQServer server) throws Exception { |
| // Default Queue |
| server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName()), RoutingType.ANYCAST)); |
| server.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST)); |
| |
| // Default DLQ |
| server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getDeadLetterAddress()), RoutingType.ANYCAST)); |
| server.createQueue(new QueueConfiguration(getDeadLetterAddress()).setRoutingType(RoutingType.ANYCAST)); |
| |
| // Default Topic |
| server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getTopicName()), RoutingType.MULTICAST)); |
| server.createQueue(new QueueConfiguration(getTopicName())); |
| |
| // Additional Test Queues |
| for (int i = 0; i < getPrecreatedQueueSize(); ++i) { |
| server.addAddressInfo(new AddressInfo(SimpleString.toSimpleString(getQueueName(i)), RoutingType.ANYCAST)); |
| server.createQueue(new QueueConfiguration(getQueueName(i)).setRoutingType(RoutingType.ANYCAST)); |
| } |
| } |
| |
| protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception { |
| // None by default |
| } |
| |
| protected void configureBrokerSecurity(ActiveMQServer server) { |
| if (isSecurityEnabled()) { |
| enableSecurity(server); |
| } else { |
| server.getConfiguration().setSecurityEnabled(false); |
| } |
| } |
| |
| protected void enableSecurity(ActiveMQServer server, String... securityMatches) { |
| ActiveMQJAASSecurityManager securityManager = (ActiveMQJAASSecurityManager) server.getSecurityManager(); |
| |
| // User additions |
| securityManager.getConfiguration().addUser(noprivUser, noprivPass); |
| securityManager.getConfiguration().addRole(noprivUser, "nothing"); |
| securityManager.getConfiguration().addUser(browseUser, browsePass); |
| securityManager.getConfiguration().addRole(browseUser, "browser"); |
| securityManager.getConfiguration().addUser(guestUser, guestPass); |
| securityManager.getConfiguration().addRole(guestUser, "guest"); |
| securityManager.getConfiguration().addUser(fullUser, fullPass); |
| securityManager.getConfiguration().addRole(fullUser, "full"); |
| |
| // Configure roles |
| HierarchicalRepository<Set<Role>> securityRepository = server.getSecurityRepository(); |
| HashSet<Role> value = new HashSet<>(); |
| value.add(new Role("nothing", false, false, false, false, false, false, false, false, false, false)); |
| value.add(new Role("browser", false, false, false, false, false, false, false, true, false, false)); |
| value.add(new Role("guest", false, true, false, false, false, false, false, true, false, false)); |
| value.add(new Role("full", true, true, true, true, true, true, true, true, true, true)); |
| securityRepository.addMatch(getQueueName(), value); |
| |
| for (String match : securityMatches) { |
| securityRepository.addMatch(match, value); |
| } |
| |
| server.getConfiguration().setSecurityEnabled(true); |
| } |
| |
| protected void configureAMQPAcceptorParameters(Map<String, Object> params) { |
| // None by default |
| } |
| |
| protected void configureAMQPAcceptorParameters(TransportConfiguration tc) { |
| // None by default |
| } |
| |
| public Queue getProxyToQueue(String queueName) { |
| return server.locateQueue(SimpleString.toSimpleString(queueName)); |
| } |
| |
| public String getTestName() { |
| return getName(); |
| } |
| |
| public String getTopicName() { |
| return getName() + "-Topic"; |
| } |
| |
| public String getQueueName() { |
| return getName(); |
| } |
| |
| public String getQueueName(int index) { |
| return getName() + "-" + index; |
| } |
| |
| protected void sendMessages(String destinationName, int count) throws Exception { |
| sendMessages(destinationName, count, null); |
| } |
| |
| protected void sendMessages(String destinationName, int count, RoutingType routingType) throws Exception { |
| sendMessages(destinationName, count, routingType, false); |
| } |
| |
| protected void sendMessages(String destinationName, |
| int count, |
| RoutingType routingType, |
| boolean durable) throws Exception { |
| sendMessages(destinationName, count, routingType, durable, Collections.emptyMap()); |
| } |
| |
| protected void setData(AmqpMessage amqpMessage) throws Exception { |
| } |
| |
| protected void sendMessages(String destinationName, |
| int count, |
| RoutingType routingType, |
| boolean durable, |
| Map<String, Object> applicationProperties) throws Exception { |
| AmqpClient client = createAmqpClient(); |
| AmqpConnection connection = addConnection(client.connect()); |
| try { |
| AmqpSession session = connection.createSession(); |
| AmqpSender sender = session.createSender(destinationName); |
| |
| for (int i = 0; i < count; ++i) { |
| AmqpMessage message = new AmqpMessage(); |
| for (Map.Entry<String, Object> entry : applicationProperties.entrySet()) { |
| message.setApplicationProperty(entry.getKey(), entry.getValue()); |
| } |
| message.setMessageId("MessageID:" + i); |
| message.setDurable(durable); |
| if (routingType != null) { |
| message.setMessageAnnotation(AMQPMessageSupport.ROUTING_TYPE.toString(), routingType.getType()); |
| } |
| setData(message); |
| sender.send(message); |
| } |
| } finally { |
| connection.close(); |
| } |
| } |
| |
| protected void sendMessages(String destinationName, int count, boolean durable) throws Exception { |
| sendMessages(destinationName, count, durable, null); |
| } |
| |
| protected void sendMessages(String destinationName, int count, boolean durable, byte[] payload) throws Exception { |
| AmqpClient client = createAmqpClient(); |
| AmqpConnection connection = addConnection(client.connect()); |
| try { |
| AmqpSession session = connection.createSession(); |
| AmqpSender sender = session.createSender(destinationName); |
| |
| for (int i = 0; i < count; ++i) { |
| AmqpMessage message = new AmqpMessage(); |
| message.setMessageId("MessageID:" + i); |
| message.setDurable(durable); |
| if (payload != null) { |
| message.setBytes(payload); |
| } |
| sender.send(message); |
| } |
| } finally { |
| connection.close(); |
| } |
| } |
| |
| protected void sendMessagesCore(String destinationName, int count, boolean durable) throws Exception { |
| sendMessagesCore(destinationName, count, durable, null); |
| } |
| |
| protected void sendMessagesCore(String destinationName, int count, boolean durable, byte[] body) throws Exception { |
| ServerLocator serverLocator = ActiveMQClient.createServerLocator("tcp://127.0.0.1:5672"); |
| ClientSessionFactory clientSessionFactory = serverLocator.createSessionFactory(); |
| ClientSession session = clientSessionFactory.createSession(); |
| try { |
| ClientProducer sender = session.createProducer(destinationName); |
| |
| for (int i = 0; i < count; ++i) { |
| ClientMessage message = session.createMessage(durable); |
| if (body != null) { |
| message.getBodyBuffer().writeBytes(body); |
| } |
| sender.send(message); |
| } |
| } finally { |
| session.close(); |
| } |
| } |
| |
| protected void sendMessagesOpenWire(String destinationName, int count, boolean durable) throws Exception { |
| sendMessagesOpenWire(destinationName, count, durable, null); |
| } |
| |
| protected void sendMessagesOpenWire(String destinationName, |
| int count, |
| boolean durable, |
| byte[] payload) throws Exception { |
| ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://127.0.0.1:5672"); |
| Connection connection = cf.createConnection(); |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| try { |
| MessageProducer producer = session.createProducer(session.createQueue(destinationName)); |
| if (durable) { |
| producer.setDeliveryMode(DeliveryMode.PERSISTENT); |
| } else { |
| producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); |
| } |
| |
| for (int i = 0; i < count; ++i) { |
| BytesMessage message = session.createBytesMessage(); |
| if (payload != null) { |
| message.writeBytes(payload); |
| } |
| producer.send(message); |
| } |
| } finally { |
| connection.close(); |
| } |
| } |
| |
| protected Source createDynamicSource(boolean topic) { |
| |
| Source source = new Source(); |
| source.setDynamic(true); |
| source.setDurable(TerminusDurability.NONE); |
| source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); |
| |
| // Set the dynamic node lifetime-policy |
| Map<Symbol, Object> dynamicNodeProperties = new HashMap<>(); |
| dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance()); |
| source.setDynamicNodeProperties(dynamicNodeProperties); |
| |
| // Set the capability to indicate the node type being created |
| if (!topic) { |
| source.setCapabilities(TEMP_QUEUE_CAPABILITY); |
| } else { |
| source.setCapabilities(TEMP_TOPIC_CAPABILITY); |
| } |
| |
| return source; |
| } |
| |
| protected Target createDynamicTarget(boolean topic) { |
| |
| Target target = new Target(); |
| target.setDynamic(true); |
| target.setDurable(TerminusDurability.NONE); |
| target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH); |
| |
| // Set the dynamic node lifetime-policy |
| Map<Symbol, Object> dynamicNodeProperties = new HashMap<>(); |
| dynamicNodeProperties.put(LIFETIME_POLICY, DeleteOnClose.getInstance()); |
| target.setDynamicNodeProperties(dynamicNodeProperties); |
| |
| // Set the capability to indicate the node type being created |
| if (!topic) { |
| target.setCapabilities(TEMP_QUEUE_CAPABILITY); |
| } else { |
| target.setCapabilities(TEMP_TOPIC_CAPABILITY); |
| } |
| |
| return target; |
| } |
| } |