PROTON-2490 Fix some issues with engine in SASL server mode

Fixes issue where an engine used as a SASL server does not properly
update the frame decoder state and interest mask.  Fixes issues in the
test peer when testing the engine as a SASL server and some API naming
in peer SASL expectation types.
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
index f667a70..5d57bc8 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/AMQPTestDriver.java
@@ -288,6 +288,12 @@
             }
 
             try {
+                // When the outcome of SASL is read the decoder should revert to initial state
+                // as the only valid next incoming value is an AMQP header.
+                if (sasl instanceof SaslOutcome) {
+                    frameParser.resetToExpectingHeader();
+                }
+
                 sasl.invoke(scriptEntry, frameSize, this);
             } catch (UnexpectedPerformativeError e) {
                 if (scriptEntry.isOptional()) {
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/SaslInitInjectAction.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/SaslInitInjectAction.java
index 24ef11e..65f0bd0 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/SaslInitInjectAction.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/actions/SaslInitInjectAction.java
@@ -43,12 +43,12 @@
         return this;
     }
 
-    public SaslInitInjectAction withMechanism(byte[] response) {
+    public SaslInitInjectAction withInitialResponse(byte[] response) {
         saslInit.setInitialResponse(new Binary(response));
         return this;
     }
 
-    public SaslInitInjectAction withMechanism(Binary response) {
+    public SaslInitInjectAction withInitialResponse(Binary response) {
         saslInit.setInitialResponse(response);
         return this;
     }
diff --git a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java
index bf85b25..36c81e8 100644
--- a/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java
+++ b/protonj2-test-driver/src/main/java/org/apache/qpid/protonj2/test/driver/expectations/SaslOutcomeExpectation.java
@@ -21,6 +21,7 @@
 import org.apache.qpid.protonj2.test.driver.AMQPTestDriver;
 import org.apache.qpid.protonj2.test.driver.codec.ListDescribedType;
 import org.apache.qpid.protonj2.test.driver.codec.primitives.Binary;
+import org.apache.qpid.protonj2.test.driver.codec.primitives.UnsignedByte;
 import org.apache.qpid.protonj2.test.driver.codec.security.SaslCode;
 import org.apache.qpid.protonj2.test.driver.codec.security.SaslOutcome;
 import org.apache.qpid.protonj2.test.driver.matchers.security.SaslOutcomeMatcher;
@@ -39,8 +40,12 @@
 
     //----- Type specific with methods that perform simple equals checks
 
+    public SaslOutcomeExpectation withCode(byte code) {
+        return withCode(equalTo(UnsignedByte.valueOf(code)));
+    }
+
     public SaslOutcomeExpectation withCode(SaslCode code) {
-        return withCode(equalTo(code));
+        return withCode(equalTo(code.getValue()));
     }
 
     public SaslOutcomeExpectation withAdditionalData(byte[] additionalData) {
diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java
index 0c01e4d..11cf173 100644
--- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java
+++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandler.java
@@ -112,14 +112,12 @@
     @Override
     public void handleRead(EngineHandlerContext context, SASLEnvelope envelope) {
         envelope.getBody().invoke(this, context);
-        ((ProtonEngineHandlerContext) context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
         context.fireRead(envelope);
     }
 
     @Override
     public void handleWrite(EngineHandlerContext context, SASLEnvelope envelope) {
         envelope.invoke(this, context);
-        ((ProtonEngineHandlerContext) context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
         context.fireWrite(envelope);
     }
 
@@ -210,7 +208,6 @@
                 // Transition to parsing the frames if any pipelined into this buffer.
                 transitionToFrameSizeParsingStage();
 
-                // This probably isn't right as this fires to next not current.
                 if (header.isSaslHeader()) {
                     decoder = CodecFactory.getSaslDecoder();
                     decoderState = decoder.newDecoderState();
@@ -218,6 +215,10 @@
                 } else {
                     decoder = CodecFactory.getDecoder();
                     decoderState = decoder.newDecoderState();
+                    // Once we've read an AMQP header we no longer care if any SASL work
+                    // occurs as that would be erroneous behavior which this handler doesn't
+                    // deal with.
+                    ((ProtonEngineHandlerContext) context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
                     context.fireRead(HeaderEnvelope.AMQP_HEADER_ENVELOPE);
                 }
             }
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandlerTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandlerTest.java
index 6c6d720..ae37f96 100644
--- a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandlerTest.java
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameDecodingHandlerTest.java
@@ -70,7 +70,7 @@
     @Test
     public void testReadValidHeaderInSingleByteChunks() throws Exception {
         ProtonFrameDecodingHandler handler = createFrameDecoder();
-        EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+        ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
 
         handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 'A' }));
         handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 'M' }));
@@ -82,18 +82,32 @@
         handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 0 }));
 
         Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+        Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
         Mockito.verifyNoMoreInteractions(context);
     }
 
     @Test
     public void testReadValidHeaderInSplitChunks() throws Exception {
         ProtonFrameDecodingHandler handler = createFrameDecoder();
-        EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+        ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
 
         handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 'A', 'M', 'Q', 'P' }));
         handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 0, 1, 0, 0 }));
 
         Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+        Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
