PROTON-1774 Remove sasl processing when complete

Once complete remove any impact of the SaslTransportWrapper from the
transport chain by selecting back to the next TransportWrappers for
input and output once SASL is complete and the pending buffers are
drained.  Move the buffers for sasl input and output into the wrapper
which can be GC'd once removed from the chain.
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
index 9125625..cee798f 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SaslImpl.java
@@ -58,9 +58,8 @@
     private final TransportImpl _transport;
 
     private boolean _tail_closed = false;
-    private final ByteBuffer _inputBuffer;
     private boolean _head_closed = false;
-    private final ByteBuffer _outputBuffer;
+    private final int _maxFrameSize;
     private final FrameWriter _frameWriter;
 
     private ByteBuffer _pending;
@@ -96,8 +95,7 @@
     SaslImpl(TransportImpl transport, int maxFrameSize)
     {
         _transport = transport;
-        _inputBuffer = newWriteableBuffer(maxFrameSize);
-        _outputBuffer = newWriteableBuffer(maxFrameSize);
+        _maxFrameSize = maxFrameSize;
 
         AMQPDefinedTypes.registerAllTypes(_decoder,_encoder);
         _frameParser = new SaslFrameParser(this, _decoder, maxFrameSize);
@@ -122,17 +120,6 @@
         return _done && (_role==Role.CLIENT || _initReceived);
     }
 
-    private void writeSaslOutput()
-    {
-        process();
-        _frameWriter.readBytes(_outputBuffer);
-
-        if(_logger.isLoggable(Level.FINER))
-        {
-            _logger.log(Level.FINER, "Finished writing SASL output. Output Buffer : " + _outputBuffer);
-        }
-    }
-
     private void process()
     {
         processHeader();
@@ -226,7 +213,6 @@
         if(!_headerWritten)
         {
             _frameWriter.writeHeader(AmqpHeader.SASL_HEADER);
-
             _headerWritten = true;
             return AmqpHeader.SASL_HEADER.length;
         }
@@ -514,7 +500,7 @@
 
     public TransportWrapper wrap(final TransportInput input, final TransportOutput output)
     {
-        return new SaslSniffer(new SaslTransportWrapper(input, output),
+        return new SaslSniffer(new SwitchingSaslTransportWrapper(input, output),
                                new PlainTransportWrapper(output, input)) {
             protected boolean isDeterminationMade() {
                 if (_role == Role.SERVER && _allowSkip) {
@@ -545,13 +531,23 @@
         private final TransportInput _underlyingInput;
         private final TransportOutput _underlyingOutput;
         private boolean _outputComplete;
+
+        private final ByteBuffer _outputBuffer;
+        private final ByteBuffer _inputBuffer;
         private final ByteBuffer _head;
 
-        private SaslTransportWrapper(TransportInput input, TransportOutput output)
+        private final SwitchingSaslTransportWrapper _parent;
+
+        private SaslTransportWrapper(SwitchingSaslTransportWrapper parent, TransportInput input, TransportOutput output)
         {
             _underlyingInput = input;
             _underlyingOutput = output;
 
+            _inputBuffer = newWriteableBuffer(_maxFrameSize);
+            _outputBuffer = newWriteableBuffer(_maxFrameSize);
+
+            _parent = parent;
+
             if (_transport.isUseReadOnlyOutputBuffer()) {
                 _head = _outputBuffer.asReadOnlyBuffer();
             } else {
@@ -565,7 +561,7 @@
         {
             if(isOutputInSaslMode())
             {
-                SaslImpl.this.writeSaslOutput();
+                writeSaslOutput();
                 if(_done)
                 {
                     _outputComplete = true;
@@ -579,7 +575,7 @@
          */
         private boolean isInputInSaslMode()
         {
-            return _role == null || (_role == Role.CLIENT && !_done) ||(_role == Role.SERVER && (!_initReceived || !_done));
+            return _role == null || (_role == Role.CLIENT && !_done) || (_role == Role.SERVER && (!_initReceived || !_done));
         }
 
         private boolean isOutputInSaslMode()
@@ -680,12 +676,17 @@
                         _tail_closed = true;
                     }
 
-                    _underlyingInput.process();
+                    if (!_inputBuffer.hasRemaining())
+                    {
+                        _parent.switchToNextInput();
+                    }
                 }
                 else
                 {
-                    _underlyingInput.process();
+                    _parent.switchToNextInput();
                 }
+
+                _underlyingInput.process();
             }
         }
 
@@ -708,6 +709,7 @@
             }
             else
             {
+                _parent.switchToNextOutput();
                 return _underlyingOutput.pending();
             }
         }
@@ -722,6 +724,7 @@
             }
             else
             {
+                _parent.switchToNextOutput();
                 return _underlyingOutput.head();
             }
         }
@@ -739,6 +742,7 @@
             }
             else
             {
+                _parent.switchToNextOutput();
                 _underlyingOutput.pop(bytes);
             }
         }
@@ -746,8 +750,93 @@
         @Override
         public void close_head()
         {
+            _parent.switchToNextOutput();
             _underlyingOutput.close_head();
         }
+
+        private void writeSaslOutput()
+        {
+            SaslImpl.this.process();
+            _frameWriter.readBytes(_outputBuffer);
+
+            if(_logger.isLoggable(Level.FINER))
+            {
+                _logger.log(Level.FINER, "Finished writing SASL output. Output Buffer : " + _outputBuffer);
+            }
+        }
+    }
+
+    private class SwitchingSaslTransportWrapper implements TransportWrapper {
+
+        private final TransportInput _underlyingInput;
+        private final TransportOutput _underlyingOutput;
+
+        private TransportInput currentInput;
+        private TransportOutput currentOutput;
+
+        private SwitchingSaslTransportWrapper(TransportInput input, TransportOutput output) {
+            _underlyingInput = input;
+            _underlyingOutput = output;
+
+            // The wrapper can be GC'd after both current's are switched to next.
+            SaslTransportWrapper saslProcessor = new SaslTransportWrapper(this, input, output);
+
+            currentInput = saslProcessor;
+            currentOutput = saslProcessor;
+        }
+
+        @Override
+        public int capacity() {
+            return currentInput.capacity();
+        }
+
+        @Override
+        public int position() {
+            return currentInput.position();
+        }
+
+        @Override
+        public ByteBuffer tail() throws TransportException {
+            return currentInput.tail();
+        }
+
+        @Override
+        public void process() throws TransportException {
+            currentInput.process();
+        }
+
+        @Override
+        public void close_tail() {
+            currentInput.close_tail();
+        }
+
+        @Override
+        public int pending() {
+            return currentOutput.pending();
+        }
+
+        @Override
+        public ByteBuffer head() {
+            return currentOutput.head();
+        }
+
+        @Override
+        public void pop(int bytes) {
+            currentOutput.pop(bytes);
+        }
+
+        @Override
+        public void close_head() {
+            currentOutput.close_head();
+        }
+
+        void switchToNextInput() {
+            currentInput = _underlyingInput;
+        }
+
+        void switchToNextOutput() {
+            currentOutput = _underlyingOutput;
+        }
     }
 
     @Override
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
index 9dadf29..6868496 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
@@ -36,6 +36,11 @@
     private final ProtonContainer _client = new ProtonContainer("clientContainer");
     private final ProtonContainer _server = new ProtonContainer("serverContainer");
 
