PROTON-1998: add trace output for the AMQP and SASL headers being sent+received
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
index e3b3c55..ce4283e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java
@@ -40,6 +40,7 @@
 class FrameParser implements TransportInput
 {
     private static final Logger TRACE_LOGGER = Logger.getLogger("proton.trace");
+    private static final String HEADER_DESCRIPTION = "AMQP";
 
     private static final ByteBuffer _emptyInputBuffer = newWriteableBuffer(0);
 
@@ -67,6 +68,7 @@
     private final ByteBufferDecoder _decoder;
     private final int _inputBufferSize;
     private final int _localMaxFrameSize;
+    private final TransportImpl _transport;
 
     private ByteBuffer _inputBuffer = null;
     private boolean _tail_closed = false;
@@ -89,12 +91,13 @@
      * We store the last result when processing input so that
      * we know not to process any more input if it was an error.
      */
-    FrameParser(FrameHandler frameHandler, ByteBufferDecoder decoder, int localMaxFrameSize)
+    FrameParser(FrameHandler frameHandler, ByteBufferDecoder decoder, int localMaxFrameSize, TransportImpl transport)
     {
         _frameHandler = frameHandler;
         _decoder = decoder;
         _localMaxFrameSize = localMaxFrameSize;
         _inputBufferSize = _localMaxFrameSize > 0 ? _localMaxFrameSize : 16*1024;
+        _transport = transport;
     }
 
     private void input(ByteBuffer in) throws TransportException
@@ -238,6 +241,9 @@
                             state = State.ERROR;
                             break;
                         }
+
+                        logHeader();
+
                         state = State.SIZE_0;
                     }
                     else
@@ -583,4 +589,15 @@
     {
         return _framesInput;
     }
+
+    private void logHeader() {
+        if (_transport.isFrameTracingEnabled()) {
+            _transport.log(TransportImpl.INCOMING, HEADER_DESCRIPTION);
+
+            ProtocolTracer tracer = _transport.getProtocolTracer();
+            if (tracer != null) {
+                tracer.receivedHeader(HEADER_DESCRIPTION);
+            }
+        }
+    }
 }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java
index ff1468c..0b92884 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ProtocolTracer.java
@@ -30,6 +30,10 @@
 {
     public void receivedFrame(TransportFrame transportFrame);
     public void sentFrame(TransportFrame transportFrame);
+
     default void receivedSaslBody(SaslFrameBody saslFrameBody) {}
     default void sentSaslBody(SaslFrameBody saslFrameBody) {}
+
+    default void receivedHeader(String header) {}
+    default void sentHeader(String header) {}
 }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java
