QPID-6336: [Java Broker] Change 0-8..0-91 path to await the receiver being closed before continuing with the next connection attempt

Based on work by Oleksandr Rudyy <orudyy@gmail.com>

git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1655034 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java
index 9455d78..97d380b 100644
--- a/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java
+++ b/qpid/java/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/GroupCreator.java
@@ -73,9 +73,9 @@
     private static final String MANY_BROKER_URL_FORMAT = "amqp://guest:guest@/%s?brokerlist='%s'&failover='roundrobin?cyclecount='%d''";
     private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'";
 
-    private static final int FAILOVER_CYCLECOUNT = 20;
+    private static final int FAILOVER_CYCLECOUNT = 40;
     private static final int FAILOVER_RETRIES = 0;
-    private static final int FAILOVER_CONNECTDELAY = 500;
+    private static final int FAILOVER_CONNECTDELAY = 250;
 
     private static final String SINGLE_BROKER_URL_WITH_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d?connectdelay='%d',retries='%d''";
     private static final String SINGLE_BROKER_URL_WITHOUT_RETRY_FORMAT = "amqp://guest:guest@/%s?brokerlist='tcp://localhost:%d'";
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
index 66cade1..35582d9 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
@@ -21,16 +21,20 @@
 package org.apache.qpid.client;
 
 import java.net.ConnectException;
+import java.nio.ByteBuffer;
 import java.nio.channels.UnresolvedAddressException;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.JMSException;
 import javax.jms.XASession;
 
+import org.apache.qpid.transport.Receiver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,6 +71,9 @@
 {
     private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
     private final AMQConnection _conn;
+    private final long _timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
+                                               Long.getLong(ClientProperties.AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
+                                                            ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
     private boolean _messageCompressionSupported;
     private boolean _addrSyntaxSupported;
     private boolean _confirmedPublishSupported;
@@ -136,7 +143,9 @@
 
         OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(getProtocolVersion());
 
-        NetworkConnection network = transport.connect(settings, securityLayer.receiver(_conn.getProtocolHandler()),
+        ReceiverClosedWaiter monitoringReceiver = new ReceiverClosedWaiter(securityLayer.receiver(_conn.getProtocolHandler()));
+
+        NetworkConnection network = transport.connect(settings, monitoringReceiver,
                                                       _conn.getProtocolHandler());
 
         try
@@ -171,6 +180,19 @@
             network.close();
             throw e;
         }
+        finally
+        {
+            // await the receiver to finish its execution (and so the IO threads too)
+            if (!_conn.isConnected())
+            {
+                boolean closedWithinTimeout = monitoringReceiver.awaitClose(_timeout);
+                if (!closedWithinTimeout)
+                {
+                    _logger.warn("Timed-out waiting for receiver for connection to "
+                                 + brokerDetail + " to be closed.");
+                }
+            }
+        }
 
     }
 
@@ -503,4 +525,60 @@
     {
         return _confirmedPublishNonTransactionalSupported;
     }
+
+
+    private static class ReceiverClosedWaiter implements Receiver<ByteBuffer>
+    {
+        private final CountDownLatch _closedWatcher;
+        private final Receiver<ByteBuffer> _receiver;
+
+        public ReceiverClosedWaiter(Receiver<ByteBuffer> receiver)
+        {
+            _receiver = receiver;
+            _closedWatcher = new CountDownLatch(1);
+        }
+
+        @Override
+        public void received(ByteBuffer msg)
+        {
+            _receiver.received(msg);
+        }
+
+        @Override
+        public void exception(Throwable t)
+        {
+            _receiver.exception(t);
+        }
+
+        @Override
+        public void closed()
+        {
+            try
+            {
+                _receiver.closed();
+            }
+            finally
+            {
+                _closedWatcher.countDown();
+            }
+        }
+
+        public boolean awaitClose(long timeout)
+        {
+            try
+            {
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("Waiting " + timeout + "ms for receiver to be closed");
+                }
+
+                return _closedWatcher.await(timeout, TimeUnit.MILLISECONDS);
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+                return _closedWatcher.getCount() == 0;
+            }
+        }
+    };
 }
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
index c614695..c2582ac 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
@@ -240,7 +240,7 @@
                     }
                     catch (Exception e)
                     {
-                        _logger.warn("Exception occured on closing the sender", e);
+                        _logger.warn("Exception occurred on closing the sender", e);
                     }
                     if (_connection.failoverAllowed())
                     {
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java
index 15ec0f9..c59b0ec 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/failover/MultipleBrokersFailoverTest.java
@@ -38,7 +38,6 @@
 import org.apache.qpid.client.AMQConnectionURL;
 import org.apache.qpid.jms.ConnectionListener;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
-import org.apache.qpid.test.utils.TestUtils;
 import org.apache.qpid.util.FileUtils;
 
 public class MultipleBrokersFailoverTest extends QpidBrokerTestCase implements ConnectionListener
@@ -48,9 +47,10 @@
     private static final String FAILOVER_VIRTUAL_HOST = "failover";
     private static final String NON_FAILOVER_VIRTUAL_HOST = "nonfailover";
     private static final String BROKER_PORTION_FORMAT = "tcp://localhost:%d?connectdelay='%d',retries='%d'";
-    private static final int FAILOVER_RETRIES = 1;
-    private static final int FAILOVER_CONNECTDELAY = 1000;
-    private static final int FAILOVER_FACTOR = 4;
+    private static final int FAILOVER_RETRIES = 0;
+    private static final int FAILOVER_CONNECTDELAY = 0;
+    private static final int FAILOVER_AWAIT_TIME = 10000;
+
 
     private int[] _brokerPorts;
     private AMQConnectionURL _connectionURL;
@@ -169,7 +169,7 @@
 
         killBroker(_brokerPorts[1]);
 
-        awaitForFailoverCompletion(FAILOVER_CONNECTDELAY * _brokerPorts.length * FAILOVER_FACTOR);
+        awaitForFailoverCompletion(FAILOVER_AWAIT_TIME);
         assertEquals("Failover is not started as expected", 0, _failoverStarted.getCount());
 
         assertSendReceive(2);
@@ -185,7 +185,7 @@
 
         stopBroker(_brokerPorts[1]);
 
-        awaitForFailoverCompletion(FAILOVER_CONNECTDELAY * _brokerPorts.length * FAILOVER_FACTOR);
+        awaitForFailoverCompletion(FAILOVER_AWAIT_TIME);
         assertEquals("Failover is not started as expected", 0, _failoverStarted.getCount());
 
         assertSendReceive(1);
@@ -214,20 +214,12 @@
         }
     }
 