+    protected boolean shouldLogPumpedBytes()
+    {
+        return true;
+    }
+
     protected TestLoggingHelper getTestLoggingHelper()
     {
         return _testLoggingHelper;
@@ -61,7 +66,10 @@
     {
         ByteBuffer serverBuffer = getServer().transport.getOutputBuffer();
 
-        getTestLoggingHelper().prettyPrint("          <<<" + TestLoggingHelper.SERVER_PREFIX + " ", serverBuffer);
+        if (shouldLogPumpedBytes())
+        {
+            getTestLoggingHelper().prettyPrint("          <<<" + TestLoggingHelper.SERVER_PREFIX + " ", serverBuffer);
+        }
         assertTrue("Server expected to produce some output", serverBuffer.hasRemaining());
 
         ByteBuffer clientBuffer = getClient().transport.getInputBuffer();
@@ -78,7 +86,10 @@
     {
         ByteBuffer clientBuffer = getClient().transport.getOutputBuffer();
 
-        getTestLoggingHelper().prettyPrint(TestLoggingHelper.CLIENT_PREFIX + ">>> ", clientBuffer);
+        if (shouldLogPumpedBytes())
+        {
+            getTestLoggingHelper().prettyPrint(TestLoggingHelper.CLIENT_PREFIX + ">>> ", clientBuffer);
+        }
         assertTrue("Client expected to produce some output", clientBuffer.hasRemaining());
 
         ByteBuffer serverBuffer = getServer().transport.getInputBuffer();