QPID-8545: [Broker-J] SSL Engine looping circuit breaker

This closes #98
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
index 6bfcc38..7fcb110 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
@@ -56,6 +56,10 @@
     String PORT_AMQP_NUMBER_OF_SELECTORS = "qpid.port.amqp.threadPool.numberOfSelectors";
     String PORT_AMQP_ACCEPT_BACKLOG = "qpid.port.amqp.acceptBacklog";
 
+    String PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING = "qpid.port.amqp.diagnosisOfSslEngineLooping";
+    String PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD = "qpid.port.amqp.diagnosisOfSslEngineLoopingWarnThreshold";
+    String PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD = "qpid.port.amqp.diagnosisOfSslEngineLoopingBreakThreshold";
+
     @ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS)
     String INSTALLED_PROTOCOLS = AmqpPortImpl.getInstalledProtocolsAsString();
 
@@ -64,6 +68,18 @@
     @ManagedContextDefault(name = PORT_MAX_OPEN_CONNECTIONS)
     int DEFAULT_MAX_OPEN_CONNECTIONS = -1;
 
+    @SuppressWarnings("unused")
+    @ManagedContextDefault( name = PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING)
+    boolean DEFAULT_PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING = false;
+
+    @SuppressWarnings("unused")
+    @ManagedContextDefault( name = PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD)
+    long DEFAULT_PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD = 1000;
+
+    @SuppressWarnings("unused")
+    @ManagedContextDefault( name = PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD)
+    long DEFAULT_PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD = 1005;
+
     String PORT_IGNORE_INVALID_SNI = "qpid.port.amqp.ignoreInvalidSni";
 
     @SuppressWarnings("unused")
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
index 843e3d7..1bc1943 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
@@ -691,4 +691,5 @@
     {
         return _selectedHost;
     }
+
 }
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
index 869775e..9255216 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
@@ -28,6 +28,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.ssl.SNIHostName;
 import javax.net.ssl.SSLEngine;
@@ -63,7 +64,10 @@
     private QpidByteBuffer _netOutputBuffer;
     private QpidByteBuffer _applicationBuffer;
     private final boolean _ignoreInvalidSni;
-
+    private final AtomicInteger _loopingCounter = new AtomicInteger(0);
+    private final boolean _enableDiagnosisOfSslEngineLooping;
+    private final long _diagnosisOfSslEngineLoopingWarnThreshold;
+    private final long _diagnosisOfSslEngineLoopingBreakThreshold;
 
     public NonBlockingConnectionTLSDelegate(NonBlockingConnection parent, AmqpPort port)
     {
@@ -82,6 +86,9 @@
         _applicationBuffer = QpidByteBuffer.allocateDirect(_networkBufferSize);
         _netOutputBuffer = QpidByteBuffer.allocateDirect(_networkBufferSize);
         _ignoreInvalidSni = port.isIgnoreInvalidSni();
+        _enableDiagnosisOfSslEngineLooping = port.getContextValue(Boolean.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING);
+        _diagnosisOfSslEngineLoopingWarnThreshold = port.getContextValue(Integer.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD);
+        _diagnosisOfSslEngineLoopingBreakThreshold = port.getContextValue(Integer.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD);
     }
 
     @Override
@@ -300,7 +307,27 @@
                     _encryptedOutput.add(_netOutputBuffer);
                     _netOutputBuffer = QpidByteBuffer.allocateDirect(_networkBufferSize);
                 }
-
+                // SSLEngine looping circuit breaker
+                if (_enableDiagnosisOfSslEngineLooping)
+                {
+                    _loopingCounter.incrementAndGet();
+                    if (_loopingCounter.get() > _diagnosisOfSslEngineLoopingWarnThreshold)
+                    {
+                        LOGGER.warn("SSLEngine looping detected, _status: {}, _sslEngine.isOutboundDone(): {}, _sslEngine.isInboundDone(): {}, "
+                                        + "_sslEngine.getPeerHost(): {}, _sslEngine.getPeerPort(): {}",
+                                "[ Status = " + _status.getStatus() + ", HandshakeStatus = " + _status.getHandshakeStatus()
+                                        + ", bytesConsumed = " + _status.bytesConsumed() + ", bytesProduced = " + _status.bytesProduced() + " ]",
+                                _sslEngine.isOutboundDone(),
+                                _sslEngine.isInboundDone(),
+                                _sslEngine.getPeerHost(),
+                                _sslEngine.getPeerPort()
+                        );
+                    }
+                    if (_loopingCounter.get() > _diagnosisOfSslEngineLoopingBreakThreshold)
+                    {
+                        throw new SSLException("SSLEngine looping detected, executing circuit breaker");
+                    }
+                }
             }
             else
             {
@@ -310,6 +337,11 @@
         }
         while(encrypted && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
 
+        if (_enableDiagnosisOfSslEngineLooping && encrypted)
+        {
+            _loopingCounter.set(0);
+        }
+
         if(_netOutputBuffer.position() != 0)
         {
             final QpidByteBuffer outputBuffer = _netOutputBuffer;
diff --git a/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java b/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
index 51902ef..4cc90e5 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
@@ -272,6 +272,9 @@
         when(port.getContextValue(Integer.class, AmqpPort.PORT_AMQP_ACCEPT_BACKLOG))
                 .thenReturn(AmqpPort.DEFAULT_PORT_AMQP_ACCEPT_BACKLOG);
         when(port.getProtocolHandshakeTimeout()).thenReturn(AmqpPort.DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT);
+        when(port.getContextValue(Boolean.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING)).thenReturn(false);
+        when(port.getContextValue(Integer.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD)).thenReturn(1000);
+        when(port.getContextValue(Integer.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD)).thenReturn(1005);
         ObjectMapper mapper = new ObjectMapper();
         JavaType type = mapper.getTypeFactory().constructCollectionType(List.class, String.class);
         List<String> allowList = mapper.readValue(Broker.DEFAULT_SECURITY_TLS_PROTOCOL_ALLOW_LIST, type);