-    private void awaitForFailoverCompletion(long delay)
+    private void awaitForFailoverCompletion(long delay) throws Exception
     {
         _logger.info("Awaiting Failover completion..");
-        try
+        if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS))
         {
-            if (!_failoverComplete.await(delay, TimeUnit.MILLISECONDS))
-            {
-                _logger.warn("Test thread stack:\n\n" + TestUtils.dumpThreads());
-                fail("Failover did not complete");
-            }
-        }
-        catch (InterruptedException e)
-        {
-            fail("Test was interrupted:" + e.getMessage());
+            fail("Failover did not complete within " + delay + "ms.");
         }
     }
 
@@ -239,7 +231,7 @@
                 receivedMessage instanceof TextMessage);
     }
 
-    private void init(int acknowledgeMode, boolean startConnection) throws JMSException
+    private void init(int acknowledgeMode, boolean startConnection) throws Exception
     {
         boolean isTransacted = acknowledgeMode == Session.SESSION_TRANSACTED ? true : false;
 
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
index 1dba5ce..7c82ea8 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
@@ -94,6 +94,29 @@
         }
     }
 
+    public void testSSLConnectionToPlainPortRejected() throws Exception
+    {
+        if (shouldPerformTest())
+        {
+            super.setUp();
+
+            String url = "amqp://guest:guest@test/?brokerlist='tcp://localhost:%s" +
+                         "?ssl='true''";
+
+            url = String.format(url,QpidBrokerTestCase.DEFAULT_PORT);
+
+            try
+            {
+                getConnection(new AMQConnectionURL(url));
+                fail("Exception not thrown");
+            }
+            catch (JMSException e)
+            {
+                assertTrue("Unexpected exception message", e.getMessage().contains("Unrecognized SSL message, plaintext connection?"));
+            }
+        }
+    }
+
     public void testHostVerificationIsOnByDefault() throws Exception
     {
         if (shouldPerformTest())
@@ -116,6 +139,7 @@
             try
             {
                 getConnection(new AMQConnectionURL(url));
+                fail("Exception not thrown");
             }
             catch(JMSException e)
             {
diff --git a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
index ed03e83..191f9d7 100644
--- a/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
+++ b/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
@@ -81,7 +81,6 @@
                                      + "&temporaryQueueExchange='tmp.direct'"
                                      + "&temporaryTopicExchange='tmp.topic'");
 
-            System.err.println(url.toString());
             conn = new AMQConnection(url);