| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.activemq.artemis.tests.integration.amqp.connect; |
| |
| import javax.jms.Connection; |
| import javax.jms.ConnectionFactory; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageProducer; |
| import javax.jms.Session; |
| import javax.jms.TextMessage; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.CyclicBarrier; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import io.vertx.core.Vertx; |
| import io.vertx.proton.ProtonConnection; |
| import io.vertx.proton.ProtonServerOptions; |
| import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; |
| import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; |
| import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement; |
| import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; |
| import org.apache.activemq.artemis.core.server.ActiveMQServer; |
| import org.apache.activemq.artemis.logs.AssertionLoggerHandler; |
| import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection; |
| import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; |
| import org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnection; |
| import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource; |
| import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; |
| import org.apache.activemq.artemis.tests.util.CFUtil; |
| import org.apache.activemq.artemis.tests.util.Wait; |
| import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; |
| import org.apache.activemq.transport.amqp.client.AmqpClient; |
| import org.apache.activemq.transport.amqp.client.AmqpConnection; |
| import org.apache.activemq.transport.amqp.client.AmqpSession; |
| import org.apache.activemq.transport.amqp.client.AmqpValidator; |
| import org.apache.qpid.proton.amqp.Symbol; |
| import org.apache.qpid.proton.amqp.messaging.Accepted; |
| import org.apache.qpid.proton.amqp.transport.AmqpError; |
| import org.apache.qpid.proton.amqp.transport.ErrorCondition; |
| import org.apache.qpid.proton.amqp.transport.Target; |
| import org.apache.qpid.proton.engine.Link; |
| import org.apache.qpid.proton.engine.Receiver; |
| import org.apache.qpid.proton.engine.Sender; |
| import org.apache.qpid.proton.engine.impl.ConnectionImpl; |
| import org.jboss.logging.Logger; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Test; |
| |
| import static java.util.EnumSet.of; |
| import static org.apache.qpid.proton.engine.EndpointState.ACTIVE; |
| |
| /** |
| * This test will make sure the Broker connection will react accordingly to a few misconfigs and possible errors on the network of brokers and eventually qipd-dispatch. |
| */ |
| public class ValidateAMQPErrorsTest extends AmqpClientTestSupport { |
| |
| protected static final int AMQP_PORT_2 = 5673; |
| private static final Logger logger = Logger.getLogger(ValidateAMQPErrorsTest.class); |
| protected Vertx vertx; |
| |
| protected MockServer mockServer; |
| |
| public void startVerx() { |
| vertx = Vertx.vertx(); |
| } |
| |
| @After |
| public void stop() throws Exception { |
| if (mockServer != null) { |
| mockServer.close(); |
| mockServer = null; |
| } |
| if (vertx != null) { |
| try { |
| CountDownLatch latch = new CountDownLatch(1); |
| vertx.close((x) -> latch.countDown()); |
| Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); |
| } finally { |
| vertx = null; |
| } |
| } |
| AssertionLoggerHandler.stopCapture(); // Just in case startCapture was called in any of the tests here |
| } |
| |
| @Override |
| protected ActiveMQServer createServer() throws Exception { |
| return createServer(AMQP_PORT, false); |
| } |
| |
| /** |
| * Connecting to itself should issue an error. |
| * and the max retry should still be counted, not just keep connecting forever. |
| */ |
| @Test |
| public void testConnectItself() throws Exception { |
| try { |
| AssertionLoggerHandler.startCapture(); |
| |
| AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(10).setRetryInterval(1); |
| amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); |
| server.getConfiguration().addAMQPConnection(amqpConnection); |
| |
| server.start(); |
| |
| Assert.assertEquals(1, server.getBrokerConnections().size()); |
| server.getBrokerConnections().forEach((t) -> Wait.assertFalse(t::isStarted)); |
| Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ111001")); // max retry |
| AssertionLoggerHandler.clear(); |
| Thread.sleep(100); |
| Assert.assertFalse(AssertionLoggerHandler.findText("AMQ111002")); // there shouldn't be a retry after the last failure |
| Assert.assertFalse(AssertionLoggerHandler.findText("AMQ111003")); // there shouldn't be a retry after the last failure |
| } finally { |
| AssertionLoggerHandler.stopCapture(); |
| } |
| } |
| |
| @Test |
| public void testCloseLinkOnMirror() throws Exception { |
| try { |
| AssertionLoggerHandler.startCapture(); |
| |
| ActiveMQServer server2 = createServer(AMQP_PORT_2, false); |
| |
| AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(-1).setRetryInterval(10); |
| amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); |
| server.getConfiguration().addAMQPConnection(amqpConnection); |
| |
| server.start(); |
| Assert.assertEquals(1, server.getBrokerConnections().size()); |
| Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ111002")); |
| server.getBrokerConnections().forEach((t) -> Wait.assertTrue(() -> ((AMQPBrokerConnection) t).isConnecting())); |
| |
| server2.start(); |
| |
| server.getBrokerConnections().forEach((t) -> Wait.assertFalse(() -> ((AMQPBrokerConnection) t).isConnecting())); |
| |
| createAddressAndQueues(server); |
| |
| Wait.assertTrue(() -> server2.locateQueue(getQueueName()) != null); |
| |
| Wait.assertEquals(1, server2.getRemotingService()::getConnectionCount); |
| server2.getRemotingService().getConnections().forEach((t) -> { |
| try { |
| ActiveMQProtonRemotingConnection connection = (ActiveMQProtonRemotingConnection) t; |
| ConnectionImpl protonConnection = (ConnectionImpl) connection.getAmqpConnection().getHandler().getConnection(); |
| Wait.waitFor(() -> protonConnection.linkHead(of(ACTIVE), of(ACTIVE)) != null); |
| connection.getAmqpConnection().runNow(() -> { |
| Receiver receiver = (Receiver) protonConnection.linkHead(of(ACTIVE), of(ACTIVE)); |
| receiver.close(); |
| connection.flush(); |
| }); |
| } catch (Exception e) { |
| e.printStackTrace(); |
| } |
| }); |
| |
| ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); |
| ConnectionFactory cf2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2); |
| |
| try (Connection connection = cf1.createConnection()) { |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); |
| for (int i = 0; i < 10; i++) { |
| producer.send(session.createTextMessage("message " + i)); |
| } |
| } |
| |
| // messages should still flow after a disconnect on the link |
| // the server should reconnect as if it was a failure |
| try (Connection connection = cf2.createConnection()) { |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); |
| connection.start(); |
| for (int i = 0; i < 10; i++) { |
| Assert.assertEquals("message " + i, ((TextMessage) consumer.receive(5000)).getText()); |
| } |
| } |
| |
| } finally { |
| AssertionLoggerHandler.stopCapture(); |
| } |
| } |
| |
| @Test |
| public void testCloseLinkOnSender() throws Exception { |
| testCloseLink(true); |
| } |
| |
| @Test |
| public void testCloseLinkOnReceiver() throws Exception { |
| testCloseLink(false); |
| } |
| |
| public void testCloseLink(boolean isSender) throws Exception { |
| |
| AtomicInteger errors = new AtomicInteger(0); |
| AssertionLoggerHandler.startCapture(true); |
| |
| ActiveMQServer server2 = createServer(AMQP_PORT_2, false); |
| |
| if (isSender) { |
| AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(-1).setRetryInterval(10); |
| amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress(getQueueName()).setType(AMQPBrokerConnectionAddressType.SENDER)); |
| server.getConfiguration().addAMQPConnection(amqpConnection); |
| } else { |
| AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(10); |
| amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress(getQueueName()).setType(AMQPBrokerConnectionAddressType.RECEIVER)); |
| server2.getConfiguration().addAMQPConnection(amqpConnection); |
| } |
| |
| if (isSender) { |
| server.start(); |
| Assert.assertEquals(1, server.getBrokerConnections().size()); |
| } else { |
| server2.start(); |
| Assert.assertEquals(1, server2.getBrokerConnections().size()); |
| } |
| Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ111002")); |
| server.getBrokerConnections().forEach((t) -> Wait.assertTrue(() -> ((AMQPBrokerConnection) t).isConnecting())); |
| |
| if (isSender) { |
| server2.start(); |
| } else { |
| server.start(); |
| } |
| |
| server.getBrokerConnections().forEach((t) -> Wait.assertFalse(() -> ((AMQPBrokerConnection) t).isConnecting())); |
| |
| createAddressAndQueues(server); |
| createAddressAndQueues(server2); |
| |
| Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null); |
| Wait.assertTrue(() -> server2.locateQueue(getQueueName()) != null); |
| |
| ActiveMQServer serverReceivingConnections = isSender ? server2 : server; |
| Wait.assertEquals(1, serverReceivingConnections.getRemotingService()::getConnectionCount); |
| serverReceivingConnections.getRemotingService().getConnections().forEach((t) -> { |
| try { |
| ActiveMQProtonRemotingConnection connection = (ActiveMQProtonRemotingConnection) t; |
| ConnectionImpl protonConnection = (ConnectionImpl) connection.getAmqpConnection().getHandler().getConnection(); |
| Wait.waitFor(() -> protonConnection.linkHead(of(ACTIVE), of(ACTIVE)) != null); |
| connection.getAmqpConnection().runNow(() -> { |
| Link theLink = protonConnection.linkHead(of(ACTIVE), of(ACTIVE)); |
| theLink.close(); |
| connection.flush(); |
| }); |
| } catch (Exception e) { |
| errors.incrementAndGet(); |
| e.printStackTrace(); |
| } |
| }); |
| |
| Wait.assertEquals(1, () -> AssertionLoggerHandler.countText("AMQ119021")); |
| |
| ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); |
| ConnectionFactory cf2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2); |
| |
| try (Connection connection = cf1.createConnection()) { |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); |
| for (int i = 0; i < 10; i++) { |
| producer.send(session.createTextMessage("message " + i)); |
| } |
| } |
| |
| // messages should still flow after a disconnect on the link |
| // the server should reconnect as if it was a failure |
| try (Connection connection = cf2.createConnection()) { |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName())); |
| connection.start(); |
| for (int i = 0; i < 10; i++) { |
| Assert.assertEquals("message " + i, ((TextMessage) consumer.receive(5000)).getText()); |
| } |
| } |
| |
| Assert.assertEquals(0, errors.get()); |
| |
| } |
| |
| @Test |
| public void testTimeoutOnSenderOpen() throws Exception { |
| |
| startVerx(); |
| |
| ProtonServerOptions serverOptions = new ProtonServerOptions(); |
| |
| mockServer = new MockServer(vertx, serverOptions, null, serverConnection -> { |
| serverConnection.openHandler(serverSender -> { |
| serverConnection.closeHandler(x -> serverConnection.close()); |
| serverConnection.open(); |
| }); |
| serverConnection.sessionOpenHandler((s) -> { |
| s.open(); |
| }); |
| serverConnection.senderOpenHandler((x) -> { |
| x.open(); |
| }); |
| serverConnection.receiverOpenHandler((x) -> { |
| //x.open(); // I'm missing the open, so it won't ever connect |
| }); |
| }); |
| |
| try { |
| AssertionLoggerHandler.startCapture(true); |
| |
| AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + mockServer.actualPort() + "?connect-timeout-millis=20").setReconnectAttempts(5).setRetryInterval(10); |
| amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress(getQueueName()).setType(AMQPBrokerConnectionAddressType.SENDER)); |
| amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); |
| server.getConfiguration().addAMQPConnection(amqpConnection); |
| server.start(); |
| |
| Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ111001")); |
| Wait.assertEquals(6, () -> AssertionLoggerHandler.countText("AMQ119020")); // 0..5 == 6 |
| |
| } finally { |
| mockServer.close(); |
| } |
| } |
| |
| @Test |
| public void testReconnectAfterSenderOpenTimeout() throws Exception { |
| |
| startVerx(); |
| |
| AssertionLoggerHandler.startCapture(true); |
| |
| ProtonServerOptions serverOptions = new ProtonServerOptions(); |
| |
| AtomicInteger countOpen = new AtomicInteger(0); |
| CyclicBarrier startFlag = new CyclicBarrier(2); |
| CountDownLatch blockBeforeOpen = new CountDownLatch(1); |
| AtomicInteger disconnects = new AtomicInteger(0); |
| AtomicInteger messagesReceived = new AtomicInteger(0); |
| AtomicInteger errors = new AtomicInteger(0); |
| |
| ConcurrentHashSet<ProtonConnection> connections = new ConcurrentHashSet<>(); |
| |
| mockServer = new MockServer(vertx, serverOptions, null, serverConnection -> { |
| serverConnection.disconnectHandler(c -> { |
| disconnects.incrementAndGet(); // number of retries |
| connections.remove(c); |
| }); |
| serverConnection.openHandler(serverSender -> { |
| serverConnection.closeHandler(x -> { |
| serverConnection.close(); |
| connections.remove(serverConnection); |
| }); |
| serverConnection.open(); |
| connections.add(serverConnection); |
| }); |
| serverConnection.sessionOpenHandler((s) -> { |
| s.open(); |
| }); |
| serverConnection.senderOpenHandler((x) -> { |
| x.open(); |
| }); |
| serverConnection.receiverOpenHandler((x) -> { |
| if (countOpen.incrementAndGet() > 2) { |
| if (countOpen.get() == 3) { |
| try { |
| startFlag.await(10, TimeUnit.SECONDS); |
| blockBeforeOpen.await(10, TimeUnit.SECONDS); |
| return; |
| } catch (Throwable ignored) { |
| } |
| } |
| HashMap<Symbol, Object> brokerIDProperties = new HashMap<>(); |
| brokerIDProperties.put(AMQPMirrorControllerSource.BROKER_ID, "fake-id"); |
| x.setProperties(brokerIDProperties); |
| x.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY}); |
| x.setTarget(x.getRemoteTarget()); |
| x.open(); |
| x.handler((del, msg) -> { |
| if (msg.getApplicationProperties() != null) { |
| Map map = msg.getApplicationProperties().getValue(); |
| Object value = map.get("sender"); |
| if (value != null) { |
| if (messagesReceived.get() != ((Integer) value).intValue()) { |
| logger.warn("Message out of order. Expected " + messagesReceived.get() + " but received " + value); |
| errors.incrementAndGet(); |
| } |
| messagesReceived.incrementAndGet(); |
| } |
| } |
| }); |
| } |
| }); |
| }); |
| |
| AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + mockServer.actualPort() + "?connect-timeout-millis=1000").setReconnectAttempts(10).setRetryInterval(10); |
| amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); |
| server.getConfiguration().addAMQPConnection(amqpConnection); |
| server.start(); |
| |
| startFlag.await(10, TimeUnit.SECONDS); |
| blockBeforeOpen.countDown(); |
| |
| Wait.assertEquals(2, disconnects::intValue); |
| Wait.assertEquals(1, connections::size); |
| |
| Wait.assertEquals(3, () -> AssertionLoggerHandler.countText("AMQ119020")); |
| |
| ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); |
| try (Connection connection = factory.createConnection()) { |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); |
| for (int i = 0; i < 100; i++) { |
| TextMessage message = session.createTextMessage("hello"); |
| message.setIntProperty("sender", i); |
| producer.send(message); |
| } |
| } |
| |
| Wait.assertEquals(100, messagesReceived::intValue, 5000); |
| Assert.assertEquals(0, errors.get(), 5000); |
| } |
| |
| @Test |
| public void testNoServerOfferedMirrorCapability() throws Exception { |
| startVerx(); |
| |
| mockServer = new MockServer(vertx, serverConnection -> { |
| serverConnection.openHandler(serverSender -> { |
| serverConnection.open(); |
| }); |
| serverConnection.sessionOpenHandler((s) -> { |
| s.open(); |
| }); |
| serverConnection.senderOpenHandler((x) -> { |
| x.open(); |
| }); |
| serverConnection.receiverOpenHandler((x) -> { |
| x.setTarget(x.getRemoteTarget()); |
| x.open(); |
| }); |
| }); |
| |
| AssertionLoggerHandler.startCapture(true); |
| |
| AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + mockServer.actualPort() + "?connect-timeout-millis=100").setReconnectAttempts(5).setRetryInterval(10); |
| amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); |
| server.getConfiguration().addAMQPConnection(amqpConnection); |
| server.start(); |
| |
| Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ111001")); |
| Assert.assertEquals(6, AssertionLoggerHandler.countText("AMQ119018")); // 0..5 = 6 |
| } |
| |
| /** |
| * Refuse the first mirror link, verify broker handles it and reconnects |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testReconnectAfterMirrorLinkRefusal() throws Exception { |
| startVerx(); |
| |
| AtomicInteger errors = new AtomicInteger(0); |
| |
| AtomicInteger messagesReceived = new AtomicInteger(0); |
| |
| List<ProtonConnection> connections = Collections.synchronizedList(new ArrayList<ProtonConnection>()); |
| List<ProtonConnection> disconnected = Collections.synchronizedList(new ArrayList<ProtonConnection>()); |
| AtomicInteger refusedLinkMessageCount = new AtomicInteger(); |
| AtomicInteger linkOpens = new AtomicInteger(0); |
| |
| mockServer = new MockServer(vertx, serverConnection -> { |
| serverConnection.disconnectHandler(c -> { |
| disconnected.add(serverConnection); |
| }); |
| |
| serverConnection.openHandler(c -> { |
| connections.add(serverConnection); |
| serverConnection.open(); |
| }); |
| |
| serverConnection.closeHandler(c -> { |
| serverConnection.close(); |
| connections.remove(serverConnection); |
| }); |
| |
| serverConnection.sessionOpenHandler(session -> { |
| session.open(); |
| }); |
| |
| serverConnection.receiverOpenHandler(serverReceiver -> { |
| Target remoteTarget = serverReceiver.getRemoteTarget(); |
| String remoteAddress = remoteTarget == null ? null : remoteTarget.getAddress(); |
| if (remoteAddress == null || !remoteAddress.startsWith(ProtonProtocolManager.MIRROR_ADDRESS)) { |
| errors.incrementAndGet(); |
| logger.warn("Receiving address as " + remoteAddress); |
| return; |
| } |
| if (linkOpens.incrementAndGet() != 2) { |
| logger.debug("Link Opens::" + linkOpens); |
| logger.debug("ServerReceiver = " + serverReceiver.getTarget()); |
| serverReceiver.setTarget(null); |
| |
| serverReceiver.handler((del, msg) -> { |
| refusedLinkMessageCount.incrementAndGet(); |
| logger.debug("Should not have got message on refused link: " + msg); |
| }); |
| |
| serverReceiver.open(); |
| |
| vertx.setTimer(20, x -> { |
| serverReceiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, "Testing refusal of mirror link for $reasons")); |
| serverReceiver.close(); |
| }); |
| } else { |
| serverReceiver.setTarget(serverReceiver.getRemoteTarget()); |
| HashMap<Symbol, Object> linkProperties = new HashMap<>(); |
| linkProperties.put(AMQPMirrorControllerSource.BROKER_ID, "fake-id"); |
| |
| serverReceiver.setProperties(linkProperties); |
| serverReceiver.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY}); |
| |
| serverReceiver.handler((del, msg) -> { |
| logger.debug("prefetch = " + serverReceiver.getPrefetch() + ", Got message: " + msg); |
| if (msg.getApplicationProperties() != null) { |
| Map map = msg.getApplicationProperties().getValue(); |
| Object value = map.get("sender"); |
| if (value != null) { |
| if (messagesReceived.get() != ((Integer) value).intValue()) { |
| logger.warn("Message out of order. Expected " + messagesReceived.get() + " but received " + value); |
| errors.incrementAndGet(); |
| } |
| messagesReceived.incrementAndGet(); |
| } |
| } |
| del.disposition(Accepted.getInstance(), true); |
| if (serverReceiver.getPrefetch() == 0) { |
| serverReceiver.flow(1); |
| } |
| }); |
| |
| serverReceiver.open(); |
| } |
| }); |
| }); |
| |
| AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + mockServer.actualPort()).setReconnectAttempts(3).setRetryInterval(10); |
| amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); |
| server.getConfiguration().addAMQPConnection(amqpConnection); |
| server.start(); |
| |
| Wait.assertEquals(1, disconnected::size, 6000); |
| Wait.assertEquals(2, connections::size, 6000); |
| |
| assertSame(connections.get(0), disconnected.get(0)); |
| assertFalse(connections.get(1).isDisconnected()); |
| |
| assertEquals("Should not have got any message on refused link", 0, refusedLinkMessageCount.get()); |
| assertEquals(0, errors.get()); |
| |
| ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); |
| try (Connection connection = factory.createConnection()) { |
| Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); |
| for (int i = 0; i < 100; i++) { |
| TextMessage message = session.createTextMessage("hello"); |
| message.setIntProperty("sender", i); |
| producer.send(message); |
| } |
| } |
| |
| Wait.assertEquals(100, messagesReceived::intValue); |
| assertEquals(0, errors.get()); // Meant to check again. the errors check before was because of connection issues. This one is about duplicates on receiving |
| } |
| |
| @Test |
| public void testNoClientDesiredMirrorCapability() throws Exception { |
| AssertionLoggerHandler.startCapture(); |
| server.start(); |
| |
| AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT), null, null); |
| client.setValidator(new AmqpValidator() { |
| |
| @Override |
| public void inspectOpenedResource(Sender sender) { |
| ErrorCondition condition = sender.getRemoteCondition(); |
| |
| if (condition != null && condition.getCondition() != null) { |
| if (!condition.getCondition().equals(AmqpError.ILLEGAL_STATE)) { |
| markAsInvalid("Should have been closed with an illegal state error, but error was: " + condition); |
| } |
| |
| if (!condition.getDescription().contains("AMQ119024")) { |
| markAsInvalid("should have indicated the error code about missing a desired capability"); |
| } |
| |
| if (!condition.getDescription().contains(AMQPMirrorControllerSource.MIRROR_CAPABILITY)) { |
| markAsInvalid("should have indicated the error code about missing a desired capability"); |
| } |
| } else { |
| markAsInvalid("Sender should have been detached with an error"); |
| } |
| } |
| }); |
| |
| String address = ProtonProtocolManager.getMirrorAddress(getTestName()); |
| |
| AmqpConnection connection = client.connect(); |
| try { |
| AmqpSession session = connection.createSession(); |
| |
| try { |
| session.createSender(address); |
| fail("Link should have been refused."); |
| } catch (Exception ex) { |
| Assert.assertTrue(ex.getMessage().contains("AMQ119024")); |
| instanceLog.debug("Caught expected exception"); |
| } |
| |
| connection.getStateInspector().assertValid(); |
| } finally { |
| connection.close(); |
| } |
| |
| Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ119024")); |
| } |
| } |