+        Mockito.verifyNoMoreInteractions(context);
+    }
+
+    @Test
+    public void testReadOfSaslHeaderDoesNotDisableWritesMonitoring() throws Exception {
+        ProtonFrameDecodingHandler handler = createFrameDecoder();
+        ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
+
+        handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 'A', 'M', 'Q', 'P' }));
+        handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(new byte[] { 3, 1, 0, 0 }));
+
+        Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
         Mockito.verifyNoMoreInteractions(context);
     }
 
@@ -145,12 +159,13 @@
         ArgumentCaptor<IncomingAMQPEnvelope> argument = ArgumentCaptor.forClass(IncomingAMQPEnvelope.class);
 
         ProtonFrameDecodingHandler handler = createFrameDecoder();
-        EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+        ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
 
         handler.handleRead(context, AMQPHeader.getAMQPHeader().getBuffer());
         handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(emptyOpen));
 
         Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+        Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
         Mockito.verify(context).fireRead(argument.capture());
         Mockito.verifyNoMoreInteractions(context);
 
@@ -183,9 +198,10 @@
         ArgumentCaptor<IncomingAMQPEnvelope> argument = ArgumentCaptor.forClass(IncomingAMQPEnvelope.class);
 
         ProtonFrameDecodingHandler handler = createFrameDecoder();
-        EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+        ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
 
         handler.handleRead(context, AMQPHeader.getAMQPHeader().getBuffer());
+        Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
         handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(basicOpen));
 
         Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
@@ -226,11 +242,12 @@
         ArgumentCaptor<IncomingAMQPEnvelope> argument = ArgumentCaptor.forClass(IncomingAMQPEnvelope.class);
 
         ProtonFrameDecodingHandler handler = createFrameDecoder();
-        EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+        ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
 
         handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(basicOpen));
 
         Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+        Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
         Mockito.verify(context).fireRead(argument.capture());
         Mockito.verifyNoMoreInteractions(context);
 
@@ -265,11 +282,12 @@
         byte[] emptyFrame = new byte[] { (byte) 0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00 };
 
         ProtonFrameDecodingHandler handler = createFrameDecoder();
-        EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+        ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
 
         handler.handleRead(context, AMQPHeader.getAMQPHeader().getBuffer());
 
         Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+        Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
         Mockito.verifyNoMoreInteractions(context);
 
         handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(emptyFrame));
@@ -293,11 +311,12 @@
                                           (byte) 0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00 };
 
         ProtonFrameDecodingHandler handler = createFrameDecoder();
-        EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+        ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
 
         handler.handleRead(context, AMQPHeader.getAMQPHeader().getBuffer());
 
         Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+        Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
         Mockito.verifyNoMoreInteractions(context);
 
         handler.handleRead(context, ProtonByteBufferAllocator.DEFAULT.wrap(emptyFrames));
@@ -324,11 +343,12 @@
         byte[] undersizedFrameHeader = new byte[] { (byte) 0x00, 0x00, 0x00, 0x07, 0x02, 0x00, 0x00, 0x00 };
 
         ProtonFrameDecodingHandler handler = createFrameDecoder();
