/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.activemq.artemis.tests.integration.amqp.connect;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.vertx.core.Vertx;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonServerOptions;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnection;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.Target;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;

import static java.util.EnumSet.of;
import static org.apache.qpid.proton.engine.EndpointState.ACTIVE;

/**
 * This test will make sure the Broker connection will react accordingly to a few misconfigs and possible errors on the network of brokers and eventually qipd-dispatch.
 */
public class ValidateAMQPErrorsTest extends AmqpClientTestSupport {

   protected static final int AMQP_PORT_2 = 5673;
   private static final Logger logger = Logger.getLogger(ValidateAMQPErrorsTest.class);
   protected Vertx vertx;

   protected MockServer mockServer;

   public void startVerx() {
      vertx = Vertx.vertx();
   }

   @After
   public void stop() throws Exception {
      if (mockServer != null) {
         mockServer.close();
         mockServer = null;
      }
      if (vertx != null) {
         try {
            CountDownLatch latch = new CountDownLatch(1);
            vertx.close((x) -> latch.countDown());
            Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
         } finally {
            vertx = null;
         }
      }
      AssertionLoggerHandler.stopCapture(); // Just in case startCapture was called in any of the tests here
   }

   @Override
   protected ActiveMQServer createServer() throws Exception {
      return createServer(AMQP_PORT, false);
   }

   /**
    * Connecting to itself should issue an error.
    * and the max retry should still be counted, not just keep connecting forever.
    */
   @Test
   public void testConnectItself() throws Exception {
      try {
         AssertionLoggerHandler.startCapture();

         AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(10).setRetryInterval(1);
         amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
         server.getConfiguration().addAMQPConnection(amqpConnection);

         server.start();

         Assert.assertEquals(1, server.getBrokerConnections().size());
         server.getBrokerConnections().forEach((t) -> Wait.assertFalse(t::isStarted));
         Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ111001")); // max retry
         AssertionLoggerHandler.clear();
         Thread.sleep(100);
         Assert.assertFalse(AssertionLoggerHandler.findText("AMQ111002")); // there shouldn't be a retry after the last failure
         Assert.assertFalse(AssertionLoggerHandler.findText("AMQ111003")); // there shouldn't be a retry after the last failure
      } finally {
         AssertionLoggerHandler.stopCapture();
      }
   }

