PROTON-1998: add trace output for the AMQP and SASL headers being sent+received
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
index e3b3c55..ce4283e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
@@ -40,6 +40,7 @@
class FrameParser implements TransportInput
{
private static final Logger TRACE_LOGGER = Logger.getLogger("proton.trace");
+ private static final String HEADER_DESCRIPTION = "AMQP";
private static final ByteBuffer _emptyInputBuffer = newWriteableBuffer(0);
@@ -67,6 +68,7 @@
private final ByteBufferDecoder _decoder;
private final int _inputBufferSize;
private final int _localMaxFrameSize;
+ private final TransportImpl _transport;
private ByteBuffer _inputBuffer = null;
private boolean _tail_closed = false;
@@ -89,12 +91,13 @@
* We store the last result when processing input so that
* we know not to process any more input if it was an error.
*/
- FrameParser(FrameHandler frameHandler, ByteBufferDecoder decoder, int localMaxFrameSize)
+ FrameParser(FrameHandler frameHandler, ByteBufferDecoder decoder, int localMaxFrameSize, TransportImpl transport)
{
_frameHandler = frameHandler;
_decoder = decoder;
_localMaxFrameSize = localMaxFrameSize;
_inputBufferSize = _localMaxFrameSize > 0 ? _localMaxFrameSize : 16*1024;
+ _transport = transport;
}
private void input(ByteBuffer in) throws TransportException
@@ -238,6 +241,9 @@
state = State.ERROR;
break;
}
+
+ logHeader();
+
state = State.SIZE_0;
}
else
@@ -583,4 +589,15 @@
{
return _framesInput;
}
+
+ private void logHeader() {
+ if (_transport.isFrameTracingEnabled()) {
+ _transport.log(TransportImpl.INCOMING, HEADER_DESCRIPTION);
+
+ ProtocolTracer tracer = _transport.getProtocolTracer();
+ if (tracer != null) {
+ tracer.receivedHeader(HEADER_DESCRIPTION);
+ }
+ }
+ }
}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java
index ff1468c..0b92884 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java
@@ -30,6 +30,10 @@
{
public void receivedFrame(TransportFrame transportFrame);
public void sentFrame(TransportFrame transportFrame);
+
default void receivedSaslBody(SaslFrameBody saslFrameBody) {}
default void sentSaslBody(SaslFrameBody saslFrameBody) {}
+
+ default void receivedHeader(String header) {}
+ default void sentHeader(String header) {}
}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java
index a6f75d5..141ec31 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java
@@ -33,6 +33,8 @@
class SaslFrameParser
{
+ private static final String HEADER_DESCRIPTION = "SASL";
+
private SaslFrameHandler _sasl;
enum State
@@ -62,12 +64,14 @@
private final ByteBufferDecoder _decoder;
private int _frameSizeLimit;
+ private TransportImpl _transport;
- SaslFrameParser(SaslFrameHandler sasl, ByteBufferDecoder decoder, int frameSizeLimit)
+ SaslFrameParser(SaslFrameHandler sasl, ByteBufferDecoder decoder, int frameSizeLimit, TransportImpl transport)
{
_sasl = sasl;
_decoder = decoder;
_frameSizeLimit = frameSizeLimit;
+ _transport = transport;
}
/**
@@ -206,6 +210,9 @@
state = State.ERROR;
break;
}
+
+ logHeader();
+
state = State.SIZE_0;
}
else
@@ -413,4 +420,18 @@
_size = 0;
_state = State.SIZE_0;
}
+
+ private void logHeader()
+ {
+ if (_transport.isFrameTracingEnabled())
+ {
+ _transport.log(TransportImpl.INCOMING, HEADER_DESCRIPTION);
+
+ ProtocolTracer tracer = _transport.getProtocolTracer();
+ if (tracer != null)
+ {
+ tracer.receivedHeader(HEADER_DESCRIPTION);
+ }
+ }
+ }
}
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
index 7c17e20..eea4053 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
@@ -51,6 +51,7 @@
private static final Logger _logger = Logger.getLogger(SaslImpl.class.getName());
public static final byte SASL_FRAME_TYPE = (byte) 1;
+ private static final String HEADER_DESCRIPTION = "SASL";
private final DecoderImpl _decoder = new DecoderImpl();
private final EncoderImpl _encoder = new EncoderImpl(_decoder);
@@ -98,7 +99,7 @@
_maxFrameSize = maxFrameSize;
AMQPDefinedTypes.registerAllTypes(_decoder,_encoder);
- _frameParser = new SaslFrameParser(this, _decoder, maxFrameSize);
+ _frameParser = new SaslFrameParser(this, _decoder, maxFrameSize, _transport);
_frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, _transport);
}
@@ -212,6 +213,8 @@
{
if(!_headerWritten)
{
+ logHeader();
+
_frameWriter.writeHeader(AmqpHeader.SASL_HEADER);
_headerWritten = true;
return AmqpHeader.SASL_HEADER.length;
@@ -222,6 +225,20 @@
}
}
+ private void logHeader()
+ {
+ if (_transport.isFrameTracingEnabled())
+ {
+ _transport.log(TransportImpl.OUTGOING, HEADER_DESCRIPTION);
+
+ ProtocolTracer tracer = _transport.getProtocolTracer();
+ if (tracer != null)
+ {
+ tracer.sentHeader(HEADER_DESCRIPTION);
+ }
+ }
+ }
+
@Override
public int pending()
{
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 30e5a2a..d8bac2d 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -82,6 +82,7 @@
private static final boolean FRM_ENABLED = getBooleanEnv("PN_TRACE_FRM");
private static final int TRACE_FRAME_PAYLOAD_LENGTH = Integer.getInteger("proton.trace_frame_payload_length", 1024);
+ private static final String HEADER_DESCRIPTION = "AMQP";
// trace levels
private int _levels = (FRM_ENABLED ? TRACE_FRM : 0);
@@ -179,7 +180,7 @@
if(!_init)
{
_init = true;
- _frameParser = new FrameParser(_frameHandler , _decoder, _maxFrameSize);
+ _frameParser = new FrameParser(_frameHandler , _decoder, _maxFrameSize, this);
_inputProcessor = _frameParser;
_outputProcessor = new TransportOutputAdaptor(this, _maxFrameSize, isUseReadOnlyOutputBuffer());
}
@@ -865,11 +866,27 @@
{
if(!_headerWritten)
{
+ outputHeaderDescription();
+
_frameWriter.writeHeader(AmqpHeader.HEADER);
_headerWritten = true;
}
}
+ private void outputHeaderDescription()
+ {
+ if (isFrameTracingEnabled())
+ {
+ log(TransportImpl.OUTGOING, HEADER_DESCRIPTION);
+
+ ProtocolTracer tracer = getProtocolTracer();
+ if (tracer != null)
+ {
+ tracer.sentHeader(HEADER_DESCRIPTION);
+ }
+ }
+ }
+
private void processOpen()
{
if (!_isOpenSent && (_conditionSet ||
@@ -1726,6 +1743,12 @@
}
}
+ void log(final String event, final String headerDescription) {
+ if (isTraceFramesEnabled()) {
+ outputMessage(event, 0, headerDescription, null);
+ }
+ }
+
private void outputMessage(String event, int channel, Object frameBody, Binary payload) {
StringBuilder msg = new StringBuilder();
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
index 148c096..3b0e36b 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
@@ -52,7 +52,7 @@
private FrameHandler _mockFrameHandler = mock(FrameHandler.class);
private DecoderImpl _decoder = new DecoderImpl();
private EncoderImpl _encoder = new EncoderImpl(_decoder);
- private final FrameParser _frameParser = new FrameParser(_mockFrameHandler, _decoder, DEFAULT_MAX_FRAME_SIZE);
+ private final FrameParser _frameParser = new FrameParser(_mockFrameHandler, _decoder, DEFAULT_MAX_FRAME_SIZE, new TransportImpl());
private final AmqpFramer _amqpFramer = new AmqpFramer();
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/SaslFrameParserTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/SaslFrameParserTest.java
index e766572..8e8a4dc 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/SaslFrameParserTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/SaslFrameParserTest.java
@@ -47,8 +47,9 @@
{
private final SaslFrameHandler _mockSaslFrameHandler = mock(SaslFrameHandler.class);
private final ByteBufferDecoder _mockDecoder = mock(ByteBufferDecoder.class);
+ private final TransportImpl mockTransport = mock(TransportImpl.class);
private final SaslFrameParser _frameParser;
- private final SaslFrameParser _frameParserWithMockDecoder = new SaslFrameParser(_mockSaslFrameHandler, _mockDecoder, Transport.MIN_MAX_FRAME_SIZE);
+ private final SaslFrameParser _frameParserWithMockDecoder = new SaslFrameParser(_mockSaslFrameHandler, _mockDecoder, Transport.MIN_MAX_FRAME_SIZE, mockTransport);
private final AmqpFramer _amqpFramer = new AmqpFramer();
private final SaslInit _saslFrameBody;
@@ -60,7 +61,7 @@
EncoderImpl encoder = new EncoderImpl(decoder);
AMQPDefinedTypes.registerAllTypes(decoder,encoder);
- _frameParser = new SaslFrameParser(_mockSaslFrameHandler, decoder, Transport.MIN_MAX_FRAME_SIZE);
+ _frameParser = new SaslFrameParser(_mockSaslFrameHandler, decoder, Transport.MIN_MAX_FRAME_SIZE, mockTransport);
_saslFrameBody = new SaslInit();
_saslFrameBody.setMechanism(Symbol.getSymbol("unused"));
_saslFrameBytes = ByteBuffer.wrap(_amqpFramer.generateSaslFrame(0, new byte[0], _saslFrameBody));
@@ -106,7 +107,7 @@
@Test
public void testInputOfFrameWithInvalidSizeWhenSpecifyingLargeMaxFrameSize()
{
- SaslFrameParser frameParserWithLargeMaxSize = new SaslFrameParser(_mockSaslFrameHandler, _mockDecoder, 2017);
+ SaslFrameParser frameParserWithLargeMaxSize = new SaslFrameParser(_mockSaslFrameHandler, _mockDecoder, 2017, mockTransport);
sendAmqpSaslHeader(frameParserWithLargeMaxSize);
// http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-security-v1.0-os.html#doc-idp43536
@@ -248,8 +249,9 @@
private void doInputOfInvalidHeaderTestImpl(int invalidIndex) {
SaslFrameHandler mockSaslFrameHandler = mock(SaslFrameHandler.class);
ByteBufferDecoder mockDecoder = mock(ByteBufferDecoder.class);
+ TransportImpl mockTransport = mock(TransportImpl.class);
- SaslFrameParser saslFrameParser = new SaslFrameParser(mockSaslFrameHandler, mockDecoder, Transport.MIN_MAX_FRAME_SIZE);
+ SaslFrameParser saslFrameParser = new SaslFrameParser(mockSaslFrameHandler, mockDecoder, Transport.MIN_MAX_FRAME_SIZE, mockTransport);
byte[] header = Arrays.copyOf(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER.length);
header[invalidIndex] = 'X';
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 3f0da6c..8b81355 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -39,6 +39,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
@@ -3643,7 +3644,7 @@
}
@Test
- public void testProtocolTracingLogsToTracer()
+ public void testProtocolTracingLogsFrameToTracer()
{
Connection connection = new ConnectionImpl();
List<TransportFrame> frames = new ArrayList<>();
@@ -3673,7 +3674,8 @@
}
@Test
- public void testProtocolTracingLogsToSystem() {
+ public void testProtocolTracingLogsFrameToSystem()
+ {
Connection connection = new ConnectionImpl();
TransportImpl spy = spy(_transport);
@@ -3691,4 +3693,185 @@
assertTrue(frameCatcher.getValue().getBody() instanceof Open);
assertNull(frameCatcher.getValue().getPayload());
}
+
+ @Test
+ public void testProtocolTracingLogsHeaderToTracer()
+ {
+ doProtocolTracingLogsHeaderToTracerTestImpl(false);
+ }
+
+ @Test
+ public void testProtocolTracingLogsHeaderSaslToTracer()
+ {
+ doProtocolTracingLogsHeaderToTracerTestImpl(true);
+ }
+
+ private void doProtocolTracingLogsHeaderToTracerTestImpl(boolean sasl)
+ {
+ Connection connection = new ConnectionImpl();
+ AtomicReference<String> headerRef = new AtomicReference<>();
+ _transport.setProtocolTracer(new ProtocolTracer()
+ {
+ @Override
+ public void receivedHeader(String header)
+ {
+ assertTrue(headerRef.compareAndSet(null, header));
+ }
+
+ @Override
+ public void receivedFrame(TransportFrame transportFrame) { }
+ @Override
+ public void sentFrame(TransportFrame transportFrame) { }
+
+ });
+
+ if (sasl)
+ {
+ _transport.sasl();
+ }
+
+ assertTrue(_transport.isHandlingFrames());
+ _transport.bind(connection);
+
+ assertTrue(_transport.isHandlingFrames());
+ _transport.getInputBuffer().put(sasl ? AmqpHeader.SASL_HEADER : AmqpHeader.HEADER);
+ _transport.process();
+ assertTrue(_transport.isHandlingFrames());
+
+ assertNotNull(headerRef.get());
+ assertEquals(sasl ? "SASL" : "AMQP", headerRef.get());
+ }
+
+ @Test
+ public void testProtocolTracingLogsHeaderToSystem()
+ {
+ doProtocolTracingLogsHeaderToSystemTestImpl(false);
+ }
+
+ @Test
+ public void testProtocolTracingLogsHeaderSaslToSystem()
+ {
+ doProtocolTracingLogsHeaderToSystemTestImpl(true);
+ }
+
+ private void doProtocolTracingLogsHeaderToSystemTestImpl(boolean sasl)
+ {
+ Connection connection = new ConnectionImpl();
+
+ AtomicReference<String> headerRef = new AtomicReference<>();
+ AtomicReference<String> eventRef = new AtomicReference<>();
+ TransportImpl transport = new TransportImpl()
+ {
+ @Override
+ public void log(String event, String header) {
+ assertTrue(eventRef.compareAndSet(null, event));
+ assertTrue(headerRef.compareAndSet(null, header));
+ }
+ };
+ transport.trace(2);
+
+ if (sasl)
+ {
+ transport.sasl();
+ }
+
+ transport.bind(connection);
+
+ transport.getInputBuffer().put(sasl ? AmqpHeader.SASL_HEADER : AmqpHeader.HEADER);
+ transport.process();
+
+ assertEquals(TransportImpl.INCOMING, eventRef.get());
+ assertEquals(sasl ? "SASL" : "AMQP", headerRef.get());
+ }
+
+ @Test
+ public void testProtocolTracingLogsOutboundHeaderToTracer()
+ {
+ doProtocolTracingLogsOutboundHeaderToTracerTestImpl(false);
+ }
+
+ @Test
+ public void testProtocolTracingLogsOutboundHeaderSaslToTracer()
+ {
+ doProtocolTracingLogsOutboundHeaderToTracerTestImpl(true);
+ }
+
+ private void doProtocolTracingLogsOutboundHeaderToTracerTestImpl(boolean sasl)
+ {
+ Connection connection = new ConnectionImpl();
+ AtomicReference<String> headerRef = new AtomicReference<>();
+ _transport.setProtocolTracer(new ProtocolTracer()
+ {
+ @Override
+ public void sentHeader(String header)
+ {
+ assertTrue(headerRef.compareAndSet(null, header));
+ }
+
+ @Override
+ public void receivedFrame(TransportFrame transportFrame) { }
+ @Override
+ public void sentFrame(TransportFrame transportFrame) { }
+
+ });
+
+ if (sasl)
+ {
+ _transport.sasl();
+ }
+
+ _transport.bind(connection);
+
+ ByteBuffer expected = ByteBuffer.wrap(sasl ? AmqpHeader.SASL_HEADER : AmqpHeader.HEADER);
+
+ _transport.pending();
+ assertEquals(expected, _transport.getOutputBuffer());
+
+ assertEquals(sasl ? "SASL" : "AMQP", headerRef.get());
+ }
+
+ @Test
+ public void testProtocolTracingLogsOutboundHeaderToSystem()
+ {
+ doProtocolTracingLogsOutboundHeaderToSystemTestImpl(false);
+ }
+
+ @Test
+ public void testProtocolTracingLogsOutboundHeaderSaslToSystem()
+ {
+ doProtocolTracingLogsOutboundHeaderToSystemTestImpl(true);
+ }
+
+ private void doProtocolTracingLogsOutboundHeaderToSystemTestImpl(boolean sasl)
+ {
+ Connection connection = new ConnectionImpl();
+
+ AtomicReference<String> headerRef = new AtomicReference<>();
+ AtomicReference<String> eventRef = new AtomicReference<>();
+ TransportImpl transport = new TransportImpl()
+ {
+ @Override
+ public void log(String event, String header)
+ {
+ assertTrue(eventRef.compareAndSet(null, event));
+ assertTrue(headerRef.compareAndSet(null, header));
+ }
+ };
+ transport.trace(2);
+
+ if (sasl)
+ {
+ transport.sasl();
+ }
+
+ transport.bind(connection);
+
+ ByteBuffer expected = ByteBuffer.wrap(sasl ? AmqpHeader.SASL_HEADER : AmqpHeader.HEADER);
+
+ transport.pending();
+ assertEquals(expected, transport.getOutputBuffer());
+
+ assertEquals(TransportImpl.OUTGOING, eventRef.get());
+ assertEquals(sasl ? "SASL" : "AMQP", headerRef.get());
+ }
}