-        EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+        ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
 
         handler.handleRead(context, AMQPHeader.getAMQPHeader().getBuffer());
 
         Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+        Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
         Mockito.verifyNoMoreInteractions(context);
 
         try {
@@ -352,11 +372,12 @@
         byte[] underMinDoffFrameHeader = new byte[] { (byte) 0x00, 0x00, 0x00, 0x08, 0x01, 0x00, 0x00, 0x00 };
 
         ProtonFrameDecodingHandler handler = createFrameDecoder();
-        EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+        ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
 
         handler.handleRead(context, AMQPHeader.getAMQPHeader().getBuffer());
 
         Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+        Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
         Mockito.verifyNoMoreInteractions(context);
 
         try {
@@ -380,11 +401,12 @@
         byte[] overFrameSizeDoffFrameHeader = new byte[] { (byte) 0x00, 0x00, 0x00, 0x08, 0x03, 0x00, 0x00, 0x00 };
 
         ProtonFrameDecodingHandler handler = createFrameDecoder();
-        EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+        ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
 
         handler.handleRead(context, AMQPHeader.getAMQPHeader().getBuffer());
 
         Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+        Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
         Mockito.verifyNoMoreInteractions(context);
 
         try {
@@ -406,11 +428,12 @@
         byte[] overFrameSizeLimitFrameHeader = new byte[] { (byte) 0xA0, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00 };
 
         ProtonFrameDecodingHandler handler = createFrameDecoder();
-        EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+        ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
 
         handler.handleRead(context, AMQPHeader.getAMQPHeader().getBuffer());
 
         Mockito.verify(context).fireRead(Mockito.any(HeaderEnvelope.class));
+        Mockito.verify(context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
         Mockito.verifyNoMoreInteractions(context);
 
         try {
@@ -431,7 +454,7 @@
         ProtonEngine engine = Mockito.mock(ProtonEngine.class);
         Mockito.when(engine.configuration()).thenReturn(configuration);
         Mockito.when(engine.isWritable()).thenReturn(Boolean.TRUE);
-        EngineHandlerContext context = Mockito.mock(EngineHandlerContext.class);
+        ProtonEngineHandlerContext context = Mockito.mock(ProtonEngineHandlerContext.class);
         Mockito.when(context.engine()).thenReturn(engine);
 
         ProtonFrameDecodingHandler handler = new ProtonFrameDecodingHandler();
diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/sasl/ProtonSaslServerTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/sasl/ProtonSaslServerTest.java
new file mode 100644
index 0000000..1af0bde
--- /dev/null
+++ b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/sasl/ProtonSaslServerTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.qpid.protonj2.engine.impl.sasl;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.charset.StandardCharsets;
+
+import org.apache.qpid.protonj2.buffer.ProtonBuffer;
+import org.apache.qpid.protonj2.engine.Engine;
+import org.apache.qpid.protonj2.engine.EngineFactory;
+import org.apache.qpid.protonj2.engine.exceptions.ProtocolViolationException;
+import org.apache.qpid.protonj2.engine.impl.ProtonEngineTestSupport;
+import org.apache.qpid.protonj2.engine.sasl.SaslOutcome;
+import org.apache.qpid.protonj2.engine.sasl.SaslServerContext;
+import org.apache.qpid.protonj2.engine.sasl.SaslServerListener;
+import org.apache.qpid.protonj2.test.driver.ProtonTestConnector;
+import org.apache.qpid.protonj2.test.driver.codec.security.SaslCode;
+import org.apache.qpid.protonj2.types.Symbol;
+import org.apache.qpid.protonj2.types.transport.AMQPHeader;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+/**
+ * Test proton engine from the perspective of a SASL client
+ */
+@Timeout(20)
+public class ProtonSaslServerTest extends ProtonEngineTestSupport {
+
+    @Test
+    public void testEngineFailsIfAMQPHeaderArrivesWhenSASLHeaderExpected() throws Exception {
+        Engine engine = EngineFactory.PROTON.createEngine();
+        engine.errorHandler(result -> failure = result.failureCause());
+        ProtonTestConnector peer = createTestPeer(engine);
+
+        // Setup basic SASL server which only allows ANONYMOUS
+        engine.saslDriver().server().setListener(createAnonymousSaslServerListener());
+        engine.connection().openHandler((conn) -> conn.open());
+        engine.connection().closeHandler((conn) -> conn.close());
+        engine.start();
+
+        peer.expectSASLHeader();
+
+        try {
+            peer.remoteHeader(AMQPHeader.getAMQPHeader().toArray()).now();
+        } catch (AssertionError pve) {
+            assertTrue(pve.getCause() instanceof ProtocolViolationException);
+        }
+
+        peer.waitForScriptToCompleteIgnoreErrors();
+
+        assertNotNull(failure);
+    }
+
+    @Test
+    public void testSaslAnonymousConnection() throws Exception {
+        Engine engine = EngineFactory.PROTON.createEngine();
+        engine.errorHandler(result -> failure = result.failureCause());
+        ProtonTestConnector peer = createTestPeer(engine);
+
+        // Setup basic SASL server which only allows ANONYMOUS
+        engine.saslDriver().server().setListener(createAnonymousSaslServerListener());
+        engine.connection().openHandler((conn) -> conn.open());
+        engine.connection().closeHandler((conn) -> conn.close());
+        engine.start();
+
+        peer.expectSASLHeader();
+        peer.expectSaslMechanisms().withSaslServerMechanisms("ANONYMOUS");
+        peer.remoteHeader(AMQPHeader.getSASLHeader().toArray()).now();
+        peer.waitForScriptToComplete();
+
+        peer.expectSaslOutcome().withCode(SaslCode.OK);
+        peer.remoteSaslInit().withMechanism("ANONYMOUS").now();
+        peer.waitForScriptToComplete();
+
+        peer.expectAMQPHeader();
+        peer.expectOpen();
+        peer.remoteHeader(AMQPHeader.getAMQPHeader().toArray()).now();
+        peer.remoteOpen().now();
+        peer.waitForScriptToComplete();
+
+        peer.expectClose();
+        peer.remoteClose().now();
+        peer.waitForScriptToComplete();
+
+        assertNull(failure);
+    }
+
+    @Test
+    public void testSaslPlainConnection() throws Exception {
+        Engine engine = EngineFactory.PROTON.createEngine();
+        engine.errorHandler(result -> failure = result.failureCause());
+        ProtonTestConnector peer = createTestPeer(engine);
+
+        // Setup basic SASL server which only allows ANONYMOUS
+        engine.saslDriver().server().setListener(createPlainSaslServerListener());
+        engine.connection().openHandler((conn) -> conn.open());
+        engine.connection().closeHandler((conn) -> conn.close());
+        engine.start();
+
+        peer.expectSASLHeader();
+        peer.expectSaslMechanisms().withSaslServerMechanisms("PLAIN");
+        peer.remoteHeader(AMQPHeader.getSASLHeader().toArray()).now();
+        peer.waitForScriptToComplete();
+
+        peer.expectSaslOutcome().withCode(SaslCode.OK);
+        peer.remoteSaslInit().withMechanism("PLAIN")
+                             .withInitialResponse(saslPlainInitialResponse("user", "pass")).now();
+        peer.waitForScriptToComplete();
+
+        peer.expectAMQPHeader();
+        peer.expectOpen();
+        peer.remoteHeader(AMQPHeader.getAMQPHeader().toArray()).now();
+        peer.remoteOpen().now();
+        peer.waitForScriptToComplete();
+
+        peer.expectClose();
+        peer.remoteClose().now();
+        peer.waitForScriptToComplete();
+
+        assertNull(failure);
+    }
+
+    @Test
+    public void testSaslPlainConnectionFailedWhenAnonymousOffered() throws Exception {
+        Engine engine = EngineFactory.PROTON.createEngine();
+        engine.errorHandler(result -> failure = result.failureCause());
+        ProtonTestConnector peer = createTestPeer(engine);
+
+        // Setup basic SASL server which only allows ANONYMOUS
+        engine.saslDriver().server().setListener(createPlainSaslServerListener());
+        engine.connection().openHandler((conn) -> conn.open());
+        engine.connection().closeHandler((conn) -> conn.close());
+        engine.start();
+
+        peer.expectSASLHeader();
+        peer.expectSaslMechanisms().withSaslServerMechanisms("PLAIN");
+        peer.remoteHeader(AMQPHeader.getSASLHeader().toArray()).now();
+        peer.waitForScriptToComplete();
+
+        peer.expectSaslOutcome().withCode(SaslCode.SYS_PERM);
+        peer.remoteSaslInit().withMechanism("ANONYMOUS").now();
+        peer.waitForScriptToComplete();
+
+        assertNull(failure);
+    }
+
+    @Test
+    public void testEngineFailsForUnexpecetedNonSaslFrameDuringSaslExchange() throws Exception {
+        Engine engine = EngineFactory.PROTON.createEngine();
+        engine.errorHandler(result -> failure = result.failureCause());
+        ProtonTestConnector peer = createTestPeer(engine);
+
+        // Setup basic SASL server which only allows ANONYMOUS
+        engine.saslDriver().server().setListener(createPlainSaslServerListener());
+        engine.connection().openHandler((conn) -> conn.open());
+        engine.connection().closeHandler((conn) -> conn.close());
+        engine.start();
+
+        peer.expectSASLHeader();
+        peer.expectSaslMechanisms().withSaslServerMechanisms("PLAIN");
+        peer.remoteHeader(AMQPHeader.getSASLHeader().toArray()).now();
+        peer.waitForScriptToComplete();
+
+        try {
+            peer.remoteOpen().now();
+        } catch (AssertionError pve) {
+            assertTrue(pve.getCause() instanceof ProtocolViolationException);
+        }
+
+        peer.waitForScriptToCompleteIgnoreErrors();
+
+        assertNotNull(failure);
+    }
+
+    public byte[] saslPlainInitialResponse(String username, String password) {
+        byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
+        byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
+        byte[] initialResponse = new byte[usernameBytes.length + passwordBytes.length + 2];
+        System.arraycopy(usernameBytes, 0, initialResponse, 1, usernameBytes.length);
+        System.arraycopy(passwordBytes, 0, initialResponse, 2 + usernameBytes.length, passwordBytes.length);
+
+        return initialResponse;
+    }
+
+    private SaslServerListener createAnonymousSaslServerListener() {
+        return new SaslServerListener() {
+
+            @Override
+            public void handleSaslResponse(SaslServerContext context, ProtonBuffer response) {
+                throw new RuntimeException("Not expecting any SASL Response frames");
+            }
+
+            @Override
+            public void handleSaslInit(SaslServerContext context, Symbol mechanism, ProtonBuffer initResponse) {
+                if (mechanism.equals(Symbol.valueOf("ANONYMOUS"))) {
+                    context.sendOutcome(SaslOutcome.SASL_OK, null);
+                } else {
+                    context.sendOutcome(SaslOutcome.SASL_PERM, null);
+                }
+            }
+
+            @Override
+            public void handleSaslHeader(SaslServerContext context, AMQPHeader header) {
+                context.sendMechanisms(new Symbol[] { Symbol.valueOf("ANONYMOUS") });
+            }
+        };
+    }
+
+    private SaslServerListener createPlainSaslServerListener() {
+        return new SaslServerListener() {
+
+            @Override
+            public void handleSaslResponse(SaslServerContext context, ProtonBuffer response) {
+                throw new RuntimeException("Not expecting any SASL Response frames");
+            }
+
+            @Override
+            public void handleSaslInit(SaslServerContext context, Symbol mechanism, ProtonBuffer initResponse) {
+                if (mechanism.equals(Symbol.valueOf("PLAIN"))) {
+                    context.sendOutcome(SaslOutcome.SASL_OK, null);
+                } else {
+                    context.sendOutcome(SaslOutcome.SASL_PERM, null);
+                }
+            }
+
+            @Override
+            public void handleSaslHeader(SaslServerContext context, AMQPHeader header) {
+                context.sendMechanisms(new Symbol[] { Symbol.valueOf("PLAIN") });
+            }
+        };
+    }
+}