   @Test
   public void testCloseLinkOnMirror() throws Exception {
      try {
         AssertionLoggerHandler.startCapture();

         ActiveMQServer server2 = createServer(AMQP_PORT_2, false);

         AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(-1).setRetryInterval(10);
         amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
         server.getConfiguration().addAMQPConnection(amqpConnection);

         server.start();
         Assert.assertEquals(1, server.getBrokerConnections().size());
         Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ111002"));
         server.getBrokerConnections().forEach((t) -> Wait.assertTrue(() -> ((AMQPBrokerConnection) t).isConnecting()));

         server2.start();

         server.getBrokerConnections().forEach((t) -> Wait.assertFalse(() -> ((AMQPBrokerConnection) t).isConnecting()));

         createAddressAndQueues(server);

         Wait.assertTrue(() -> server2.locateQueue(getQueueName()) != null);

         Wait.assertEquals(1, server2.getRemotingService()::getConnectionCount);
         server2.getRemotingService().getConnections().forEach((t) -> {
            try {
               ActiveMQProtonRemotingConnection connection = (ActiveMQProtonRemotingConnection) t;
               ConnectionImpl protonConnection = (ConnectionImpl) connection.getAmqpConnection().getHandler().getConnection();
               Wait.waitFor(() -> protonConnection.linkHead(of(ACTIVE), of(ACTIVE)) != null);
               connection.getAmqpConnection().runNow(() -> {
                  Receiver receiver = (Receiver) protonConnection.linkHead(of(ACTIVE), of(ACTIVE));
                  receiver.close();
                  connection.flush();
               });
            } catch (Exception e) {
               e.printStackTrace();
            }
         });

         ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
         ConnectionFactory cf2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2);

         try (Connection connection = cf1.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
            for (int i = 0; i < 10; i++) {
               producer.send(session.createTextMessage("message " + i));
            }
         }

         // messages should still flow after a disconnect on the link
         // the server should reconnect as if it was a failure
         try (Connection connection = cf2.createConnection()) {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
            connection.start();
            for (int i = 0; i < 10; i++) {
               Assert.assertEquals("message " + i, ((TextMessage) consumer.receive(5000)).getText());
            }
         }

      } finally {
         AssertionLoggerHandler.stopCapture();
      }
   }

   @Test
   public void testCloseLinkOnSender() throws Exception {
      testCloseLink(true);
   }

   @Test
   public void testCloseLinkOnReceiver() throws Exception {
      testCloseLink(false);
   }

   public void testCloseLink(boolean isSender) throws Exception {

      AtomicInteger errors = new AtomicInteger(0);
      AssertionLoggerHandler.startCapture(true);

      ActiveMQServer server2 = createServer(AMQP_PORT_2, false);

      if (isSender) {
         AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(-1).setRetryInterval(10);
         amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress(getQueueName()).setType(AMQPBrokerConnectionAddressType.SENDER));
         server.getConfiguration().addAMQPConnection(amqpConnection);
      } else {
         AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(10);
         amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress(getQueueName()).setType(AMQPBrokerConnectionAddressType.RECEIVER));
         server2.getConfiguration().addAMQPConnection(amqpConnection);
      }

      if (isSender) {
         server.start();
         Assert.assertEquals(1, server.getBrokerConnections().size());
      } else {
         server2.start();
         Assert.assertEquals(1, server2.getBrokerConnections().size());
      }
      Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ111002"));
      server.getBrokerConnections().forEach((t) -> Wait.assertTrue(() -> ((AMQPBrokerConnection) t).isConnecting()));

      if (isSender) {
         server2.start();
      } else {
         server.start();
      }

      server.getBrokerConnections().forEach((t) -> Wait.assertFalse(() -> ((AMQPBrokerConnection) t).isConnecting()));

      createAddressAndQueues(server);
      createAddressAndQueues(server2);

      Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
      Wait.assertTrue(() -> server2.locateQueue(getQueueName()) != null);

      ActiveMQServer serverReceivingConnections = isSender ? server2 : server;
      Wait.assertEquals(1, serverReceivingConnections.getRemotingService()::getConnectionCount);
      serverReceivingConnections.getRemotingService().getConnections().forEach((t) -> {
         try {
            ActiveMQProtonRemotingConnection connection = (ActiveMQProtonRemotingConnection) t;
            ConnectionImpl protonConnection = (ConnectionImpl) connection.getAmqpConnection().getHandler().getConnection();
            Wait.waitFor(() -> protonConnection.linkHead(of(ACTIVE), of(ACTIVE)) != null);
            connection.getAmqpConnection().runNow(() -> {
               Link theLink = protonConnection.linkHead(of(ACTIVE), of(ACTIVE));
               theLink.close();
               connection.flush();
            });
         } catch (Exception e) {
            errors.incrementAndGet();
            e.printStackTrace();
         }
      });

      Wait.assertEquals(1, () -> AssertionLoggerHandler.countText("AMQ119021"));

      ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
      ConnectionFactory cf2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2);

      try (Connection connection = cf1.createConnection()) {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
         for (int i = 0; i < 10; i++) {
            producer.send(session.createTextMessage("message " + i));
         }
      }

      // messages should still flow after a disconnect on the link
      // the server should reconnect as if it was a failure
      try (Connection connection = cf2.createConnection()) {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
         connection.start();
         for (int i = 0; i < 10; i++) {
            Assert.assertEquals("message " + i, ((TextMessage) consumer.receive(5000)).getText());
         }
      }

      Assert.assertEquals(0, errors.get());

   }

   @Test
   public void testTimeoutOnSenderOpen() throws Exception {

      startVerx();

      ProtonServerOptions serverOptions = new ProtonServerOptions();

      mockServer = new MockServer(vertx, serverOptions, null, serverConnection -> {
         serverConnection.openHandler(serverSender -> {
            serverConnection.closeHandler(x -> serverConnection.close());
            serverConnection.open();
         });
         serverConnection.sessionOpenHandler((s) -> {
            s.open();
         });
         serverConnection.senderOpenHandler((x) -> {
            x.open();
         });
         serverConnection.receiverOpenHandler((x) -> {
            //x.open(); // I'm missing the open, so it won't ever connect
         });
      });

      try {
         AssertionLoggerHandler.startCapture(true);

         AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + mockServer.actualPort() + "?connect-timeout-millis=20").setReconnectAttempts(5).setRetryInterval(10);
         amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress(getQueueName()).setType(AMQPBrokerConnectionAddressType.SENDER));
         amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
         server.getConfiguration().addAMQPConnection(amqpConnection);
         server.start();

         Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ111001"));
         Wait.assertEquals(6, () -> AssertionLoggerHandler.countText("AMQ119020")); // 0..5 == 6

      } finally {
         mockServer.close();
      }
   }

   @Test
   public void testReconnectAfterSenderOpenTimeout() throws Exception {

      startVerx();

      AssertionLoggerHandler.startCapture(true);

      ProtonServerOptions serverOptions = new ProtonServerOptions();

      AtomicInteger countOpen = new AtomicInteger(0);
      CyclicBarrier startFlag = new CyclicBarrier(2);
      CountDownLatch blockBeforeOpen = new CountDownLatch(1);
      AtomicInteger disconnects = new AtomicInteger(0);
      AtomicInteger messagesReceived = new AtomicInteger(0);
      AtomicInteger errors = new AtomicInteger(0);

      ConcurrentHashSet<ProtonConnection> connections = new ConcurrentHashSet<>();

      mockServer = new MockServer(vertx, serverOptions, null, serverConnection -> {
         serverConnection.disconnectHandler(c -> {
            disconnects.incrementAndGet(); // number of retries
            connections.remove(c);
         });
         serverConnection.openHandler(serverSender -> {
            serverConnection.closeHandler(x -> {
               serverConnection.close();
               connections.remove(serverConnection);
            });
            serverConnection.open();
            connections.add(serverConnection);
         });
         serverConnection.sessionOpenHandler((s) -> {
            s.open();
         });
         serverConnection.senderOpenHandler((x) -> {
            x.open();
         });
         serverConnection.receiverOpenHandler((x) -> {
            if (countOpen.incrementAndGet() > 2) {
               if (countOpen.get() == 3) {
                  try {
                     startFlag.await(10, TimeUnit.SECONDS);
                     blockBeforeOpen.await(10, TimeUnit.SECONDS);
                     return;
                  } catch (Throwable ignored) {
                  }
               }
               HashMap<Symbol, Object> brokerIDProperties = new HashMap<>();
               brokerIDProperties.put(AMQPMirrorControllerSource.BROKER_ID, "fake-id");
               x.setProperties(brokerIDProperties);
               x.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY});
               x.setTarget(x.getRemoteTarget());
               x.open();
               x.handler((del, msg) -> {
                  if (msg.getApplicationProperties() != null) {
                     Map map = msg.getApplicationProperties().getValue();
                     Object value = map.get("sender");
                     if (value != null) {
                        if (messagesReceived.get() != ((Integer) value).intValue()) {
                           logger.warn("Message out of order. Expected " + messagesReceived.get() + " but received " + value);
                           errors.incrementAndGet();
                        }
                        messagesReceived.incrementAndGet();
                     }
                  }
               });
            }
         });
      });

      AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + mockServer.actualPort() + "?connect-timeout-millis=1000").setReconnectAttempts(10).setRetryInterval(10);
      amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
      server.getConfiguration().addAMQPConnection(amqpConnection);
      server.start();

      startFlag.await(10, TimeUnit.SECONDS);
      blockBeforeOpen.countDown();

      Wait.assertEquals(2, disconnects::intValue);
      Wait.assertEquals(1, connections::size);

      Wait.assertEquals(3, () -> AssertionLoggerHandler.countText("AMQ119020"));

      ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
      try (Connection connection = factory.createConnection()) {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
         for (int i = 0; i < 100; i++) {
            TextMessage message = session.createTextMessage("hello");
            message.setIntProperty("sender", i);
            producer.send(message);
         }
      }

      Wait.assertEquals(100, messagesReceived::intValue, 5000);
      Assert.assertEquals(0, errors.get(), 5000);
   }

   @Test
   public void testNoServerOfferedMirrorCapability() throws Exception {
      startVerx();

      mockServer = new MockServer(vertx, serverConnection -> {
         serverConnection.openHandler(serverSender -> {
            serverConnection.open();
         });
         serverConnection.sessionOpenHandler((s) -> {
            s.open();
         });
         serverConnection.senderOpenHandler((x) -> {
            x.open();
         });
         serverConnection.receiverOpenHandler((x) -> {
            x.setTarget(x.getRemoteTarget());
            x.open();
         });
      });

      AssertionLoggerHandler.startCapture(true);

      AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + mockServer.actualPort() + "?connect-timeout-millis=100").setReconnectAttempts(5).setRetryInterval(10);
      amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
      server.getConfiguration().addAMQPConnection(amqpConnection);
      server.start();

      Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ111001"));
      Assert.assertEquals(6, AssertionLoggerHandler.countText("AMQ119018")); // 0..5 = 6
   }

   /**
    * Refuse the first mirror link, verify broker handles it and reconnects
    *
    * @throws Exception
    */
   @Test
   public void testReconnectAfterMirrorLinkRefusal() throws Exception {
      startVerx();

      AtomicInteger errors = new AtomicInteger(0);

      AtomicInteger messagesReceived = new AtomicInteger(0);

      List<ProtonConnection> connections = Collections.synchronizedList(new ArrayList<ProtonConnection>());
      List<ProtonConnection> disconnected = Collections.synchronizedList(new ArrayList<ProtonConnection>());
      AtomicInteger refusedLinkMessageCount = new AtomicInteger();
      AtomicInteger linkOpens = new AtomicInteger(0);

      mockServer = new MockServer(vertx, serverConnection -> {
         serverConnection.disconnectHandler(c -> {
            disconnected.add(serverConnection);
         });

         serverConnection.openHandler(c -> {
            connections.add(serverConnection);
            serverConnection.open();
         });

         serverConnection.closeHandler(c -> {
            serverConnection.close();
            connections.remove(serverConnection);
         });

         serverConnection.sessionOpenHandler(session -> {
            session.open();
         });

         serverConnection.receiverOpenHandler(serverReceiver -> {
            Target remoteTarget = serverReceiver.getRemoteTarget();
            String remoteAddress = remoteTarget == null ? null : remoteTarget.getAddress();
            if (remoteAddress == null || !remoteAddress.startsWith(ProtonProtocolManager.MIRROR_ADDRESS)) {
               errors.incrementAndGet();
               logger.warn("Receiving address as " + remoteAddress);
               return;
            }
            if (linkOpens.incrementAndGet() != 2) {
               logger.debug("Link Opens::" + linkOpens);
               logger.debug("ServerReceiver = " + serverReceiver.getTarget());
               serverReceiver.setTarget(null);

               serverReceiver.handler((del, msg) -> {
                  refusedLinkMessageCount.incrementAndGet();
                  logger.debug("Should not have got message on refused link: " + msg);
               });

               serverReceiver.open();

               vertx.setTimer(20, x -> {
                  serverReceiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, "Testing refusal of mirror link for $reasons"));
                  serverReceiver.close();
               });
            } else {
               serverReceiver.setTarget(serverReceiver.getRemoteTarget());
               HashMap<Symbol, Object> linkProperties = new HashMap<>();
               linkProperties.put(AMQPMirrorControllerSource.BROKER_ID, "fake-id");

               serverReceiver.setProperties(linkProperties);
               serverReceiver.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY});

               serverReceiver.handler((del, msg) -> {
                  logger.debug("prefetch = " + serverReceiver.getPrefetch() + ", Got message: " + msg);
                  if (msg.getApplicationProperties() != null) {
                     Map map = msg.getApplicationProperties().getValue();
                     Object value = map.get("sender");
                     if (value != null) {
                        if (messagesReceived.get() != ((Integer) value).intValue()) {
                           logger.warn("Message out of order. Expected " + messagesReceived.get() + " but received " + value);
                           errors.incrementAndGet();
                        }
                        messagesReceived.incrementAndGet();
                     }
                  }
                  del.disposition(Accepted.getInstance(), true);
                  if (serverReceiver.getPrefetch() == 0) {
                     serverReceiver.flow(1);
                  }
               });

               serverReceiver.open();
            }
         });
      });

      AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + mockServer.actualPort()).setReconnectAttempts(3).setRetryInterval(10);
      amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement());
      server.getConfiguration().addAMQPConnection(amqpConnection);
      server.start();

      Wait.assertEquals(1, disconnected::size, 6000);
      Wait.assertEquals(2, connections::size, 6000);

      assertSame(connections.get(0), disconnected.get(0));
      assertFalse(connections.get(1).isDisconnected());

      assertEquals("Should not have got any message on refused link", 0, refusedLinkMessageCount.get());
      assertEquals(0, errors.get());

      ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
      try (Connection connection = factory.createConnection()) {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
         for (int i = 0; i < 100; i++) {
            TextMessage message = session.createTextMessage("hello");
            message.setIntProperty("sender", i);
            producer.send(message);
         }
      }

      Wait.assertEquals(100, messagesReceived::intValue);
      assertEquals(0, errors.get()); // Meant to check again. the errors check before was because of connection issues. This one is about duplicates on receiving
   }

   @Test
   public void testNoClientDesiredMirrorCapability() throws Exception {
      AssertionLoggerHandler.startCapture();
      server.start();

      AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT), null, null);
      client.setValidator(new AmqpValidator() {

         @Override
         public void inspectOpenedResource(Sender sender) {
            ErrorCondition condition = sender.getRemoteCondition();

            if (condition != null && condition.getCondition() != null) {
               if (!condition.getCondition().equals(AmqpError.ILLEGAL_STATE)) {
                  markAsInvalid("Should have been closed with an illegal state error, but error was: " + condition);
               }

               if (!condition.getDescription().contains("AMQ119024")) {
                  markAsInvalid("should have indicated the error code about missing a desired capability");
               }

               if (!condition.getDescription().contains(AMQPMirrorControllerSource.MIRROR_CAPABILITY)) {
                  markAsInvalid("should have indicated the error code about missing a desired capability");
               }
            } else {
               markAsInvalid("Sender should have been detached with an error");
            }
         }
      });

      String address = ProtonProtocolManager.getMirrorAddress(getTestName());

      AmqpConnection connection = client.connect();
      try {
         AmqpSession session = connection.createSession();

         try {
            session.createSender(address);
            fail("Link should have been refused.");
         } catch (Exception ex) {
            Assert.assertTrue(ex.getMessage().contains("AMQ119024"));
            instanceLog.debug("Caught expected exception");
         }

         connection.getStateInspector().assertValid();
      } finally {
         connection.close();
      }

      Wait.assertTrue(() -> AssertionLoggerHandler.findText("AMQ119024"));
   }
}