index a6f75d5..141ec31 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslFrameParser.java
@@ -33,6 +33,8 @@
 
 class SaslFrameParser
 {
+    private static final String HEADER_DESCRIPTION = "SASL";
+
     private SaslFrameHandler _sasl;
 
     enum State
@@ -62,12 +64,14 @@
 
     private final ByteBufferDecoder _decoder;
     private int _frameSizeLimit;
+    private TransportImpl _transport;
 
-    SaslFrameParser(SaslFrameHandler sasl, ByteBufferDecoder decoder, int frameSizeLimit)
+    SaslFrameParser(SaslFrameHandler sasl, ByteBufferDecoder decoder, int frameSizeLimit, TransportImpl transport)
     {
         _sasl = sasl;
         _decoder = decoder;
         _frameSizeLimit = frameSizeLimit;
+        _transport = transport;
     }
 
     /**
@@ -206,6 +210,9 @@
                             state = State.ERROR;
                             break;
                         }
+
+                        logHeader();
+
                         state = State.SIZE_0;
                     }
                     else
@@ -413,4 +420,18 @@
         _size = 0;
         _state = State.SIZE_0;
     }
+
+    private void logHeader()
+    {
+        if (_transport.isFrameTracingEnabled())
+        {
+            _transport.log(TransportImpl.INCOMING, HEADER_DESCRIPTION);
+
+            ProtocolTracer tracer = _transport.getProtocolTracer();
+            if (tracer != null)
+            {
+                tracer.receivedHeader(HEADER_DESCRIPTION);
+            }
+        }
+    }
 }
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
index 7c17e20..eea4053 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
@@ -51,6 +51,7 @@
     private static final Logger _logger = Logger.getLogger(SaslImpl.class.getName());
 
     public static final byte SASL_FRAME_TYPE = (byte) 1;
+    private static final String HEADER_DESCRIPTION = "SASL";
 
     private final DecoderImpl _decoder = new DecoderImpl();
     private final EncoderImpl _encoder = new EncoderImpl(_decoder);
@@ -98,7 +99,7 @@
         _maxFrameSize = maxFrameSize;
 
         AMQPDefinedTypes.registerAllTypes(_decoder,_encoder);
-        _frameParser = new SaslFrameParser(this, _decoder, maxFrameSize);
+        _frameParser = new SaslFrameParser(this, _decoder, maxFrameSize, _transport);
         _frameWriter = new FrameWriter(_encoder, maxFrameSize, FrameWriter.SASL_FRAME_TYPE, _transport);
     }
 
@@ -212,6 +213,8 @@
     {
         if(!_headerWritten)
         {
+            logHeader();
+
             _frameWriter.writeHeader(AmqpHeader.SASL_HEADER);
             _headerWritten = true;
             return AmqpHeader.SASL_HEADER.length;
@@ -222,6 +225,20 @@
         }
     }
 
+    private void logHeader()
+    {
+        if (_transport.isFrameTracingEnabled())
+        {
+            _transport.log(TransportImpl.OUTGOING, HEADER_DESCRIPTION);
+
+            ProtocolTracer tracer = _transport.getProtocolTracer();
+            if (tracer != null)
+            {
+                tracer.sentHeader(HEADER_DESCRIPTION);
+            }
+        }
+    }
+
     @Override
     public int pending()
     {
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 30e5a2a..d8bac2d 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -82,6 +82,7 @@
 
     private static final boolean FRM_ENABLED = getBooleanEnv("PN_TRACE_FRM");
     private static final int TRACE_FRAME_PAYLOAD_LENGTH = Integer.getInteger("proton.trace_frame_payload_length", 1024);
+    private static final String HEADER_DESCRIPTION = "AMQP";
 
     // trace levels
     private int _levels = (FRM_ENABLED ? TRACE_FRM : 0);
@@ -179,7 +180,7 @@
         if(!_init)
         {
             _init = true;
-            _frameParser = new FrameParser(_frameHandler , _decoder, _maxFrameSize);
+            _frameParser = new FrameParser(_frameHandler , _decoder, _maxFrameSize, this);
             _inputProcessor = _frameParser;
             _outputProcessor = new TransportOutputAdaptor(this, _maxFrameSize, isUseReadOnlyOutputBuffer());
         }
@@ -865,11 +866,27 @@
     {
         if(!_headerWritten)
         {
+            outputHeaderDescription();
+
             _frameWriter.writeHeader(AmqpHeader.HEADER);
             _headerWritten = true;
         }
     }
 
+    private void outputHeaderDescription()
+    {
+        if (isFrameTracingEnabled())
+        {
+            log(TransportImpl.OUTGOING, HEADER_DESCRIPTION);
+
+            ProtocolTracer tracer = getProtocolTracer();
+            if (tracer != null)
+            {
+                tracer.sentHeader(HEADER_DESCRIPTION);
+            }
+        }
+    }
+
     private void processOpen()
     {
         if (!_isOpenSent && (_conditionSet ||
@@ -1726,6 +1743,12 @@
         }
     }
 
+    void log(final String event, final String headerDescription) {
+        if (isTraceFramesEnabled()) {
+            outputMessage(event, 0, headerDescription, null);
+        }
+    }
+
     private void outputMessage(String event, int channel, Object frameBody, Binary payload) {
         StringBuilder msg = new StringBuilder();
 
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
index 148c096..3b0e36b 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java
@@ -52,7 +52,7 @@
     private FrameHandler _mockFrameHandler = mock(FrameHandler.class);
     private DecoderImpl _decoder = new DecoderImpl();
     private EncoderImpl _encoder = new EncoderImpl(_decoder);
-    private final FrameParser _frameParser = new FrameParser(_mockFrameHandler, _decoder, DEFAULT_MAX_FRAME_SIZE);
+    private final FrameParser _frameParser = new FrameParser(_mockFrameHandler, _decoder, DEFAULT_MAX_FRAME_SIZE, new TransportImpl());
 
     private final AmqpFramer _amqpFramer = new AmqpFramer();
 
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/SaslFrameParserTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/SaslFrameParserTest.java
index e766572..8e8a4dc 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/SaslFrameParserTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/SaslFrameParserTest.java
@@ -47,8 +47,9 @@
 {
     private final SaslFrameHandler _mockSaslFrameHandler = mock(SaslFrameHandler.class);
     private final ByteBufferDecoder _mockDecoder = mock(ByteBufferDecoder.class);
+    private final TransportImpl mockTransport = mock(TransportImpl.class);
     private final SaslFrameParser _frameParser;
-    private final SaslFrameParser _frameParserWithMockDecoder = new SaslFrameParser(_mockSaslFrameHandler, _mockDecoder, Transport.MIN_MAX_FRAME_SIZE);
+    private final SaslFrameParser _frameParserWithMockDecoder = new SaslFrameParser(_mockSaslFrameHandler, _mockDecoder, Transport.MIN_MAX_FRAME_SIZE, mockTransport);
     private final AmqpFramer _amqpFramer = new AmqpFramer();
 
     private final SaslInit _saslFrameBody;
@@ -60,7 +61,7 @@
         EncoderImpl encoder = new EncoderImpl(decoder);
         AMQPDefinedTypes.registerAllTypes(decoder,encoder);
 
-        _frameParser = new SaslFrameParser(_mockSaslFrameHandler, decoder, Transport.MIN_MAX_FRAME_SIZE);
+        _frameParser = new SaslFrameParser(_mockSaslFrameHandler, decoder, Transport.MIN_MAX_FRAME_SIZE, mockTransport);
         _saslFrameBody = new SaslInit();
         _saslFrameBody.setMechanism(Symbol.getSymbol("unused"));
         _saslFrameBytes = ByteBuffer.wrap(_amqpFramer.generateSaslFrame(0, new byte[0], _saslFrameBody));
@@ -106,7 +107,7 @@
     @Test
     public void testInputOfFrameWithInvalidSizeWhenSpecifyingLargeMaxFrameSize()
     {
-        SaslFrameParser frameParserWithLargeMaxSize = new SaslFrameParser(_mockSaslFrameHandler, _mockDecoder, 2017);
+        SaslFrameParser frameParserWithLargeMaxSize = new SaslFrameParser(_mockSaslFrameHandler, _mockDecoder, 2017, mockTransport);
         sendAmqpSaslHeader(frameParserWithLargeMaxSize);
 
         // http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-security-v1.0-os.html#doc-idp43536
@@ -248,8 +249,9 @@
     private void doInputOfInvalidHeaderTestImpl(int invalidIndex) {
         SaslFrameHandler mockSaslFrameHandler = mock(SaslFrameHandler.class);
         ByteBufferDecoder mockDecoder = mock(ByteBufferDecoder.class);
+        TransportImpl mockTransport = mock(TransportImpl.class);
 
-        SaslFrameParser saslFrameParser = new SaslFrameParser(mockSaslFrameHandler, mockDecoder, Transport.MIN_MAX_FRAME_SIZE);
+        SaslFrameParser saslFrameParser = new SaslFrameParser(mockSaslFrameHandler, mockDecoder, Transport.MIN_MAX_FRAME_SIZE, mockTransport);
 
         byte[] header = Arrays.copyOf(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER.length);
         header[invalidIndex] = 'X';
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
index 3f0da6c..8b81355 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java
@@ -39,6 +39,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Binary;
@@ -3643,7 +3644,7 @@
     }
 
     @Test
-    public void testProtocolTracingLogsToTracer()
+    public void testProtocolTracingLogsFrameToTracer()
     {
         Connection connection = new ConnectionImpl();
         List<TransportFrame> frames = new ArrayList<>();
@@ -3673,7 +3674,8 @@
     }
 
     @Test
-    public void testProtocolTracingLogsToSystem() {
+    public void testProtocolTracingLogsFrameToSystem()
+    {
         Connection connection = new ConnectionImpl();
         TransportImpl spy = spy(_transport);
 
@@ -3691,4 +3693,185 @@
         assertTrue(frameCatcher.getValue().getBody() instanceof Open);
         assertNull(frameCatcher.getValue().getPayload());
     }
+
+    @Test
+    public void testProtocolTracingLogsHeaderToTracer()
+    {
+        doProtocolTracingLogsHeaderToTracerTestImpl(false);
+    }
+
+    @Test
+    public void testProtocolTracingLogsHeaderSaslToTracer()
+    {
+        doProtocolTracingLogsHeaderToTracerTestImpl(true);
+    }
+
+    private void doProtocolTracingLogsHeaderToTracerTestImpl(boolean sasl)
+    {
+        Connection connection = new ConnectionImpl();
+        AtomicReference<String> headerRef = new AtomicReference<>();
+        _transport.setProtocolTracer(new ProtocolTracer()
+        {
+            @Override
+            public void receivedHeader(String header)
+            {
+                assertTrue(headerRef.compareAndSet(null, header));
+            }
+
+            @Override
+            public void receivedFrame(TransportFrame transportFrame) { }
+            @Override
+            public void sentFrame(TransportFrame transportFrame) { }
+
+        });
+
+        if (sasl)
+        {
+            _transport.sasl();
+        }
+
+        assertTrue(_transport.isHandlingFrames());
+        _transport.bind(connection);
+
+        assertTrue(_transport.isHandlingFrames());
+        _transport.getInputBuffer().put(sasl ? AmqpHeader.SASL_HEADER : AmqpHeader.HEADER);
+        _transport.process();
+        assertTrue(_transport.isHandlingFrames());
+
+        assertNotNull(headerRef.get());
+        assertEquals(sasl ? "SASL" : "AMQP", headerRef.get());
+    }
+
+    @Test
+    public void testProtocolTracingLogsHeaderToSystem()
+    {
+        doProtocolTracingLogsHeaderToSystemTestImpl(false);
+    }
+
+    @Test
+    public void testProtocolTracingLogsHeaderSaslToSystem()
+    {
+        doProtocolTracingLogsHeaderToSystemTestImpl(true);
+    }
+
+    private void doProtocolTracingLogsHeaderToSystemTestImpl(boolean sasl)
+    {
+        Connection connection = new ConnectionImpl();
+
+        AtomicReference<String> headerRef = new AtomicReference<>();
+        AtomicReference<String> eventRef = new AtomicReference<>();
+        TransportImpl transport = new TransportImpl()
+        {
+            @Override
+            public void log(String event, String header) {
+                assertTrue(eventRef.compareAndSet(null, event));
+                assertTrue(headerRef.compareAndSet(null, header));
+            }
+        };
+        transport.trace(2);
+
+        if (sasl)
+        {
+            transport.sasl();
+        }
+
+        transport.bind(connection);
+
+        transport.getInputBuffer().put(sasl ? AmqpHeader.SASL_HEADER : AmqpHeader.HEADER);
+        transport.process();
+
+        assertEquals(TransportImpl.INCOMING, eventRef.get());
+        assertEquals(sasl ? "SASL" : "AMQP", headerRef.get());
+    }
+
+    @Test
+    public void testProtocolTracingLogsOutboundHeaderToTracer()
+    {
+        doProtocolTracingLogsOutboundHeaderToTracerTestImpl(false);
+    }
+
+    @Test
+    public void testProtocolTracingLogsOutboundHeaderSaslToTracer()
+    {
+        doProtocolTracingLogsOutboundHeaderToTracerTestImpl(true);
+    }
+
+    private void doProtocolTracingLogsOutboundHeaderToTracerTestImpl(boolean sasl)
+    {
+        Connection connection = new ConnectionImpl();
+        AtomicReference<String> headerRef = new AtomicReference<>();
+        _transport.setProtocolTracer(new ProtocolTracer()
+        {
+            @Override
+            public void sentHeader(String header)
+            {
+                assertTrue(headerRef.compareAndSet(null, header));
+            }
+
+            @Override
+            public void receivedFrame(TransportFrame transportFrame) { }
+            @Override
+            public void sentFrame(TransportFrame transportFrame) { }
+
+        });
+
+        if (sasl)
+        {
+            _transport.sasl();
+        }
+
+        _transport.bind(connection);
+
+        ByteBuffer expected = ByteBuffer.wrap(sasl ? AmqpHeader.SASL_HEADER : AmqpHeader.HEADER);
+
+        _transport.pending();
+        assertEquals(expected, _transport.getOutputBuffer());
+
+        assertEquals(sasl ? "SASL" : "AMQP", headerRef.get());
+    }
+
+    @Test
+    public void testProtocolTracingLogsOutboundHeaderToSystem()
+    {
+        doProtocolTracingLogsOutboundHeaderToSystemTestImpl(false);
+    }
+
+    @Test
+    public void testProtocolTracingLogsOutboundHeaderSaslToSystem()
+    {
+        doProtocolTracingLogsOutboundHeaderToSystemTestImpl(true);
+    }
+
+    private void doProtocolTracingLogsOutboundHeaderToSystemTestImpl(boolean sasl)
+    {
+        Connection connection = new ConnectionImpl();
+
+        AtomicReference<String> headerRef = new AtomicReference<>();
+        AtomicReference<String> eventRef = new AtomicReference<>();
+        TransportImpl transport = new TransportImpl()
+        {
+            @Override
+            public void log(String event, String header)
+            {
+                assertTrue(eventRef.compareAndSet(null, event));
+                assertTrue(headerRef.compareAndSet(null, header));
+            }
+        };
+        transport.trace(2);
+
+        if (sasl)
+        {
+            transport.sasl();
+        }
+
+        transport.bind(connection);
+
+        ByteBuffer expected = ByteBuffer.wrap(sasl ? AmqpHeader.SASL_HEADER : AmqpHeader.HEADER);
+
+        transport.pending();
+        assertEquals(expected, transport.getOutputBuffer());
+
+        assertEquals(TransportImpl.OUTGOING, eventRef.get());
+        assertEquals(sasl ? "SASL" : "AMQP", headerRef.get());
+    }
 }