QPID-8545: [Broker-J] SSL Engine looping circuit breaker
This closes #98
diff --git a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
index 6bfcc38..7fcb110 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
@@ -56,6 +56,10 @@
String PORT_AMQP_NUMBER_OF_SELECTORS = "qpid.port.amqp.threadPool.numberOfSelectors";
String PORT_AMQP_ACCEPT_BACKLOG = "qpid.port.amqp.acceptBacklog";
+ String PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING = "qpid.port.amqp.diagnosisOfSslEngineLooping";
+ String PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD = "qpid.port.amqp.diagnosisOfSslEngineLoopingWarnThreshold";
+ String PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD = "qpid.port.amqp.diagnosisOfSslEngineLoopingBreakThreshold";
+
@ManagedContextDefault(name = DEFAULT_AMQP_PROTOCOLS)
String INSTALLED_PROTOCOLS = AmqpPortImpl.getInstalledProtocolsAsString();
@@ -64,6 +68,18 @@
@ManagedContextDefault(name = PORT_MAX_OPEN_CONNECTIONS)
int DEFAULT_MAX_OPEN_CONNECTIONS = -1;
+ @SuppressWarnings("unused")
+ @ManagedContextDefault( name = PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING)
+ boolean DEFAULT_PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING = false;
+
+ @SuppressWarnings("unused")
+ @ManagedContextDefault( name = PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD)
+ long DEFAULT_PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD = 1000;
+
+ @SuppressWarnings("unused")
+ @ManagedContextDefault( name = PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD)
+ long DEFAULT_PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD = 1005;
+
String PORT_IGNORE_INVALID_SNI = "qpid.port.amqp.ignoreInvalidSni";
@SuppressWarnings("unused")
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
index 843e3d7..1bc1943 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
@@ -691,4 +691,5 @@
{
return _selectedHost;
}
+
}
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
index 869775e..9255216 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnectionTLSDelegate.java
@@ -28,6 +28,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SNIHostName;
import javax.net.ssl.SSLEngine;
@@ -63,7 +64,10 @@
private QpidByteBuffer _netOutputBuffer;
private QpidByteBuffer _applicationBuffer;
private final boolean _ignoreInvalidSni;
-
+ private final AtomicInteger _loopingCounter = new AtomicInteger(0);
+ private final boolean _enableDiagnosisOfSslEngineLooping;
+ private final long _diagnosisOfSslEngineLoopingWarnThreshold;
+ private final long _diagnosisOfSslEngineLoopingBreakThreshold;
public NonBlockingConnectionTLSDelegate(NonBlockingConnection parent, AmqpPort port)
{
@@ -82,6 +86,9 @@
_applicationBuffer = QpidByteBuffer.allocateDirect(_networkBufferSize);
_netOutputBuffer = QpidByteBuffer.allocateDirect(_networkBufferSize);
_ignoreInvalidSni = port.isIgnoreInvalidSni();
+ _enableDiagnosisOfSslEngineLooping = port.getContextValue(Boolean.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING);
+ _diagnosisOfSslEngineLoopingWarnThreshold = port.getContextValue(Integer.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD);
+ _diagnosisOfSslEngineLoopingBreakThreshold = port.getContextValue(Integer.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD);
}
@Override
@@ -300,7 +307,27 @@
_encryptedOutput.add(_netOutputBuffer);
_netOutputBuffer = QpidByteBuffer.allocateDirect(_networkBufferSize);
}
-
+ // SSLEngine looping circuit breaker
+ if (_enableDiagnosisOfSslEngineLooping)
+ {
+ _loopingCounter.incrementAndGet();
+ if (_loopingCounter.get() > _diagnosisOfSslEngineLoopingWarnThreshold)
+ {
+ LOGGER.warn("SSLEngine looping detected, _status: {}, _sslEngine.isOutboundDone(): {}, _sslEngine.isInboundDone(): {}, "
+ + "_sslEngine.getPeerHost(): {}, _sslEngine.getPeerPort(): {}",
+ "[ Status = " + _status.getStatus() + ", HandshakeStatus = " + _status.getHandshakeStatus()
+ + ", bytesConsumed = " + _status.bytesConsumed() + ", bytesProduced = " + _status.bytesProduced() + " ]",
+ _sslEngine.isOutboundDone(),
+ _sslEngine.isInboundDone(),
+ _sslEngine.getPeerHost(),
+ _sslEngine.getPeerPort()
+ );
+ }
+ if (_loopingCounter.get() > _diagnosisOfSslEngineLoopingBreakThreshold)
+ {
+ throw new SSLException("SSLEngine looping detected, executing circuit breaker");
+ }
+ }
}
else
{
@@ -310,6 +337,11 @@
}
while(encrypted && _sslEngine.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.NEED_UNWRAP);
+ if (_enableDiagnosisOfSslEngineLooping && encrypted)
+ {
+ _loopingCounter.set(0);
+ }
+
if(_netOutputBuffer.position() != 0)
{
final QpidByteBuffer outputBuffer = _netOutputBuffer;
diff --git a/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java b/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
index 51902ef..4cc90e5 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
@@ -272,6 +272,9 @@
when(port.getContextValue(Integer.class, AmqpPort.PORT_AMQP_ACCEPT_BACKLOG))
.thenReturn(AmqpPort.DEFAULT_PORT_AMQP_ACCEPT_BACKLOG);
when(port.getProtocolHandshakeTimeout()).thenReturn(AmqpPort.DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT);
+ when(port.getContextValue(Boolean.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING)).thenReturn(false);
+ when(port.getContextValue(Integer.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_WARN_THRESHOLD)).thenReturn(1000);
+ when(port.getContextValue(Integer.class, AmqpPort.PORT_DIAGNOSIS_OF_SSL_ENGINE_LOOPING_BREAK_THRESHOLD)).thenReturn(1005);
ObjectMapper mapper = new ObjectMapper();
JavaType type = mapper.getTypeFactory().constructCollectionType(List.class, String.class);
List<String> allowList = mapper.readValue(Broker.DEFAULT_SECURITY_TLS_PROTOCOL_ALLOW_LIST, type);